Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
fury_pyre / pyre / zre_msg.py
Size: Mime:
""" These are the zre_msg messages
    HELLO - Greet a peer so it can connect back to us
        sequence      number 2  Cyclic sequence number
        endpoint      string
        groups        strings
        status        number 1
        name          string
        headers       dictionary
    WHISPER - Send a message to a peer
        sequence      number 2
        content       frame
    SHOUT - Send a message to a group
        sequence      number 2
        group         string
        content       frame
    JOIN - Join a group
        sequence      number 2
        group         string
        status        number 1
    LEAVE - Leave a group
        sequence      number 2
        group         string
        status        number 1
    PING - Ping a peer that has gone silent
        sequence      number 2
    PING_OK - Reply to a peer's ping
        sequence      number 2
"""

import struct
import uuid
import zmq
import logging

STRING_MAX = 255

logger = logging.getLogger(__name__)


class ZreMsg(object):

    VERSION = 2
    HELLO   = 1
    WHISPER = 2
    SHOUT = 3
    JOIN = 4
    LEAVE = 5
    PING = 6
    PING_OK = 7

    def __init__(self, id=None, *args, **kwargs):
        self.address = ""
        self.id = id
        self.sequence = 0
        self.endpoint = ""
        self.groups = ()
        self.group = None
        self.status = 0
        self.name = ""
        self.headers = {}
        self.content = b""
        self.struct_data = kwargs.get("data", b'')
        self._needle = 0
        self._ceil = len(self.struct_data)

    #def __del__(self):

    def recv(self, input_socket):
        # If we're reading from a ROUTER socket, get address
        frames = input_socket.recv_multipart()
        if input_socket.socket_type == zmq.ROUTER:
            self.address = frames.pop(0)
            # we drop the first byte: TODO ref!
            try:
                self.address = uuid.UUID(bytes=self.address[1:])
            except ValueError:
                logger.debug("Peer identity frame empty or malformed")
                return None

        # Read and parse command in frame
        self.struct_data = frames.pop(0)
        if not self.struct_data:
            return None

        # Get and check protocol signature
        if self._needle != 0:
            logger.debug("Message already decoded for protocol signature")

        self._ceil = len(self.struct_data)

        signature = self._get_number2()
        if signature != (0xAAA0 | 1):
            logger.debug("Invalid signature {0}".format(signature))
            return None

        # Get message id and parse per message type
        self.id = self._get_number1()

        version = self._get_number1()
        if version != 2:
            logger.debug("Invalid version {0}".format(version))
            return None

        if self.id == ZreMsg.HELLO:
            self.unpack_hello()

        elif self.id == ZreMsg.WHISPER:
            self.sequence = self._get_number2()
            if len(frames):
                self.content = frames.pop(0)

        elif self.id == ZreMsg.SHOUT:
            self.sequence = self._get_number2()
            self.group = self._get_string()
            if len(frames):
                self.content = frames.pop(0)

        elif self.id == ZreMsg.JOIN:
            self.sequence = self._get_number2()
            self.group = self._get_string()
            self.status = self._get_number1()

        elif self.id == ZreMsg.LEAVE:
            self.sequence = self._get_number2()
            self.group = self._get_string()
            self.status = self._get_number1()

        elif self.id == ZreMsg.PING:
            self.sequence = self._get_number2()

        elif self.id == ZreMsg.PING_OK:
            self.sequence = self._get_number2()

        else:
            logger.debug("Message type {0} unknown".format(self.id))

    # Send the zre_msg to the output, and destroy it
    def send(self, output_socket):
        # clear data
        self.struct_data = b''
        self._needle = 0

        # add signature
        self._put_number2(0xAAA0 | 1)

        # add id
        self._put_number1(self.id)
        #print(self.struct_data)
        # add version
        self._put_number1(2)

        if self.id == ZreMsg.HELLO:
            self.pack_hello()

        elif self.id == ZreMsg.WHISPER:
            self._put_number2(self.sequence)
            # add content in a new frame

        elif self.id == ZreMsg.SHOUT:
            self._put_number2(self.sequence)
            self._put_string(self.group)
            # add content in a new frame

        elif self.id == ZreMsg.JOIN:
            self._put_number2(self.sequence)
            self._put_string(self.group)
            self._put_number1(self.status)

        elif self.id == ZreMsg.LEAVE:
            self._put_number2(self.sequence)
            self._put_string(self.group)
            self._put_number1(self.status)

        elif self.id == ZreMsg.PING:
            self._put_number2(self.sequence)

        elif self.id == ZreMsg.PING_OK:
            self._put_number2(self.sequence)

        else:
            logger.debug("Message type {0} unknown".format(self.id))

        # If we're sending to a ROUTER, we send the address first
        if output_socket.socket_type == zmq.ROUTER:
            output_socket.send(self.address.bytes, zmq.SNDMORE)
        # Now send the data frame
        if (self.content):
            output_socket.send(self.struct_data, zmq.SNDMORE)
            if isinstance(self.content, list):
                output_socket.send_multipart(self.content)
            else:
                output_socket.send(self.content)
        else:
            output_socket.send(self.struct_data)

    # Send the HELLO to the output in one step
    def send_hello(self, output, sequence, ipaddress, mailbox, groups, status, headers):
        print("E: NOT IMPLEMENTED")
        pass

    # Send the WHISPER to the output in one step
    def send_whisper(self, output, sequence, content):
        print("E: NOT IMPLEMENTED")
        pass

    # Send the SHOUT to the output in one step
    def send_shout(self, output, sequence, group, content):
        print("E: NOT IMPLEMENTED")
        pass

    # Send the JOIN to the output in one step
    def send_join(self, output, sequence, group, status):
        print("E: NOT IMPLEMENTED")
        pass

    # Send the LEAVE to the output in one step
    def send_leave(self, sequence, group, status):
        print("E: NOT IMPLEMENTED")
        pass

    # Send the PING to the output in one step
    def send_ping(self, output, sequence):
        print("E: NOT IMPLEMENTED")
        pass

    #  Send the PING_OK to the output in one step
    def send_ping_ok(self, output, sequence):
        print("E: NOT IMPLEMENTED")
        pass

    # Duplicate the zre_msg message
    def dup(self):
        print("E: NOT IMPLEMENTED")
        pass

    # Print contents of message to stdout
    def dump(self):
        print("E: NOT IMPLEMENTED")
        pass

    # Get/set the message address
    def get_address(self):
        return self.address

    def set_address(self, address):
        self.address = address

    # Get the zre_msg id and printable command
    def get_id(self):
        return self.id

    def set_id(self, id):
        logger.warning("E: set_id NOT IMPLEMENTED")

    def get_command(self):
        if self.id == ZreMsg.HELLO:
            return "HELLO"
        if self.id == ZreMsg.WHISPER:
            return "WHISPER"
        if self.id == ZreMsg.SHOUT:
            return "SHOUT"
        if self.id == ZreMsg.JOIN:
            return "JOIN"
        if self.id == ZreMsg.LEAVE:
            return "LEAVE"
        if self.id == ZreMsg.PING:
            return "PING"
        if self.id == ZreMsg.PING_OK:
            return "PING_OK"

    def get_name(self):
        return self.name

    def set_name(self, name):
        self.name = name

    # Get/set the sequence field
    def get_sequence(self):
        return self.sequence

    def set_sequence(self, sequence):
        self.sequence = sequence

    # Get/set the endpoint field
    def get_endpoint(self):
        return self.endpoint

    def set_endpoint(self, endpoint):
        self.endpoint = endpoint

    # Get/set the ipaddress field
    def get_ipaddress(self):
        return self.ipaddress

    def set_ipaddress(self, ipaddr):
        self.ipaddress = ipaddr

    # Get/set the mailbox field
    def get_mailbox(self):
        return self.mailbox

    def set_mailbox(self, port):
        self.mailbox = port

    # Get/set the groups field
    def get_groups(self):
        return self.groups

    def set_groups(self, groups):
        self.groups = groups

    # Iterate through the groups field, and append a groups value
    # TODO: do we need this in python? l186 zre_msg.h

    #  Get/set the status field
    def get_status(self):
        return self.status

    def set_status(self, status):
        self.status = status

    # Get/set the headers field
    def get_headers(self):
        return self.headers

    def set_headers(self, headers):
        self.headers = headers

    # Get/set a value in the headers dictionary
    # TODO: l208 zre_msg.h

    # Get/set the group field
    def get_group(self):
        return self.group

    def set_group(self, group):
        self.group = group

    def _get_string(self):
        s_len = self._get_number1()
        s = struct.unpack_from(str(s_len) + 's', self.struct_data, offset=self._needle)
        self._needle += struct.calcsize('s' * s_len)
        return s[0].decode('UTF-8')

    def _get_number1(self):
        num = struct.unpack_from('>b', self.struct_data, offset=self._needle)
        self._needle += struct.calcsize('>b')
        return num[0]

    def _get_number2(self):
        num = struct.unpack_from('>H', self.struct_data, offset=self._needle)
        self._needle += struct.calcsize('>H')
        return num[0]

    def _get_number4(self):
        num = struct.unpack_from('>I', self.struct_data, offset=self._needle)
        self._needle += struct.calcsize('>I')
        return num[0]

    def _get_number8(self):
        num = struct.unpack_from('>Q', self.struct_data, offset=self._needle)
        self._needle += struct.calcsize('>Q')
        return num[0]

    def _get_long_string(self):
        s_len = self._get_number4()
        s = struct.unpack_from(str(s_len) + 's', self.struct_data, offset=self._needle)
        self._needle += struct.calcsize('s' * s_len)
        return s[0].decode('UTF-8')

    def _put_string(self, s):
        self._put_number1(len(s))
        d = struct.pack('%is' % len(s), s.encode('UTF-8'))
        self.struct_data += d

    def _put_number1(self, nr):
        d = struct.pack('>b', nr)
        self.struct_data += d

    def _put_number2(self, nr):
        d = struct.pack('>H', nr)
        self.struct_data += d

    def _put_number4(self, nr):
        d = struct.pack('>I', nr)
        self.struct_data += d

    def _put_number8(self, nr):
        d = struct.pack('>Q', nr)
        self.struct_data += d

    def _put_long_string(self, s):
        self._put_number4(len(s))
        d = struct.pack('%is' % len(s), s.encode('UTF-8'))
        self.struct_data += d

    def unpack_hello(self):
        """unpack a zre hello packet

        sequence      number 2
        endpoint      string
        groups        strings
        status        number 1
        name          string
        headers       dictionary
        """
        #self._needle = 0
        self.sequence = self._get_number2()
        #print(self.sequence)
        #print("needle is at: %i"% self._needle )
        self.endpoint = self._get_string()
        #print(self.ipaddress)
        #print("needle is at: %i"% self._needle )
        group_len = self._get_number4()
        #print("needle is at: %i"% self._needle )
        #print("grouplen: ", group_len)
        self.groups = []
        for x in range(group_len):
            self.groups.append(self._get_long_string())
        #print(self.groups)
        #print("post_group: needle is at: %i"% self._needle )
        self.status = self._get_number1()
        self.name = self._get_string()
        headers_len = self._get_number4()
        self.headers = {}
        for x in range(headers_len):
            key = self._get_string()
            val = self._get_long_string()
            self.headers.update({key: val})
            #import ast
            #for hdr in hdrlist:
            #    # TODO: safer to use ast.literal_eval
            #    headers.update(ast.literal_eval(hdr))
        #print(self.headers)

    def pack_hello(self):
        """Pack a zre hello packet

        sequence      number 2
        endpoint      string
        groups        strings
        status        number 1
        name          string
        headers       dictionary
        """
        # clear data
        #self.struct_data = b''
        #print(len(self.struct_data))
        #self._put_number2(0xAAA0)
        #self._put_number1(self.id)
        self._put_number2(self.sequence)
        self._put_string(self.endpoint)
        self._put_number4(len(self.groups))
        for g in self.groups:
            self._put_long_string(g)
        self._put_number1(self.status)
        self._put_string(self.name)
        self._put_number4(len(self.headers))
        for key, val in self.headers.items():
            self._put_string(key)
            self._put_long_string(val)

if __name__ == '__main__':
    logger.addHandler(logging.StreamHandler())
    logger.setLevel(logging.DEBUG)
    self._put_long_string("%s=%s" % (key, val))

    testdata = struct.pack('>Hb9sII2sI2sI2sbb4sIb1sI1sb1sI1s',
                           11,         # sequence
                           9,          # str length
                           b"192:20123",   # endpoint
                           20123,      # mailbox
                           3,          # groups len
                           2, b"g1",   # length + groupname
                           2, b"g2",   # length + groupname
                           2, b"g3",   # length + groupname
                           4,          # status
                           4, b"NAME", # name
                           2,          # header len
                           1, b"a",    # length + dict
                           1, b"z",    # length + dict
                           1, b"b",    # length + dict
                           1, b"b"     # length + dict
                    )

    logger.debug("New ZRE HELLO message")
    m = ZreMsg(ZreMsg.HELLO, data=testdata)

    logger.debug("Unpack a HELLO message")
    m.unpack_hello()

    logger.debug("Pack a HELLO message")
    m.pack_hello()

    logger.debug("Unpack the packed HELLO message")
    m.unpack_hello()