Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

hemamaps / celery   python

Repository URL to install this package:

Version: 3.1.23 

/ worker / autoscale.py

# -*- coding: utf-8 -*-
"""
    celery.worker.autoscale
    ~~~~~~~~~~~~~~~~~~~~~~~

    This module implements the internal thread responsible
    for growing and shrinking the pool according to the
    current autoscale settings.

    The autoscale thread is only enabled if :option:`--autoscale`
    has been enabled on the command-line.

"""
from __future__ import absolute_import

import os
import threading

from time import sleep

from kombu.async.semaphore import DummyLock

from celery import bootsteps
from celery.five import monotonic
from celery.utils.log import get_logger
from celery.utils.threads import bgThread

from . import state
from .components import Pool

__all__ = ['Autoscaler', 'WorkerComponent']

# Module logger, with its most frequently used methods bound to short names
# so the classes below can call ``debug(...)``, ``info(...)`` and
# ``error(...)`` directly.
logger = get_logger(__name__)
debug = logger.debug
info = logger.info
error = logger.error

# Default value for the ``keepalive`` argument of :class:`Autoscaler`.
# Read from the ``AUTOSCALE_KEEPALIVE`` environment variable (30 when unset)
# and compared against elapsed ``monotonic()`` time, so presumably seconds.
AUTOSCALE_KEEPALIVE = float(os.environ.get('AUTOSCALE_KEEPALIVE', 30))


class WorkerComponent(bootsteps.StartStopStep):
    """Worker bootstep that creates the autoscaler.

    The step depends on the :class:`Pool` step and is flagged as
    conditional: ``enabled`` mirrors the worker's ``autoscale`` setting.

    """
    label = 'Autoscaler'
    conditional = True
    requires = (Pool, )

    def __init__(self, w, **kwargs):
        """Record whether autoscaling is on and reset ``w.autoscaler``."""
        self.enabled = w.autoscale
        w.autoscaler = None

    def create(self, w):
        """Build the autoscaler and store it on the worker.

        With the event loop in use the autoscaler gets a
        :class:`DummyLock` as its mutex and ``None`` is returned;
        otherwise it creates its own lock (``mutex=None``) and the
        autoscaler object itself is returned.

        """
        lock = DummyLock() if w.use_eventloop else None
        autoscaler = self.instantiate(
            w.autoscaler_cls,
            w.pool, w.max_concurrency, w.min_concurrency,
            worker=w, mutex=lock,
        )
        w.autoscaler = autoscaler
        if w.use_eventloop:
            # Scaling is driven by the hub callbacks registered in
            # :meth:`register_with_event_loop` instead.
            return None
        return autoscaler

    def register_with_event_loop(self, w, hub):
        """Hook ``maybe_scale`` into the consumer and the event loop.

        ``maybe_scale`` is added to the consumer's ``on_task_message``
        callbacks and is also scheduled on the hub every ``keepalive``
        interval.

        """
        autoscaler = w.autoscaler
        w.consumer.on_task_message.add(autoscaler.maybe_scale)
        hub.call_repeatedly(autoscaler.keepalive, autoscaler.maybe_scale)


