Repository URL to install this package:
|
Version:
2.8.0 ▾
|
-- compute available slots under worker pool concurrency limits
WITH worker_slots AS (
SELECT
wp.id,
MAX(0, wp.concurrency_limit - count(fr.id)) AS available_slots
FROM
work_pool wp
JOIN work_queue wq ON wq.work_pool_id = wp.id
LEFT JOIN flow_run fr ON wq.id = fr.work_queue_id
AND fr.state_type in('RUNNING', 'PENDING')
WHERE
wp.is_paused IS FALSE
AND wp.concurrency_limit IS NOT NULL
GROUP BY
wp.id
),
-- compute avaialble slots under worker pool queue concurrency limits
queue_slots AS (
SELECT
wq.id,
MAX(0, wq.concurrency_limit - count(fr.id)) AS available_slots
FROM
work_queue wq
LEFT JOIN flow_run fr ON wq.id = fr.work_queue_id
AND fr.state_type in('RUNNING', 'PENDING')
WHERE
wq.is_paused IS FALSE
AND wq.concurrency_limit IS NOT NULL
GROUP BY
wq.id
),
-- CTE that loads flow runs and applies worker pool queue limits
scheduled_flow_runs AS (
SELECT
wp.id AS run_work_pool_id,
wq.id AS run_work_queue_id,
fr.*,
worker_slots.available_slots as available_worker_slots,
ROW_NUMBER() OVER (PARTITION BY wp.id ORDER BY {% if respect_queue_priorities %}wq.priority ASC, {% endif %}fr.next_scheduled_start_time) AS work_pool_rank
FROM
work_pool wp
JOIN work_queue wq ON wq.work_pool_id = wp.id
LEFT JOIN worker_slots ON wp.id = worker_slots.id
LEFT JOIN queue_slots ON wq.id = queue_slots.id
JOIN (
SELECT
fr.*,
ROW_NUMBER() OVER (PARTITION BY work_queue_id ORDER BY next_scheduled_start_time) AS work_queue_rank
FROM flow_run fr
WHERE fr.state_type = 'SCHEDULED'
{% if scheduled_after %}
AND fr.next_scheduled_start_time >= :scheduled_after
{% endif %}
{% if scheduled_before %}
AND fr.next_scheduled_start_time <= :scheduled_before
{% endif %}
) fr
ON
fr.work_queue_id = wq.id
AND work_queue_rank <= MIN(COALESCE(queue_slots.available_slots, :queue_limit), :queue_limit)
WHERE
wp.is_paused IS FALSE
AND wq.is_paused IS FALSE
{% if work_pool_ids %}
-- optionally filter for specific worker pool IDs
AND wp.id IN :work_pool_ids
{% endif %}
{% if work_queue_ids %}
-- optionally filter for specific worker pool queue IDs
AND wq.id IN :work_queue_ids
{% endif %}
)
SELECT
*
FROM scheduled_flow_runs
WHERE
work_pool_rank <= MIN(COALESCE(available_worker_slots, :worker_limit), :worker_limit)
ORDER BY
{% if respect_queue_priorities %}
work_pool_rank ASC,
{% endif %}
next_scheduled_start_time ASC
LIMIT :limit