Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
Size: Mime:
#!/usr/bin/env python3
# encoding: UTF-8

# This file is part of turberfield.
#
# Turberfield is free software: you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Turberfield is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with turberfield.  If not, see <http://www.gnu.org/licenses/>.

import asyncio
import functools
import warnings

from turberfield.ipc.flow import Flow
from turberfield.ipc.message import dumps
from turberfield.ipc.message import loads
from turberfield.ipc.netstrings import dumpb
from turberfield.ipc.netstrings import loadb
from turberfield.ipc.node import TakesPolicy


class UDPAdapter(asyncio.DatagramProtocol):

    def __init__(self, loop, token, types, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.loop = loop
        self.token = token
        self.types = types
        self.decoder = loadb(encoding="utf-8")
        self.decoder.send(None)
        self.transport = None

    def hop(self, token, msg, policy):
        raise NotImplementedError

    def connection_made(self, transport):
        self.transport = transport

    def datagram_received(self, data, addr):
        """
        Routing only. Provides no upward service.

        """
        packet = self.decoder.send(data)
        if packet is None:
            return (None, None)

        msg = loads(packet, types=self.types)
        poa, msg = self.hop(self.token, msg, policy="udp")
        if poa is not None:
            remote_addr = (poa.addr, poa.port)
            data = "\n".join(dumps(msg))
            packet = dumpb(data)
            self.transport.sendto(packet, remote_addr)
        return (poa, msg)

    def error_received(self, exc):
        warnings.warn(exc)

    def connection_lost(self, exc):
        warnings.warn("Socket closed.")

class UDPService(UDPAdapter, TakesPolicy):

    def __init__(self, loop, token, types, down=None, up=None, *args, **kwargs):
        super().__init__(loop, token, types, *args, **kwargs)
        self.down = down
        self.up = up

    def datagram_received(self, data, addr):
        """
        Extends routing function to serve messages upward.

        """
        poa, msg = super().datagram_received(data, addr)
        if msg is not None:
            self.loop.call_soon_threadsafe(functools.partial(self.up.put_nowait, msg))

    @asyncio.coroutine
    def __call__(self, token=None):
        token = token or self.token
        while True:
            try:
                job = yield from self.down.get()

                poa, msg = self.hop(token, job, policy="udp")
                if job.header.via is not None:
                    # User-defined route
                    hop = next(Flow.find(
                        token,
                        application=job.header.via.application,
                        policy="udp"),
                    None)
                    poa = Flow.inspect(hop)

                if msg is not None:
                    remote_addr = (poa.addr, poa.port)
                    data = "\n".join(dumps(msg))
                    packet = dumpb(data)
                    self.transport.sendto(packet, remote_addr)
                else:
                    warnings.warn("No message from hop.")

            except Exception as e:
                warnings.warn(e)
                continue