Learn more  » Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

edgify / rook   python

Repository URL to install this package:

Version: 0.1.176 

/ com_ws / agent_com_impel / singlethread_agent_com.py

import time
import websocket
import socket

from rook.com_ws import poll_select, selectable_event, selectable_queue
from rook.com_ws.agent_com_impel.agent_com_base import AgentComBase, FlushMessagesEvent
from rook.config import AgentComConfiguration

from rook.logger import logger
import rook.protobuf.envelope_pb2 as envelope_pb
from six.moves.queue import Empty


class SingleThreadAgentCom(AgentComBase):

    def __init__(self, agent_id, host, port, proxy, token, labels, tags, debug, print_on_initial_connection):
        super(SingleThreadAgentCom, self).__init__(agent_id, host, port, proxy, token, labels, tags, debug,
                                                   print_on_initial_connection)

        # Queue hold outgoing messages wrapped with IEnvelopeWrapper
        self._queue = selectable_queue.SelectableQueue()

    def await_message(self, message_name):
        event = selectable_event.SelectableEvent()
        self.once(message_name, lambda _: event.set())

        return event

    def _create_run_connection_thread(self):
        with self.await_message('InitialAugsCommand') as got_initial_augs_event:
            try:
                self.run_until_stopped(got_initial_augs_event)
            except Exception as exc:
                logger.exception("network loop stopped: %s", exc)

    def run_until_stopped(self, got_initial_augs_event):
        self._connection.ping()
        waiter = poll_select.Waiter([self._connection, self._queue, got_initial_augs_event], [], [self._connection])
        last_ping_time = 0
        last_read_time = time.time()
        got_initial_augs_started_waiting = time.time()
        got_initial_augs_keep_waiting = True
        while self._running:
            # this is similar to the select API: rlist, wlist, xlist - fds ready to read, ready to write, and errors
            # see official documentation for POSIX select or Python select.select for further info
            rlist, _, xlist = waiter.wait(AgentComConfiguration.PING_INTERVAL)

            # if it's time to send a ping, go ahead and do it now
            if (time.time() - last_ping_time) >= AgentComConfiguration.PING_INTERVAL:
                last_ping_time = time.time()
                logger.debug("Sending ping")
                self._connection.ping()
            # if rlist and xlist are empty -> the wait timed out, so check if we had a ping timeout
            if len(rlist) == 0 and len(xlist) == 0 and (
                    time.time() - last_read_time) >= AgentComConfiguration.PING_TIMEOUT:
                logger.debug("Disconnecting due to ping timeout")
                self._connection.close()
                break

            # got initial augs is ready, so don't wait on it anymore
            if got_initial_augs_event in rlist:
                # don't wait on got_initial_augs_event anymore
                got_initial_augs_keep_waiting = False
                waiter = poll_select.Waiter([self._connection, self._queue], [], [self._connection])
                logger.info("Finished initialization")
            # still waiting for got initial augs, but reached timeout, don't wait on it anymore
            if got_initial_augs_keep_waiting and (
                    time.time() - got_initial_augs_started_waiting) >= AgentComConfiguration.TIMEOUT:
                got_initial_augs_keep_waiting = False
                waiter = poll_select.Waiter([self._connection, self._queue], [], [self._connection])
                logger.warning("Timeout waiting for initial augs")
            # connection appeared in xlist, means it was closed
            if self._connection in xlist:
                logger.info("Connection closed")
                break
            # connection appeared in rlist, means there's data to read
            if self._connection in rlist:
                last_read_time = time.time()
                try:
                    code, msg = self._connection.recv_data(control_frame=True)
                    if code == websocket.ABNF.OPCODE_BINARY:
                        if msg is None:
                            # socket disconnected
                            logger.debug("Reading msg - socket disconnected")
                            break

                        if len(msg) > AgentComConfiguration.AGENT_COM_INCOMING_MAX_MESSAGE_SIZE:
                            logger.error("message length (%d) exceed max size", msg)
                            continue

                        envelope = envelope_pb.Envelope()
                        envelope.ParseFromString(msg)
                        self._handle_incoming_message(envelope)
                except (socket.error, websocket.WebSocketConnectionClosedException):
                    logger.debug("Reading msg - socket disconnected")
                    break
            # queue appeared in rlist, means there's a new message to send
            if self._queue in rlist:
                wrapped_envelope = None
                try:
                    wrapped_envelope = self._queue.get()
                    msg = wrapped_envelope.get_buffer()
                    if isinstance(msg, FlushMessagesEvent):
                        msg.event.set()
                        continue
                    self._connection.send_binary(msg)
                except (socket.error, websocket.WebSocketConnectionClosedException):
                    if wrapped_envelope is not None:
                        self._queue.put(wrapped_envelope)
                    logger.info("Got websocket closed exception")
                    break
                except Empty:
                    continue
                self._queue_messages_length - len(wrapped_envelope)

        logger.warning("loop stopped running")