Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
ulauncher / usr / lib / python3 / dist-packages / ulauncher / api / client / Extension.py
Size: Mime:
import logging
from collections import defaultdict

from ulauncher.api.shared.Response import Response
from ulauncher.api.shared.action.BaseAction import BaseAction
from ulauncher.api.shared.event import PreferencesEvent, PreferencesUpdateEvent
from ulauncher.api.client.EventListener import EventListener
from ulauncher.api.client.Client import Client
from ulauncher.api.client.setup_logging import setup_logging


class Extension:
    """
    Manages extension runtime
    """

    def __init__(self):
        self._listeners = defaultdict(list)
        self._client = Client(self)
        self.preferences = {}
        self.logger = logging.getLogger(__name__)
        setup_logging()

    def subscribe(self, event_type, event_listener):
        """
        Example:

            extension.subscribe(PreferencesEvent, PreferencesEventListener())

        :param type event_type:
        :param ~ulauncher.api.client.EventListener.EventListener event_listener:
        """
        self._listeners[event_type].append(event_listener)

    def get_listeners_for_event(self, event):
        """
        :param ~ulauncher.api.shared.event.BaseEvent event:
        :rtype: ~ulauncher.api.client.EventListener.EventListener
        """
        return self._listeners[type(event)]

    def trigger_event(self, event):
        """
        :param ~ulauncher.api.shared.event.BaseEvent event:
        """
        listeners = self.get_listeners_for_event(event)
        if not listeners:
            self.logger.debug('No listeners for event %s', type(event).__name__)
            return

        for listener in listeners:
            action = listener.on_event(event, self)
            if action:
                assert isinstance(action, BaseAction), "on_event return value is not an instance of BaseAction"
                self._client.send(Response(event, action))

    def run(self):
        """
        Subscribes to events and connects to Ulauncher WS server
        """
        self.subscribe(PreferencesEvent, PreferencesEventListener())
        self.subscribe(PreferencesUpdateEvent, PreferencesUpdateEventListener())
        self._client.connect()


# pylint: disable=too-few-public-methods
class PreferencesEventListener(EventListener):

    def on_event(self, event, extension):
        extension.preferences.update(event.preferences)


class PreferencesUpdateEventListener(EventListener):

    def on_event(self, event, extension):
        extension.preferences[event.id] = event.new_value