Repository URL to install this package:
|
Version:
0.29.0.dev4871 ▾
|
flet
/
flet_socket_server.py
|
|---|
import asyncio
import json
import logging
import os
import struct
import sys
import tempfile
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
from typing import List, Optional
import flet
from flet.core.local_connection import LocalConnection
from flet.core.protocol import (
ClientActions,
ClientMessage,
Command,
CommandEncoder,
PageCommandResponsePayload,
PageCommandsBatchResponsePayload,
RegisterWebClientRequestPayload,
)
from flet.core.pubsub.pubsub_hub import PubSubHub
from flet.utils import get_free_tcp_port, is_windows, random_string
logger = logging.getLogger(flet.__name__)
class FletSocketServer(LocalConnection):
def __init__(
self,
loop: asyncio.AbstractEventLoop,
port: int = 0,
uds_path: Optional[str] = None,
on_event=None,
on_session_created=None,
blocking=False,
executor: Optional[ThreadPoolExecutor] = None,
):
super().__init__()
self.__send_queue = asyncio.Queue()
self.__port = port
self.__uds_path = uds_path
self.__on_event = on_event
self.__on_session_created = on_session_created
self.__blocking = blocking
self.__loop = loop
self.__executor = executor
self.pubsubhub = PubSubHub(loop=loop, executor=executor)
self.__running_tasks = set()
async def start(self):
self.__connected = False
self.__receive_loop_task = None
self.__send_loop_task = None
if is_windows() or self.__port > 0:
# TCP
host = "localhost"
port = self.__port if self.__port > 0 else get_free_tcp_port()
self.page_url = f"tcp://{host}:{port}"
logger.info(f"Starting up TCP server on {host}:{port}")
server = await asyncio.start_server(self.handle_connection, host, port)
else:
# UDS
if not self.__uds_path:
self.__uds_path = str(
Path(tempfile.gettempdir()).joinpath(random_string(10))
)
if os.path.exists(self.__uds_path):
os.remove(self.__uds_path)
self.page_url = self.__uds_path
logger.info(f"Starting up UDS server on {self.__uds_path}")
server = await asyncio.start_unix_server(
self.handle_connection, self.__uds_path
)
if self.__blocking:
self.__server = None
await server.serve_forever()
else:
self.__server = asyncio.create_task(server.serve_forever())
async def handle_connection(
self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter
):
if not self.__connected:
self.__connected = True
logger.debug("Connected new TCP client")
self.__receive_loop_task = asyncio.create_task(self.__receive_loop(reader))
self.__send_loop_task = asyncio.create_task(self.__send_loop(writer))
async def __receive_loop(self, reader: asyncio.StreamReader):
while True:
try:
raw_msglen = await reader.readexactly(4)
except Exception:
return None
if not raw_msglen:
return None
msglen = struct.unpack(">I", raw_msglen)[0]
data = await reader.readexactly(msglen)
await self.__on_message(data.decode("utf-8"))
async def __send_loop(self, writer: asyncio.StreamWriter):
while True:
message = await self.__send_queue.get()
try:
data = message.encode("utf-8")
msg = struct.pack(">I", len(data)) + data
writer.write(msg)
# await writer.drain()
logger.debug(f"sent to TCP: {len(msg)}")
except Exception:
# re-enqueue the message to repeat it when re-connected
self.__send_queue.put_nowait(message)
raise
async def __on_message(self, data: str):
logger.debug(f"_on_message: {data}")
msg_dict = json.loads(data)
msg = ClientMessage(**msg_dict)
task = None
if msg.action == ClientActions.REGISTER_WEB_CLIENT:
self._client_details = RegisterWebClientRequestPayload(**msg.payload)
# register response
self.__send(self._create_register_web_client_response())
# start session
if self.__on_session_created is not None:
task = asyncio.create_task(
self.__on_session_created(self._create_session_handler_arg())
)
elif msg.action == ClientActions.PAGE_EVENT_FROM_WEB:
if self.__on_event is not None:
task = asyncio.create_task(
self.__on_event(self._create_page_event_handler_arg(msg))
)
elif msg.action == ClientActions.UPDATE_CONTROL_PROPS:
if self.__on_event is not None:
task = asyncio.create_task(
self.__on_event(self._create_update_control_props_handler_arg(msg))
)
else:
# it's something else
raise Exception(f'Unknown message "{msg.action}": {msg.payload}')
if task:
self.__running_tasks.add(task)
task.add_done_callback(self.__running_tasks.discard)
def send_command(self, session_id: str, command: Command):
result, message = self._process_command(command)
if message:
self.__send(message)
return PageCommandResponsePayload(result=result, error="")
def send_commands(self, session_id: str, commands: List[Command]):
results = []
messages = []
for command in commands:
result, message = self._process_command(command)
if command.name in ["add", "get"]:
results.append(result)
if message:
messages.append(message)
if len(messages) > 0:
self.__send(ClientMessage(ClientActions.PAGE_CONTROLS_BATCH, messages))
return PageCommandsBatchResponsePayload(results=results, error="")
def __send(self, message: ClientMessage):
j = json.dumps(message, cls=CommandEncoder, separators=(",", ":"))
logger.debug(f"__send: {j}")
self.__loop.call_soon_threadsafe(self.__send_queue.put_nowait, j)
async def close(self):
logger.debug("Closing connection...")
logger.debug(f"Disconnecting all pages...")
while self.sessions:
_, page = self.sessions.popitem()
await page._disconnect(0)
if self.__executor:
logger.debug("Shutting down thread pool...")
if sys.version_info >= (3, 9):
self.__executor.shutdown(wait=False, cancel_futures=True)
else:
self.__executor.shutdown(wait=False)
# close socket
if self.__receive_loop_task:
self.__receive_loop_task.cancel()
if self.__send_loop_task:
self.__send_loop_task.cancel()
if self.__server:
self.__server.cancel()
# remove UDS path
if self.__uds_path and os.path.exists(self.__uds_path):
os.unlink(self.__uds_path)