Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
fury_pyre / pyre / pyre_peer.py
Size: Mime:
import time
import zmq
import logging

logger = logging.getLogger(__name__)


class PyrePeer(object):

    PEER_EXPIRED = 30              # expire after 10s
    PEER_EVASIVE = 10              # mark evasive after 5s

    def __init__(self, ctx, identity):
        # TODO: what to do with container?
        self._ctx = ctx          # ZMQ context
        self.mailbox = None      # Socket through to peer
        self.identity = identity # Identity UUID
        self.endpoint = None     # Endpoint connected to
        self.name = "notset"     # Peer's public name
        self.origin = "unknown"  # Origin node's public name
        self.evasive_at = 0      # Peer is being evasive
        self.expired_at = 0      # Peer has expired by now
        self.connected = False   # Peer will send messages
        self.ready = False       # Peer has said Hello to us
        self.status = 0          # Our status counter
        self.sent_sequence = 0   # Outgoing message sequence
        self.want_sequence = 0   # Incoming message sequence
        self.headers = {}        # Peer headers

    def __del__(self):
        self.disconnect()

    # Connect peer mailbox
    def connect(self, reply_to, endpoint):
        if self.connected:
            return

        # Create new outgoing socket (drop any messages in transit)
        self.mailbox = zmq.Socket(self._ctx, zmq.DEALER)
        # Set our caller 'From' identity so that receiving node knows
        # who each message came from.
        # Set our own identity on the socket so that receiving node
        # knows who each message came from. Note that we cannot use
        # the UUID directly as the identity since it may contain a
        # zero byte at the start, which libzmq does not like for
        # historical and arguably bogus reasons that it nonetheless
        # enforces.
        # we set linger to 0 by default (In zyre this is done by czmq's zsys)
        self.mailbox.setsockopt(zmq.LINGER, 0)
        self.mailbox.setsockopt(zmq.IDENTITY, b'\x01' + reply_to.bytes)
        # Set a high-water mark that allows for reasonable activity
        self.mailbox.setsockopt(zmq.SNDHWM, PyrePeer.PEER_EXPIRED * 100)
        # Send messages immediately or return EAGAIN
        self.mailbox.setsockopt(zmq.SNDTIMEO, 0)

        # Connect through to peer node
        logger.debug("Connecting to peer {0} on endpoint {1}".format(self.identity, endpoint))

        self.mailbox.connect(endpoint)
        self.endpoint = endpoint
        self.connected = True
        self.ready = False

    # Disconnect peer mailbox
    # No more messages will be sent to peer until connected again
    def disconnect(self):
        # If connected, destroy socket and drop all pending messages
        if (self.connected):
            logger.debug("{0} Disconnecting peer {1}".format(self.origin, self.name))
            self.mailbox.close()
            self.mailbox = None
            self.endpoint = ""
            self.connected = False
            self.ready = False
    # end disconnect

    # Send message to peer
    def send(self, msg):
        if self.connected:
            self.sent_sequence += 1
            self.sent_sequence = self.sent_sequence % 65535
            msg.set_sequence(self.sent_sequence)

            try:
                msg.send(self.mailbox)
            except zmq.AGAIN as e:
                self.disconnect()
                logger.debug("{0} Error while sending {1} to peer={2} sequence={3}".format(self.origin,
                                                                            msg.get_command(),
                                                                            self.name,
                                                                            msg.get_sequence()))
                return -1

            logger.debug("{0} send {1} to peer={2} sequence={3}".format(self.origin,
                msg.get_command(),
                self.name,
                msg.get_sequence()))

        else:
            logger.debug("Peer {0} is not connected".format(self.identity))
    # end send

    # Return peer connected status
    def is_connected(self):
        return self.connected
    # end is_connected

    # Return peer identity string
    def get_identity(self):
        return self.identity
    # end get_identity

    # Return peer connection endpoint
    def get_endpoint(self):
        if self.connected:
            return self.endpoint
        else:
            return ""
    # end get_endpoint

    # Register activity at peer
    def refresh(self):
        self.evasive_at = time.time() + self.PEER_EVASIVE
        self.expired_at = time.time() + self.PEER_EXPIRED
    # end refresh

    # Return future evasive time
    def evasive_at(self):
        return self.evasive_at
    # end evasiv_at

    # Return future expired time
    def expired_at(self):
        return self.expired_at
    # end expired_at

    # Return peer name
    def get_name(self):
        return self.name

    # Set peer name
    def set_name(self, name):
        self.name = name

    # Set current node name, for logging
    def set_origin(self, origin):
        self.origin = origin

    # Return peer status
    def get_status(self):
        return self.status
    # end get_status

    # Set peer status
    def set_status(self, status):
        self.status = status
    # end set_status

    # Return peer ready state
    def get_ready(self):
        return self.ready
    # end get_ready

    # Set peer ready state
    def set_ready(self, ready):
        self.ready = ready
    # end set_ready

    # Get peer header value
    def get_header(self, key):
        return self.headers.get(key, "")
    # end get_header

    # Get peer headers
    def get_headers(self):
        return self.headers

    # Set peer headers
    def set_headers(self, headers):
        self.headers = headers
    # end set_headers

    # Check if messages were lost from peer, returns true if they were
    def messages_lost(self, msg):
        # The sequence number set by the peer, and our own calculated
        # sequence number should be the same.
        logger.debug("(%s) recv %s from peer=%s sequence=%d"%(
            self.origin,
            msg.get_command(),
            self.name,
            msg.get_sequence()) )
        if msg.get_command == "HELLO":
            self.want_sequence = 1
        else:
            self.want_sequence += 1
            self.want_sequence = self.want_sequence % 65535

        if self.want_sequence != msg.get_sequence():
            logger.debug("(%s) seq error from peer=%s expect=%d, got=%d",
                self.origin,
                self.name,
                self.want_sequence,
                msg.get_sequence())
            return True;
        return False
    # end check_message