# Source: erlport 0.7 -- erlport/erlproto.py
# Copyright (c) 2009, 2010, Dmitry Vasiliev <dima@hlabs.org>
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
# * Neither the name of the copyright holders nor the names of its
# contributors may be used to endorse or promote products derived from this
# software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
"""Erlang port protocol."""
__author__ = "Dmitry Vasiliev <dima@hlabs.org>"
"""Prototype modification by Garrison Venn <garrison.venn@stem.com"""
import os
import sys
import errno
import select
#import logging
import traceback
from struct import pack, unpack
from erlport.erlterms import Atom, encode, decode
class Protocol(object):
    """Erlang port protocol.

    Reads messages from a port object, dispatches each one to a
    ``handle_<name>`` method defined on this instance (typically by a
    subclass) and writes the handler's result back to the port.
    """

    def __init__(self, use_read_select=False, select_timeout=0):
        """Configure how the processing loop waits for input.

        use_read_select -- when true, ``run`` checks the port's input
            descriptor with ``select.select`` before every read instead
            of calling ``port.read()`` unconditionally.
        select_timeout -- timeout value handed to ``select.select``.
        """
        self._use_read_select = use_read_select
        self._select_timeout = select_timeout

    def run(self, port, once=False):
        """Run processing loop.

        Repeatedly reads a message from ``port`` and passes it to
        ``handle``.  A read that yields ``None`` (nothing available when
        select mode is enabled) is skipped.  With ``once`` true the loop
        body is executed a single time.

        ``EOFError`` raised while reading is not caught here; it
        propagates to the caller.
        """
        # ``port.in_d`` is only touched when select mode is enabled.
        read_fd = port.in_d if self._use_read_select else None
        while True:
            message = self.read(port, read_fd)
            if message is not None:
                self.handle(port, message)
            if once:
                break

    def read(self, port, read_fd):
        """Read one message, optionally guarded by ``select``.

        When ``read_fd`` is ``None`` this is simply ``port.read()``.
        Otherwise ``read_fd`` is checked for readability first (using the
        configured select timeout) and ``None`` is returned when the
        descriptor is not reported as readable.
        """
        if read_fd is None:
            return port.read()

        # NOTE: with a select timeout of 0 the select call does not block,
        # so ``run`` iterates rapidly for as long as nothing is readable.
        while True:
            try:
                readable, _, _ = select.select(
                    [read_fd], [], [], self._select_timeout)
            except select.error as ex:
                # ``args[0]`` carries the errno; indexing the exception
                # object itself is deprecated and fails on Python 3.
                if ex.args[0] != errno.EINTR:
                    raise
                # Interrupted by a signal: retry the select call.
            else:
                break

        return port.read() if readable else None

    def handle(self, port, message):
        """Handle incoming message.

        ``message`` must be an ``Atom`` (handler name, no arguments) or a
        non-empty tuple whose first item is the ``Atom`` handler name and
        whose remaining items are the handler arguments.  The reply
        written to ``port`` is one of:

        * ``(error, badarg)`` -- malformed message;
        * ``(error, undef)`` -- no ``handle_<name>`` attribute exists;
        * the handler's return value;
        * the tuple built by ``_handle_error`` when the handler raises.
        """
        if isinstance(message, Atom):
            name, args = message, ()
        elif isinstance(message, tuple) and len(message) > 0:
            name, args = message[0], message[1:]
        else:
            # Neither an atom nor a non-empty tuple: rejected below.
            name, args = None, ()

        if not isinstance(name, Atom):
            response = Atom("error"), Atom("badarg")
        else:
            handler = getattr(self, "handle_%s" % name, None)
            if handler is None:
                response = Atom("error"), Atom("undef")
            else:
                try:
                    response = handler(*args)
                except:
                    # NOTE(review): the bare except also traps SystemExit
                    # and KeyboardInterrupt; kept so that every handler
                    # failure is still reported back through the port.
                    response = self._handle_error(sys.exc_info())
        port.write(response)

    def _handle_error(self, exception):
        """Convert a ``sys.exc_info()`` triple into an error reply.

        Returns ``(error, (exception, (Name, Text, Traceback)))`` where
        ``Name`` is the atom ``<module>.<class>`` of the exception type,
        ``Text`` is the exception value converted with ``unicode`` and
        ``Traceback`` is the extracted traceback in reversed order.
        """
        t, val, tb = exception
        exc = Atom("%s.%s" % (t.__module__, t.__name__))
        exc_tb = traceback.extract_tb(tb)
        exc_tb.reverse()
        # ``unicode`` is the Python 2 builtin; this method is Python 2 only.
        return Atom("error"), (Atom("exception"), (exc, unicode(val), exc_tb))
