Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
Size: Mime:
"""Zookeeper transport.

:copyright: (c) 2010 - 2013 by Mahendra M.
:license: BSD, see LICENSE for more details.

**Synopsis**

Connects to a zookeeper node as <server>:<port>/<vhost>
The <vhost> becomes the base for all the other znodes.  So we can use
it like a vhost.

This uses the built-in kazoo recipe for queues

**References**

- https://zookeeper.apache.org/doc/trunk/recipes.html#sc_recipes_Queues
- https://kazoo.readthedocs.io/en/latest/api/recipe/queue.html

**Limitations**
This queue does not offer reliable consumption.  An entry is removed from
the queue prior to being processed.  So if an error occurs, the consumer
has to re-queue the item or it will be lost.
"""
from __future__ import absolute_import, unicode_literals

import os
import socket

from kombu.five import Empty
from kombu.utils.encoding import bytes_to_str
from kombu.utils.json import loads, dumps

from . import virtual

try:
    import kazoo
    from kazoo.client import KazooClient
    from kazoo.recipe.queue import Queue

    KZ_CONNECTION_ERRORS = (
        kazoo.exceptions.SystemErrorException,
        kazoo.exceptions.ConnectionLossException,
        kazoo.exceptions.MarshallingErrorException,
        kazoo.exceptions.UnimplementedException,
        kazoo.exceptions.OperationTimeoutException,
        kazoo.exceptions.NoAuthException,
        kazoo.exceptions.InvalidACLException,
        kazoo.exceptions.AuthFailedException,
        kazoo.exceptions.SessionExpiredException,
    )

    KZ_CHANNEL_ERRORS = (
        kazoo.exceptions.RuntimeInconsistencyException,
        kazoo.exceptions.DataInconsistencyException,
        kazoo.exceptions.BadArgumentsException,
        kazoo.exceptions.MarshallingErrorException,
        kazoo.exceptions.UnimplementedException,
        kazoo.exceptions.OperationTimeoutException,
        kazoo.exceptions.ApiErrorException,
        kazoo.exceptions.NoNodeException,
        kazoo.exceptions.NoAuthException,
        kazoo.exceptions.NodeExistsException,
        kazoo.exceptions.NoChildrenForEphemeralsException,
        kazoo.exceptions.NotEmptyException,
        kazoo.exceptions.SessionExpiredException,
        kazoo.exceptions.InvalidCallbackException,
        socket.error,
    )
except ImportError:
    kazoo = None                                    # noqa
    KZ_CONNECTION_ERRORS = KZ_CHANNEL_ERRORS = ()   # noqa

DEFAULT_PORT = 2181

__author__ = 'Mahendra M <mahendra.m@gmail.com>'


class Channel(virtual.Channel):
    """Zookeeper Channel."""

    _client = None
    _queues = {}

    def _get_path(self, queue_name):
        return os.path.join(self.vhost, queue_name)

    def _get_queue(self, queue_name):
        queue = self._queues.get(queue_name, None)

        if queue is None:
            queue = Queue(self.client, self._get_path(queue_name))
            self._queues[queue_name] = queue

            # Ensure that the queue is created
            len(queue)

        return queue

    def _put(self, queue, message, **kwargs):
        return self._get_queue(queue).put(
            dumps(message),
            priority=self._get_message_priority(message, reverse=True),
        )

    def _get(self, queue):
        queue = self._get_queue(queue)
        msg = queue.get()

        if msg is None:
            raise Empty()

        return loads(bytes_to_str(msg))

    def _purge(self, queue):
        count = 0
        queue = self._get_queue(queue)

        while True:
            msg = queue.get()
            if msg is None:
                break
            count += 1

        return count

    def _delete(self, queue, *args, **kwargs):
        if self._has_queue(queue):
            self._purge(queue)
            self.client.delete(self._get_path(queue))

    def _size(self, queue):
        queue = self._get_queue(queue)
        return len(queue)

    def _new_queue(self, queue, **kwargs):
        if not self._has_queue(queue):
            queue = self._get_queue(queue)

    def _has_queue(self, queue):
        return self.client.exists(self._get_path(queue)) is not None

    def _open(self):
        conninfo = self.connection.client
        self.vhost = os.path.join('/', conninfo.virtual_host[0:-1])
        hosts = []
        if conninfo.alt:
            for host_port in conninfo.alt:
                if host_port.startswith('zookeeper://'):
                    host_port = host_port[len('zookeeper://'):]
                if not host_port:
                    continue
                try:
                    host, port = host_port.split(':', 1)
                    host_port = (host, int(port))
                except ValueError:
                    if host_port == conninfo.hostname:
                        host_port = (host_port, conninfo.port or DEFAULT_PORT)
                    else:
                        host_port = (host_port, DEFAULT_PORT)
                hosts.append(host_port)
        host_port = (conninfo.hostname, conninfo.port or DEFAULT_PORT)
        if host_port not in hosts:
            hosts.insert(0, host_port)
        conn_str = ','.join(['%s:%s' % (h, p) for h, p in hosts])
        conn = KazooClient(conn_str)
        conn.start()
        return conn

    @property
    def client(self):
        if self._client is None:
            self._client = self._open()
        return self._client


class Transport(virtual.Transport):
    """Zookeeper Transport."""

    Channel = Channel
    polling_interval = 1
    default_port = DEFAULT_PORT
    connection_errors = (
        virtual.Transport.connection_errors + KZ_CONNECTION_ERRORS
    )
    channel_errors = (
        virtual.Transport.channel_errors + KZ_CHANNEL_ERRORS
    )
    driver_type = 'zookeeper'
    driver_name = 'kazoo'

    def __init__(self, *args, **kwargs):
        if kazoo is None:
            raise ImportError('The kazoo library is not installed')

        super(Transport, self).__init__(*args, **kwargs)

    def driver_version(self):
        return kazoo.__version__