Repository URL to install this package:
|
Version:
2.32.4 ▾
|
import sys
import time
from . import exceptions
from . import packet
from . import payload
class Socket:
    """An Engine.IO socket.

    Holds the per-client state (flags, outgoing packet queue, session dict)
    and implements both the long-polling and the websocket transports on top
    of the owning ``server`` object.
    """

    # Transports that an HTTP ``Upgrade`` header may request. Each entry must
    # have a matching ``_upgrade_<name>`` method on this class.
    upgrade_protocols = ["websocket"]

    def __init__(self, server, sid):
        """Initialize the socket state.

        :param server: the owning server object. It provides the outgoing
                       queue through ``create_queue()`` and is used by the
                       other methods for configuration, logging, event
                       dispatch and background tasks.
        :param sid: the session id identifying this client.
        """
        self.server = server
        self.sid = sid
        self.queue = self.server.create_queue()
        self.last_ping = None  # time.time() of the last PING sent, if any
        self.connected = False
        self.upgrading = False
        self.upgraded = False
        self.closing = False
        self.closed = False
        self.session = {}

    def poll(self):
        """Wait for packets to send to the client.

        Blocks until one item is available in the queue, then drains any
        further items that are already waiting without blocking.

        :returns: the list of packets read from the queue, or an empty list
                  when the first item read is the ``None`` sentinel that
                  ``close()`` puts in the queue.
        :raises exceptions.QueueEmpty: if nothing arrives within
                  ``ping_interval + ping_timeout`` seconds.
        """
        queue_empty = self.server.get_queue_empty_exception()
        try:
            packets = [
                self.queue.get(
                    timeout=self.server.ping_interval + self.server.ping_timeout
                )
            ]
            self.queue.task_done()
        except queue_empty:
            raise exceptions.QueueEmpty()
        if packets == [None]:
            return []
        while True:
            try:
                pkt = self.queue.get(block=False)
                self.queue.task_done()
                if pkt is None:
                    # Put the sentinel back so that the next poll() sees it
                    # first and reports the closed connection.
                    self.queue.put(None)
                    break
                packets.append(pkt)
            except queue_empty:
                break
        return packets

    def receive(self, pkt):
        """Receive packet from the client.

        Logs the packet and dispatches on its type: PONG schedules the next
        ping, MESSAGE triggers the server's "message" event, UPGRADE is
        answered with a NOOP and CLOSE closes the socket.

        :param pkt: the decoded packet object.
        :raises exceptions.UnknownPacketError: for any other packet type.
        """
        packet_name = (
            packet.packet_names[pkt.packet_type]
            if pkt.packet_type < len(packet.packet_names)
            else "UNKNOWN"
        )
        self.server.logger.info(
            "%s: Received packet %s data %s",
            self.sid,
            packet_name,
            pkt.data if not isinstance(pkt.data, bytes) else "<binary>",
        )
        if pkt.packet_type == packet.PONG:
            self.schedule_ping()
        elif pkt.packet_type == packet.MESSAGE:
            self.server._trigger_event(
                "message", self.sid, pkt.data, run_async=self.server.async_handlers
            )
        elif pkt.packet_type == packet.UPGRADE:
            self.send(packet.Packet(packet.NOOP))
        elif pkt.packet_type == packet.CLOSE:
            self.close(wait=False, abort=True)
        else:
            raise exceptions.UnknownPacketError()

    def check_ping_timeout(self):
        """Make sure the client is still responding to pings.

        :returns: ``True`` if the socket is still considered alive, ``False``
                  if the last PING went unanswered for longer than
                  ``ping_timeout`` (in which case the socket is closed here).
        :raises exceptions.SocketIsClosedError: if the socket is already
                  closed.
        """
        if self.closed:
            raise exceptions.SocketIsClosedError()
        if self.last_ping and time.time() - self.last_ping > self.server.ping_timeout:
            self.server.logger.info("%s: Client is gone, closing socket", self.sid)
            # Passing abort=False here will cause close() to write a
            # CLOSE packet. This has the effect of updating half-open sockets
            # to their correct state of disconnected
            # NOTE(review): close() writes that packet through send(), which
            # calls back into this method while last_ping is still stale, so
            # the CLOSE packet appears not to be queued in this path --
            # confirm whether that is intended.
            self.close(wait=False, abort=False)
            return False
        return True

    def send(self, pkt):
        """Send a packet to the client.

        The packet is queued for the transport to pick up; nothing is queued
        when the ping timeout check reports that the client is gone.

        :param pkt: the packet object to queue.
        :raises exceptions.SocketIsClosedError: if the socket is already
                  closed (raised by ``check_ping_timeout()``).
        """
        if not self.check_ping_timeout():
            return
        self.queue.put(pkt)
        self.server.logger.info(
            "%s: Sending packet %s data %s",
            self.sid,
            packet.packet_names[pkt.packet_type],
            pkt.data if not isinstance(pkt.data, bytes) else "<binary>",
        )

    def handle_get_request(self, environ, start_response):
        """Handle a long-polling GET request from the client.

        A request carrying ``Connection: upgrade`` and an ``Upgrade`` header
        naming one of ``upgrade_protocols`` is handed to the matching
        ``_upgrade_<transport>`` method. Otherwise the queued packets are
        returned.

        :param environ: the WSGI environ dictionary of the request.
        :param start_response: the WSGI ``start_response`` callable, only
                               passed through to the upgrade handler.
        :returns: the result of the upgrade handler, a single NOOP packet
                  while an upgrade is in progress or done, or the list of
                  packets returned by ``poll()``.
        :raises exceptions.QueueEmpty: if ``poll()`` timed out; the socket is
                  closed before the exception is propagated.
        """
        connections = [
            s.strip() for s in environ.get("HTTP_CONNECTION", "").lower().split(",")
        ]
        transport = environ.get("HTTP_UPGRADE", "").lower()
        if "upgrade" in connections and transport in self.upgrade_protocols:
            self.server.logger.info(
                "%s: Received request to upgrade to %s", self.sid, transport
            )
            return getattr(self, "_upgrade_" + transport)(environ, start_response)
        if self.upgrading or self.upgraded:
            # we are upgrading to WebSocket, do not return any more packets
            # through the polling endpoint
            return [packet.Packet(packet.NOOP)]
        try:
            packets = self.poll()
        except exceptions.QueueEmpty:
            # Close first, then let the original exception (with its
            # original traceback) continue to the caller.
            self.close(wait=False)
            raise
        return packets

    def handle_post_request(self, environ):
        """Handle a long-polling POST request from the client.

        Reads the request body, decodes it as a payload and feeds every
        packet in it to ``receive()``.

        :param environ: the WSGI environ dictionary of the request.
        :raises exceptions.ContentTooLongError: if the declared content
                  length exceeds the server's ``max_http_buffer_size``.
        """
        # CONTENT_LENGTH may be absent *or* present but empty; both mean that
        # there is no body, so both are treated as a length of zero.
        # NOTE(review): a negative CONTENT_LENGTH is not rejected here --
        # confirm how the wsgi.input object handles read() with a negative
        # size.
        length = int(environ.get("CONTENT_LENGTH") or 0)
        if length > self.server.max_http_buffer_size:
            raise exceptions.ContentTooLongError()
        body = environ["wsgi.input"].read(length).decode("utf-8")
        p = payload.Payload(encoded_payload=body)
        for pkt in p.packets:
            self.receive(pkt)

    def close(self, wait=True, abort=False):
        """Close the socket connection.

        Does nothing if the socket is already closed or in the process of
        closing.

        :param wait: if ``True``, block until every queued item has been
                     marked as done (``queue.join()``).
        :param abort: if ``True``, skip sending a CLOSE packet to the client.
        """
        if not self.closed and not self.closing:
            self.closing = True
            self.server._trigger_event("disconnect", self.sid, run_async=False)
            if not abort:
                self.send(packet.Packet(packet.CLOSE))
            self.closed = True
            # The None sentinel tells poll() that the connection is closed.
            self.queue.put(None)
            if wait:
                self.queue.join()

    def schedule_ping(self):
        """Start a background task that sends the next PING to the client.

        The task clears ``last_ping``, sleeps for ``ping_interval`` and then,
        unless the socket is closing or closed, records the current time in
        ``last_ping`` and queues a PING packet.
        """

        def send_ping():
            self.last_ping = None
            self.server.sleep(self.server.ping_interval)
            if not self.closing and not self.closed:
                self.last_ping = time.time()
                self.send(packet.Packet(packet.PING))

        self.server.start_background_task(send_ping)

    def _upgrade_websocket(self, environ, start_response):
        """Upgrade the connection from polling to websocket.

        :param environ: the WSGI environ dictionary of the request.
        :param start_response: the WSGI ``start_response`` callable.
        :returns: the result of running the websocket wrapper, or the
                  server's bad-request response when the async mode provides
                  no websocket support.
        :raises IOError: if the socket has already been upgraded.
        """
        if self.upgraded:
            raise IOError("Socket has been upgraded already")
        if self.server._async["websocket"] is None:
            # the selected async mode does not support websocket
            return self.server._bad_request()
        ws = self.server._async["websocket"](self._websocket_handler)
        return ws(environ, start_response)

    def _websocket_handler(self, ws):
        """Engine.IO handler for websocket transport.

        When the socket is already connected this first runs the upgrade
        handshake (PING "probe" -> PONG "probe" -> UPGRADE). It then starts a
        writer task that forwards queued packets to the websocket and loops
        reading packets from the websocket until it is closed or an error
        occurs.

        :param ws: the websocket object; ``wait()`` and ``send()`` are the
                   methods used here.
        :returns: an empty list.
        """

        def websocket_wait():
            # Read one message and enforce the configured size limit.
            data = ws.wait()
            if data and len(data) > self.server.max_http_buffer_size:
                raise ValueError("packet is too large")
            return data

        # try to set a socket timeout matching the configured ping interval
        # and timeout
        for attr in ["_sock", "socket"]:  # pragma: no cover
            if hasattr(ws, attr) and hasattr(getattr(ws, attr), "settimeout"):
                getattr(ws, attr).settimeout(
                    self.server.ping_interval + self.server.ping_timeout
                )

        if self.connected:
            # the socket was already connected, so this is an upgrade
            self.upgrading = True  # hold packet sends during the upgrade

            pkt = websocket_wait()
            decoded_pkt = packet.Packet(encoded_packet=pkt)
            if decoded_pkt.packet_type != packet.PING or decoded_pkt.data != "probe":
                self.server.logger.info(
                    "%s: Failed websocket upgrade, no PING packet", self.sid
                )
                self.upgrading = False
                return []
            ws.send(packet.Packet(packet.PONG, data="probe").encode())
            self.queue.put(packet.Packet(packet.NOOP))  # end poll

            pkt = websocket_wait()
            decoded_pkt = packet.Packet(encoded_packet=pkt)
            if decoded_pkt.packet_type != packet.UPGRADE:
                self.upgraded = False
                self.server.logger.info(
                    (
                        "%s: Failed websocket upgrade, expected UPGRADE packet, "
                        "received %s instead."
                    ),
                    self.sid,
                    pkt,
                )
                self.upgrading = False
                return []
            self.upgraded = True
            self.upgrading = False
        else:
            # websocket is the first transport used by this client
            self.connected = True
            self.upgraded = True

        # start separate writer thread
        def writer():
            while True:
                packets = None
                try:
                    packets = self.poll()
                except exceptions.QueueEmpty:
                    break
                if not packets:
                    # empty packet list returned -> connection closed
                    break
                try:
                    for pkt in packets:
                        ws.send(pkt.encode())
                except Exception:
                    # any send failure ends the writer task
                    break

        writer_task = self.server.start_background_task(writer)

        self.server.logger.info("%s: Upgrade to websocket successful", self.sid)

        while True:
            p = None
            try:
                p = websocket_wait()
            except Exception as e:
                # if the socket is already closed, we can assume this is a
                # downstream error of that
                if not self.closed:  # pragma: no cover
                    self.server.logger.info(
                        '%s: Unexpected error "%s", closing connection',
                        self.sid,
                        str(e),
                    )
                break
            if p is None:
                # connection closed by client
                break
            pkt = packet.Packet(encoded_packet=p)
            try:
                self.receive(pkt)
            except exceptions.UnknownPacketError:  # pragma: no cover
                pass
            except exceptions.SocketIsClosedError:  # pragma: no cover
                self.server.logger.info("Receive error -- socket is closed")
                break
            except Exception:
                # if we get an unexpected exception we log the error and exit
                # the connection properly
                self.server.logger.exception("Unknown receive error")
                break

        self.queue.put(None)  # unlock the writer task so that it can exit
        writer_task.join()
        self.close(wait=False, abort=True)

        return []