class Port(object):
    """Erlang port.

    Exchanges length-prefixed messages over a pair of file descriptors.
    Every message is preceded by a ``packet``-byte header holding the
    length of the encoded payload.
    """

    # Header size in bytes -> ``struct`` format used to pack/unpack it.
    _formats = {
        1: "B",
        2: ">H",
        4: ">I",
    }

    def __init__(self, packet=1, use_stdio=False, compressed=False,
                 descriptors=None):
        """Create a port.

        packet -- size of the length header in bytes; must be 1, 2 or 4.
        use_stdio -- use descriptors 0 and 1 instead of 3 and 4.
        compressed -- forwarded to ``encode`` when writing messages.
        descriptors -- optional ``(in_d, out_d)`` pair; when given it
            takes precedence over ``use_stdio``.

        Raises ``ValueError`` for an unsupported ``packet`` value.
        """
        self._format = self._formats.get(packet)
        if self._format is None:
            raise ValueError("invalid packet size value: %s" % packet)
        self.packet = packet
        self.compressed = compressed
        if descriptors is not None:
            self.in_d, self.out_d = descriptors
        elif use_stdio:
            self.in_d, self.out_d = 0, 1
        else:
            self.in_d, self.out_d = 3, 4

    def _read_data(self, length):
        """Read exactly ``length`` bytes from the input descriptor.

        Raises ``EOFError`` when the descriptor reports end of file or
        the read fails with ``EPIPE``; other ``OSError``s propagate.
        """
        # Collect the pieces and join once rather than concatenating
        # inside the loop.
        chunks = []
        while length > 0:
            try:
                buf = os.read(self.in_d, length)
            except OSError as why:
                if why.errno == errno.EPIPE:
                    raise EOFError()
                raise
            if not buf:
                # An empty read means the other side closed the pipe.
                raise EOFError()
            chunks.append(buf)
            length -= len(buf)
        return b"".join(chunks)

    def read(self):
        """Read incoming message.

        Reads the length header, then that many payload bytes, and
        returns the first element of the result of ``decode``.
        """
        header = self._read_data(self.packet)
        length, = unpack(self._format, header)
        payload = self._read_data(length)
        return decode(payload)[0]

    def write(self, message):
        """Write outgoing message.

        The message is encoded, prefixed with its packed length and
        written until every byte has been accepted.  Raises ``EOFError``
        when the write fails with ``EPIPE`` or makes no progress.
        """
        data = encode(message, compressed=self.compressed)
        data = pack(self._format, len(data)) + data
        while data:
            try:
                n = os.write(self.out_d, data)
            except (IOError, OSError) as why:
                # os.write reports failures as OSError; IOError is kept
                # in the tuple for backward compatibility.
                if why.errno == errno.EPIPE:
                    raise EOFError()
                raise
            if n == 0:
                raise EOFError()
            # Partial write: continue with the unwritten remainder.
            data = data[n:]

    def close(self):
        """Close port.

        Both descriptors are closed; the output descriptor is closed
        even when closing the input descriptor raises.
        """
        try:
            os.close(self.in_d)
        finally:
            os.close(self.out_d)