Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
flet / messaging / session.py
Size: Mime:
import asyncio
import logging
import traceback
import weakref
from datetime import datetime, timedelta, timezone
from typing import Any, Optional

from flet.controls.base_control import BaseControl
from flet.controls.control import Control
from flet.controls.control_event import ControlEvent
from flet.controls.object_patch import ObjectPatch
from flet.controls.page import Page, _session_page
from flet.controls.update_behavior import UpdateBehavior
from flet.messaging.connection import Connection
from flet.messaging.protocol import (
    ClientAction,
    ClientMessage,
    InvokeMethodRequestBody,
    PatchControlBody,
    SessionCrashedBody,
)
from flet.pubsub.pubsub_client import PubSubClient
from flet.utils.from_dict import from_dict
from flet.utils.patch_dataclass import patch_dataclass
from flet.utils.strings import random_string

logger = logging.getLogger("flet")

__all__ = ["Session"]


class Session:
    """Server-side state of a single client session.

    A session owns a `Page`, a weak-value index mapping control IDs to live
    controls, a `PubSubClient`, and the `Connection` currently used to send
    messages to the client. The connection is replaced on `connect()` and set
    to ``None`` on `disconnect()`.
    """

    def __init__(self, conn: Connection):
        """Create a session bound to `conn` with a fresh random 16-char ID."""
        self.__conn = conn
        self.__id = random_string(16)
        # Set by `disconnect()`; `None` while the session is connected.
        self.__expires_at: Optional[datetime] = None
        # Weak values: the index does not keep controls alive on its own.
        self.__index: weakref.WeakValueDictionary[int, Control] = (
            weakref.WeakValueDictionary()
        )
        self.__page = Page(self)
        self.__index[self.__page._i] = self.__page
        self.__pubsub_client = PubSubClient(conn.pubsubhub, self.__id)

        # Pass the ID as an argument rather than closing over `self`, so the
        # finalizer callback does not hold a reference to the session.
        weakref.finalize(
            self, logger.debug, "Session was garbage collected: %s", self.__id
        )

    @property
    def connection(self) -> Connection:
        """Current connection; `None` after `disconnect()` until `connect()`."""
        return self.__conn

    @property
    def id(self):
        """Random session identifier generated in `__init__`."""
        return self.__id

    @property
    def expires_at(self) -> Optional[datetime]:
        """UTC expiry time set by `disconnect()`, or `None` while connected."""
        return self.__expires_at

    @property
    def index(self):
        """Weak-value mapping of control ID (`_i`) to control."""
        return self.__index

    @property
    def page(self):
        """The `Page` created for this session."""
        return self.__page

    @property
    def pubsub_client(self) -> PubSubClient:
        """The `PubSubClient` created for this session."""
        return self.__pubsub_client

    async def connect(self, conn: Connection) -> None:
        """Attach `conn` to this session and dispatch the page "connect" event.

        Clears the expiry time, stores the page in the `_session_page` context
        variable and disables auto-update before dispatching.
        """
        logger.debug("Connect session: %s", self.id)
        _session_page.set(self.__page)
        self.__conn = conn
        self.__expires_at = None
        UpdateBehavior.disable_auto_update()
        await self.dispatch_event(self.__page._i, "connect", None)

    async def disconnect(self, session_timeout_seconds: int) -> None:
        """Detach the connection and dispatch the page "disconnect" event.

        Args:
            session_timeout_seconds: Seconds from now (UTC) after which the
                session is considered expired; stored in `expires_at`.
        """
        logger.debug("Disconnect session: %s", self.id)
        self.__expires_at = datetime.now(timezone.utc) + timedelta(
            seconds=session_timeout_seconds
        )
        if self.__conn:
            self.__conn.dispose()
            self.__conn = None
        UpdateBehavior.disable_auto_update()
        # NOTE: the handler runs with no connection attached; `error()` copes
        # with that by logging instead of sending.
        await self.dispatch_event(self.__page._i, "disconnect", None)

    def close(self):
        """Release session resources: drops all pub/sub subscriptions."""
        logger.debug("Closing expired session: %s", self.id)
        self.__pubsub_client.unsubscribe_all()

    def patch_control(self, control: BaseControl):
        """Compute a patch for `control` and send it to the client.

        The patch is obtained from `ObjectPatch.from_diff`, passing `control`
        as both the previous and the current state. Nothing is sent (and no
        mount/unmount hooks run) when the resulting patch is empty.
        """
        patch, added_controls, removed_controls = self.__get_update_control_patch(
            control=control, prev_control=control
        )
        if not patch:
            return

        # Unregister removed controls before the client is told about the change.
        for removed_control in removed_controls:
            removed_control.will_unmount()
            self.__index.pop(removed_control._i, None)

        self.connection.send_message(
            ClientMessage(
                ClientAction.PATCH_CONTROL, PatchControlBody(control._i, patch)
            )
        )

        # Register added controls only after the patch has been sent.
        for added_control in added_controls:
            self.__index[added_control._i] = added_control
            added_control.did_mount()

    def apply_patch(self, control_id: int, patch: dict[str, Any]):
        """Apply `patch` to the indexed control; no-op if the ID is unknown."""
        if control := self.__index.get(control_id):
            patch_dataclass(control, patch)

    def apply_page_patch(self, patch: dict[str, Any]):
        """Apply `patch` to this session's page."""
        self.apply_patch(self.__page._i, patch)

    def get_page_patch(self):
        """Build the page patch against no previous state and register controls.

        Every control reported as added is put into the index and gets
        `did_mount()` called. Returns the value stored under the empty-string
        key of the patch graph (presumably the root of the page tree - confirm
        against `ObjectPatch.to_graph`).
        """
        patch, added_controls, _ = self.__get_update_control_patch(
            self.__page, prev_control=None
        )

        for added_control in added_controls:
            self.__index[added_control._i] = added_control
            added_control.did_mount()

        return patch[""]

    # optimizations:
    # - disable auto-update
    # - auto-update to skip already updated items
    # - add-only list
    # - disable mount/unmount

    async def dispatch_event(
        self,
        control_id: int,
        event_name: str,
        event_data: Any,
    ):
        """Invoke the `on_<event_name>` handler of the control `control_id`.

        Returns silently when the control is not in the index, when it has no
        `on_<event_name>` attribute, or when no event type is resolved for
        that field. Any exception raised while building the event, running
        the handler or auto-updating is reported through `error()` instead of
        propagating.
        """
        control = self.__index.get(control_id)
        if not control:
            # control not found
            return

        field_name = f"on_{event_name}"
        if not hasattr(control, field_name):
            # field_name not defined
            return
        try:
            event_type = ControlEvent.get_event_field_type(control, field_name)
            if event_type is None:
                return

            if event_type == ControlEvent or not isinstance(event_data, dict):
                # simple ControlEvent
                e = ControlEvent(control=control, name=event_name, data=event_data)
            else:
                # custom ControlEvent: event_data is a dict here, its items
                # become fields of the event object
                args = {
                    "control": control,
                    "name": event_name,
                    **event_data,
                }
                e = from_dict(event_type, args)

            UpdateBehavior.reset()

            # Handle async and sync event handlers accordingly
            event_handler = getattr(control, field_name)
            if asyncio.iscoroutinefunction(event_handler):
                await event_handler(e)
            elif callable(event_handler):
                event_handler(e)

            if UpdateBehavior.auto_update_enabled():
                self.auto_update(control)
        except Exception as ex:
            tb = traceback.format_exc()
            self.error(f"Exception in '{field_name}': {ex}\n{tb}")

    def invoke_method(self, control_id: int, call_id: str, method_name: str, args: Any):
        """Send an INVOKE_METHOD request for `method_name` to the client."""
        self.connection.send_message(
            ClientMessage(
                ClientAction.INVOKE_METHOD,
                InvokeMethodRequestBody(
                    control_id=control_id, call_id=call_id, name=method_name, args=args
                ),
            )
        )

    def handle_invoke_method_results(
        self, control_id: int, call_id: str, result: Any, error: Optional[str]
    ):
        """Forward a method-call result to the control that issued the call.

        Raises:
            Exception: If no control with `control_id` is in the index.
        """
        if control := self.__index.get(control_id):
            control._handle_invoke_method_results(
                call_id=call_id, result=result, error=error
            )
        else:
            raise Exception(
                f"Error handling invoke method results. Control with ID {control_id} "
                "is not registered."
            )

    def auto_update(self, control: BaseControl):
        """Update the nearest isolated control, starting at `control`.

        Walks up the `parent` chain and calls `update()` on the first control
        whose `is_isolated()` is true; does nothing if none is found.
        """
        while control:
            if control.is_isolated():
                control.update()
                break
            control = control.parent

    def error(self, message: str):
        """Report `message` to the client as a SESSION_CRASHED message.

        When the session has no connection (after `disconnect()`), the message
        is written to the log instead. Previously this path dereferenced
        `None`, so a failing "disconnect" handler raised `AttributeError` from
        inside the exception handler of `dispatch_event` and hid the real error.
        """
        conn = self.__conn
        if conn is None:
            logger.error(
                "Session %s has no active connection; error not sent to client: %s",
                self.id,
                message,
            )
            return
        conn.send_message(
            ClientMessage(ClientAction.SESSION_CRASHED, SessionCrashedBody(message))
        )

    def __get_update_control_patch(
        self, control: BaseControl, prev_control: Optional[BaseControl]
    ):
        """Diff `prev_control` against `control` with `ObjectPatch.from_diff`.

        Returns a tuple `(patch_graph, added_controls, removed_controls)` where
        `patch_graph` is the result of `to_graph()` on the computed patch.
        """
        patch, added_controls, removed_controls = ObjectPatch.from_diff(
            prev_control,
            control,
            in_place=True,
            control_cls=BaseControl,
        )

        return patch.to_graph(), added_controls, removed_controls