class Autoscaler(bgThread):
    """Grow and shrink the worker pool to follow the task backlog.

    The wanted pool size is the number of reserved requests
    (:attr:`qty`): it is capped at ``max_concurrency`` when growing and
    floored at ``min_concurrency`` when shrinking.

    :param pool: pool object providing ``grow(n)``, ``shrink(n)``,
        ``maintain_pool()`` and ``num_processes``.
    :param max_concurrency: upper bound used when scaling up.
    :keyword min_concurrency: lower bound used when scaling down.
    :keyword worker: worker whose ``consumer._update_prefetch_count``
        is called whenever the pool size changes.
    :keyword keepalive: minimum ``monotonic()`` time that must have
        passed since the last scaling action before the pool is
        shrunk automatically.  Must be truthy.
    :keyword mutex: lock guarding resize operations; a new
        :class:`threading.Lock` is created when omitted.

    """

    def __init__(self, pool, max_concurrency,
                 min_concurrency=0, worker=None,
                 keepalive=AUTOSCALE_KEEPALIVE, mutex=None):
        super(Autoscaler, self).__init__()
        self.pool = pool
        self.mutex = mutex or threading.Lock()
        self.max_concurrency = max_concurrency
        self.min_concurrency = min_concurrency
        self.keepalive = keepalive
        # Time of the last automatic scaling action; None until the
        # first scale-up, which also blocks scale-down until then.
        self._last_action = None
        self.worker = worker

        assert self.keepalive, 'cannot scale down too fast.'

    def body(self):
        """Run one scaling check under the mutex, then sleep a second.

        NOTE(review): presumably invoked repeatedly by the ``bgThread``
        base class, which is not visible in this module.

        """
        with self.mutex:
            self.maybe_scale()
        sleep(1.0)

    def _maybe_scale(self, req=None):
        """Compare demand with the pool size and request a resize.

        Returns ``True`` when a scale-up or scale-down was requested
        (even if :meth:`scale_down` decided to wait), ``None`` when the
        pool already has the wanted size.

        """
        procs = self.processes
        wanted = min(self.qty, self.max_concurrency)
        if wanted > procs:
            self.scale_up(wanted - procs)
            return True
        # Never shrink below the configured minimum: the floor applies
        # to the target size, not to the number of processes removed.
        target = max(wanted, self.min_concurrency)
        if target < procs:
            self.scale_down(procs - target)
            return True

    def maybe_scale(self, req=None):
        """Scale if needed and let the pool run its maintenance."""
        if self._maybe_scale(req):
            self.pool.maintain_pool()

    def update(self, max=None, min=None):
        """Change the concurrency limits, resizing the pool to fit.

        :keyword max: new maximum; the pool is shrunk right away when
            it currently holds more processes than this.
        :keyword min: new minimum; the pool is grown right away when
            it currently holds fewer processes than this.

        Returns the ``(max_concurrency, min_concurrency)`` tuple in
        effect after the update.

        """
        # NOTE: the parameter names shadow the ``max``/``min`` builtins
        # inside this method; they are part of the public signature.
        with self.mutex:
            if max is not None:
                # Compare with the live process count, not the old
                # limit, so the amount removed is always positive.
                surplus = self.processes - max
                if surplus > 0:
                    self._shrink(surplus)
                self.max_concurrency = max
            if min is not None:
                # Only add what is missing to reach the new minimum.
                shortfall = min - self.processes
                if shortfall > 0:
                    self._grow(shortfall)
                self.min_concurrency = min
            return self.max_concurrency, self.min_concurrency

    def force_scale_up(self, n):
        """Add ``n`` processes now, raising the maximum if required."""
        with self.mutex:
            new = self.processes + n
            if new > self.max_concurrency:
                self.max_concurrency = new
            # The minimum is left untouched: a forced grow must not
            # permanently raise the floor used for later scale-downs.
            self._grow(n)

    def force_scale_down(self, n):
        """Remove up to ``n`` processes now, lowering the minimum if
        the resulting size would fall below it."""
        with self.mutex:
            new = self.processes - n
            if new < self.min_concurrency:
                self.min_concurrency = max(new, 0)
            self._shrink(min(n, self.processes))

    def scale_up(self, n):
        """Grow the pool by ``n`` and record the time of the action."""
        self._last_action = monotonic()
        return self._grow(n)

    def scale_down(self, n):
        """Shrink the pool by ``n`` once the keepalive has expired.

        Nothing happens when ``n`` is not positive, when no scaling
        action has been recorded yet, or when less than ``keepalive``
        has elapsed since the last one.

        """
        if not n or n < 0:
            return
        if self._last_action and (
                monotonic() - self._last_action > self.keepalive):
            self._last_action = monotonic()
            return self._shrink(n)

    def _grow(self, n):
        """Add ``n`` processes and raise the prefetch count to match."""
        info('Scaling up %s processes.', n)
        self.pool.grow(n)
        self.worker.consumer._update_prefetch_count(n)

    def _shrink(self, n):
        """Try to remove ``n`` processes and lower the prefetch count.

        Errors raised by the pool are logged, not propagated.  The
        prefetch count is reduced by the number of processes that
        actually went away, so a refused shrink leaves it unchanged.

        """
        if n <= 0:
            # Nothing to remove.
            return
        info('Scaling down %s processes.', n)
        before = self.processes
        try:
            self.pool.shrink(n)
        except ValueError:
            debug("Autoscaler won't scale down: all processes busy.")
        except Exception as exc:
            error('Autoscaler: scale_down: %r', exc, exc_info=True)
        removed = before - self.processes
        if removed > 0:
            self.worker.consumer._update_prefetch_count(-removed)

    def info(self):
        """Return the limits, current pool size and backlog as a dict."""
        return {'max': self.max_concurrency,
                'min': self.min_concurrency,
                'current': self.processes,
                'qty': self.qty}

    @property
    def qty(self):
        """Number of currently reserved requests."""
        return len(state.reserved_requests)

    @property
    def processes(self):
        """Current number of processes reported by the pool."""
        return self.pool.num_processes