Repository URL to install this package:
|
Version:
2.8.0 ▾
|
-- compute available slots under worker pool concurrency limits
WITH pool_slots AS (
SELECT
wp.id,
GREATEST (0, wp.concurrency_limit - COUNT(fr.id)) AS available_slots
FROM
work_pool wp
JOIN work_queue wq ON wq.work_pool_id = wp.id
LEFT JOIN flow_run fr ON wq.id = fr.work_queue_id
AND fr.state_type IN ('RUNNING', 'PENDING')
WHERE
wp.is_paused IS FALSE
AND wq.is_paused IS FALSE
AND wp.concurrency_limit IS NOT NULL
GROUP BY
wp.id
),
-- compute avaialble slots under worker pool queue concurrency limits
queue_slots AS (
SELECT
wq.id,
GREATEST (0, wq.concurrency_limit - COUNT(fr.id)) AS available_slots
FROM
work_queue wq
LEFT JOIN flow_run fr ON wq.id = fr.work_queue_id
AND fr.state_type IN ('RUNNING', 'PENDING')
WHERE
wq.is_paused IS FALSE
AND wq.concurrency_limit IS NOT NULL
GROUP BY
wq.id
)
-- get all flow runs that match criteria
SELECT
wp.id AS run_work_pool_id,
fr_outer.work_queue_id AS run_work_queue_id,
fr_outer.*
FROM
work_pool wp
LEFT JOIN pool_slots ON wp.id = pool_slots.id
-- get worker pool queues for each worker pool
-- this is a lateral join so we can efficiently apply an independent limit
-- to each pool based on available concurrency
CROSS JOIN LATERAL (
SELECT
fr_inner.*,
wq.priority as work_queue_priority
FROM
work_queue wq
LEFT JOIN queue_slots ON wq.id = queue_slots.id
-- get scheduled flow runs for each worker pool queue
-- this is a lateral join so we can efficiently apply an independent limit
-- to each queue based on available concurrency
CROSS JOIN LATERAL (
SELECT
fr.*
FROM
flow_run fr
WHERE
fr.work_queue_id = wq.id
AND fr.state_type = 'SCHEDULED'
{% if scheduled_after %}
AND fr.next_scheduled_start_time >= :scheduled_after
{% endif %}
{% if scheduled_before %}
AND fr.next_scheduled_start_time <= :scheduled_before
{% endif %}
ORDER BY
fr.next_scheduled_start_time ASC
LIMIT LEAST (:queue_limit, queue_slots.available_slots)
-- lock runs
FOR UPDATE SKIP LOCKED
) fr_inner
WHERE
wq.work_pool_id = wp.id
AND wq.is_paused IS FALSE
{% if work_queue_ids %}
-- optionally filter for specific worker pool queue IDs
AND wq.id IN :work_queue_ids
{% endif %}
ORDER BY
{% if respect_queue_priorities %}
-- optionally order by worker pool queue priorities
wq.priority ASC,
{% endif %}
fr_inner.next_scheduled_start_time ASC
LIMIT LEAST (:worker_limit, pool_slots.available_slots)) fr_outer
WHERE
wp.is_paused IS FALSE
{% if work_pool_ids %}
-- optionally filter for specific worker pool IDs
AND wp.id IN :work_pool_ids
{% endif %}
ORDER BY
{% if respect_queue_priorities %}
work_queue_priority ASC,
{% endif %}
next_scheduled_start_time ASC
LIMIT :limit