Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
fury_pyre / pyre / zactor.py
Size: Mime:
# =========================================================================
# zactor - simple actor framework
# 
# Copyright (c) the Contributors as noted in the AUTHORS file.
# This file is part of CZMQ, the high-level C binding for 0MQ:
# http://czmq.zeromq.org.
# 
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
# =========================================================================
# 
# 
# The zactor class provides a simple actor framework. It replaces the
# zthread class, which had a complex API that did not fit the CLASS
# standard. A CZMQ actor is implemented as a thread plus a PAIR-PAIR
# pipe. The constructor and destructor are always synchronized, so the
# caller can be sure all resources are created, and destroyed, when these
# calls complete. (This solves a major problem with zthread, that a caller
# could not be sure when a child thread had finished.)
# 
# A zactor_t instance acts like a zsock_t and you can pass it to any CZMQ
# method that would take a zsock_t argument, including methods in zframe,
# zmsg, zstr, zpoller, and zloop.

import zmq
import threading
import logging
from . import zsocket
from . import zhelper

logger = logging.getLogger(__name__)

class ZActor(object):
    
    ZACTOR_TAG = 0x0005cafe

    def __init__(self, ctx, actor, *args, **kwargs):
        self.tag = self.ZACTOR_TAG
        self.ctx = ctx
        # Create front-to-back pipe pair
        self.pipe, self.shim_pipe = zhelper.zcreate_pipe(ctx)
        self.shim_handler = actor
        self.shim_args = (self.ctx, self.shim_pipe)+args
        self.shim_kwargs = kwargs
        self.thread = threading.Thread(target=self.run)
        # we manage threads exiting ourselves!
        self.thread.daemon = False
        self.thread.start()

        # Mandatory handshake for new actor so that constructor returns only
        # when actor has also initialized. This eliminates timing issues at
        # application start up.
        self.pipe.wait()

    def run(self):
        self.shim_handler(*self.shim_args, **self.shim_kwargs)
        self.shim_pipe.set(zmq.SNDTIMEO, 0)
        self.shim_pipe.signal()
        self.shim_pipe.close()

    def destroy(self):
        # Signal the actor to end and wait for the thread exit code
        # If the pipe isn't connected any longer, assume child thread
        # has already quit due to other reasons and don't collect the
        # exit signal.
        if self.tag == 0xDeadBeef:
            logger.warning("Zactor: already destroyed")
            return
        self.pipe.set(zmq.SNDTIMEO, 0)
        self.pipe.send_unicode("$TERM")
        # misschien self.pipe.wait()????
        self.pipe.wait()
        self.pipe.close()
        self.tag = 0xDeadBeef;

    def send(self, *args, **kwargs):
        return self.pipe.send(*args, **kwargs)

    def send_unicode(self, *args, **kwargs):
        return self.pipe.send_unicode(*args, **kwargs)

    def send_multipart(self, *args, **kwargs):
        return self.pipe.send_multipart(*args, **kwargs)

    def send_pyobj(self, *args, **kwargs):
        return self.pipe.send_pyobj(*args, **kwargs)

    def recv(self, *args, **kwargs):
        return self.pipe.recv(*args, **kwargs)

    def recv_unicode(self, *args, **kwargs):
        return self.pipe.recv_unicode(*args, **kwargs)

    def recv_multipart(self, *args, **kwargs):
        return self.pipe.recv_multipart(*args, **kwargs)

    def recv_pyobj(self, *args, **kwargs):
        return self.pipe.recv_pyobj(*args, **kwargs)

    # --------------------------------------------------------------------------
    # Probe the supplied object, and report if it looks like a zactor_t.
    def is_zactor(self):
        return isinstance(ZActor, self)

    # --------------------------------------------------------------------------
    # Probe the supplied reference. If it looks like a zactor_t instance,
    # return the underlying libzmq actor handle; else if it looks like
    # a libzmq actor handle, return the supplied value.
    # In Python we just return the pipe socket
    def resolve(self):
        return self.pipe

def echo_actor(ctx, pipe, *args):
    # Do some initialization
    pipe.signal()
    terminated = False;
    while not terminated:
        msg = pipe.recv_multipart();
        command = msg.pop(0)
        if command == b"$TERM":
            terminated = True
        elif command == b"ECHO":
            pipe.send(msg.pop(0))
        else:
            print("E: invalid message to actor")
    pipe.signal()


def zactor_test(verbose=False):
    print(" * zactor: ")
    actor = ZActor(zmq.Context(), echo_actor, "Hello, World")
    actor.send_unicode("ECHO", zmq.SNDMORE)
    actor.send_unicode("This is a string")
    msg = actor.recv()
    print("RECEIVED: %s" %msg)
    actor.destroy()
    print("OK");

if __name__ == '__main__':
    zactor_test()