Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
flockwave-net / networking / scanner.py
Size: Mime:
from contextlib import contextmanager
from dataclasses import dataclass, field
from enum import Enum
from functools import partial
from ipaddress import ip_address
from netifaces import AF_INET, AF_LINK, ifaddresses
from typing import (
    Callable,
    Iterable,
    Iterator,
    Optional,
    Sequence,
)

from .interfaces import list_network_interfaces
from .utils import aclosing
from .wired import is_carrier_detected, is_maybe_wired_or_wireless
from .wireless import get_connected_access_point_name, is_likely_wireless

# Names exported by a star-import of this module. Note that
# NetworkInterfaceType is not listed here.
__all__ = ("NetworkScanner", "NetworkScanResult", "NetworkScanResultItem")


class NetworkInterfaceType(Enum):
    """Kind of a network interface as classified by the network scanner."""

    #: The interface is treated as a wired interface
    WIRED = "wired"

    #: The interface is treated as a wireless interface
    WIRELESS = "wireless"


@dataclass
class NetworkScanResultItem:
    """Description of a single network interface in the result of a network
    scan.
    """

    #: Name of the network interface that this item describes
    interface: str

    #: Kind of the interface: wired or wireless
    type: NetworkInterfaceType

    #: Whether the network interface is connected
    connected: bool = False

    #: Address entries of the interface; the ``"addr"`` and ``"netmask"`` keys
    #: of each entry are used when the addresses are formatted
    addresses: Sequence[dict[str, str]] = field(default_factory=list)

    #: Name of the AP that the wireless interface is connected to, ``None``
    #: if not known
    access_point_name: Optional[str] = None

    @property
    def first_address_and_netmask(self) -> Optional[str]:
        """The first item of the list returned by
        ``get_networks_and_netmasks()``, or `None` if that list is empty.
        """
        formatted = self.get_networks_and_netmasks()
        if not formatted:
            return None
        return formatted[0]

    def get_networks_and_netmasks(self) -> list[str]:
        """Formats each address entry of the interface as a string.

        Returns:
            one string per address entry, in the original order:
            ``"<addr> / <netmask>"`` if the entry has a non-empty netmask,
            ``"<addr>"`` otherwise. A missing or empty address is shown as
            ``"-"``.
        """

        def describe(entry: dict[str, str]) -> str:
            ip = entry.get("addr") or "-"
            mask = entry.get("netmask")
            return f"{ip} / {mask}" if mask else str(ip)

        return [describe(entry) for entry in self.addresses]

    @property
    def wireless(self) -> bool:
        """Whether the type of the network interface is
        ``NetworkInterfaceType.WIRELESS``.
        """
        return self.type is NetworkInterfaceType.WIRELESS


#: Type specification for the result of a network scan: a list of
#: `NetworkScanResultItem` objects
NetworkScanResult = list[NetworkScanResultItem]


def _is_loopback(item: dict[str, str]) -> bool:
    addr = item.get("addr")
    return ip_address(addr).is_loopback if addr is not None else False


class NetworkScanner:
    """Network scanner object that runs an asynchronous task that publishes
    network scans, i.e. summaries of which wired and wireless networks the
    machine is currently connected to.

    Registered result handlers are called synchronously from ``run()`` with
    every new scan result.
    """

    #: Handler functions to call with each new network scan result
    _handlers: list[Callable[[NetworkScanResult], None]]

    #: Result of the most recent scan, ``None`` if no scan has finished yet
    _last_result: Optional[NetworkScanResult]

    def __init__(self) -> None:
        """Constructor."""
        self._handlers = []
        self._last_result = None

    def add_result_handler(
        self, func: Callable[[NetworkScanResult], None]
    ) -> Callable[[], None]:
        """Registers the given function to be called whenever the network
        scanner detects a possible change in the network configuration and
        produces a new network scan result.

        Parameters:
            func: the function to register. It must accept a single argument,
                which is the new network scan result

        Returns:
            a function that can be called with no arguments to unregister the
            handler function. Calling it more than once is harmless.
        """
        self._handlers.append(func)
        return partial(self._remove_result_handler, func)

    @property
    def last_result(self) -> NetworkScanResult:
        """Returns the result of the last network scan as a new list; the list
        is empty if no scan has finished yet.
        """
        return list(self._last_result or ())

    async def run(self) -> None:
        """Runs the network scanner.

        Waits for events from a ``NetworkEventDetector`` and, after each
        event, scans the network interfaces in a worker thread and publishes
        the result to the registered result handlers. Exceptions raised by
        the scan or by a handler propagate to the caller.

        Raises:
            RuntimeError: if ``aio_net_events`` or ``trio`` cannot be imported
        """
        try:
            from aio_net_events import NetworkEventDetector
            from trio import to_thread
        except ImportError:
            raise RuntimeError(
                "you need to install the 'async' extra to use this class"
            ) from None

        detector = NetworkEventDetector()
        async with aclosing(detector.events()) as network_events:
            async for _ in network_events:
                # TODO(ntamas): debounce the events!
                result = await to_thread.run_sync(self._scan_interfaces)
                self._publish_result(result)

    @contextmanager
    def use_result_handler(
        self, func: Callable[[NetworkScanResult], None]
    ) -> Iterator[None]:
        """Context manager that registers the given result handler function when
        the context is entered and unregisters the given result handler function
        when the context is exited.
        """
        disposer = self.add_result_handler(func)
        try:
            yield
        finally:
            disposer()

    def _find_relevant_interfaces(
        self,
    ) -> Iterable[tuple[str, Sequence[dict[str, str]]]]:
        """Finds the list of network interfaces that might be relevant to us
        in the sense that they probably represent a wired or wireless IPv4
        interface.

        Yields:
            pairs consisting of the name of a relevant network interface and
            its IPv4 addresses (or an empty list if the interface has no IPv4
            address)
        """
        for interface in list_network_interfaces():
            addresses = ifaddresses(interface)
            ipv4_addresses = addresses.get(AF_INET)
            if ipv4_addresses:
                # If any of the addresses is the loopback address, then this
                # is the loopback interface so it is not relevant
                if not any(_is_loopback(addr) for addr in ipv4_addresses):
                    yield interface, ipv4_addresses
            elif (
                len(addresses) == 1
                and AF_LINK in addresses
                and is_maybe_wired_or_wireless(interface)
            ):
                # No IPv4 address. It might be a disconnected network
                # interface so we report it if it has an Ethernet address
                # but nothing else.
                yield interface, []

    def _publish_result(self, result: NetworkScanResult) -> None:
        """Stores the given scan result as the last result and passes it to
        the registered result handlers in registration order.

        Handlers that are registered or unregistered while the result is being
        dispatched do not affect the current round: the set of handlers to
        call is fixed when the dispatch starts.

        Parameters:
            result: the new network scan result
        """
        self._last_result = result

        # Iterate over a snapshot of the handlers. A handler may unregister
        # itself while it is being called; removing an item from the list
        # that is being iterated would make the loop skip the next handler.
        for handler in tuple(self._handlers):
            handler(result)

    def _remove_result_handler(self, func: Callable[[NetworkScanResult], None]) -> None:
        """Unregisters the given result handler function; does nothing if the
        function is not registered.
        """
        try:
            self._handlers.remove(func)
        except ValueError:
            # Handler was already removed
            pass

    def _scan_interfaces(self) -> list[NetworkScanResultItem]:
        """Scans the network interfaces of the current device and tries to
        figure out the wired and wireless IP addresses of the device as well
        as the name of the wireless AP that the device is connected to.

        Returns:
            one scan result item for each relevant network interface
        """
        result = []

        for interface, addresses in self._find_relevant_interfaces():
            if is_likely_wireless(interface):
                # A wireless interface counts as connected if the name of its
                # access point is known
                access_point_name = get_connected_access_point_name(interface)
                item = NetworkScanResultItem(
                    interface=interface,
                    type=NetworkInterfaceType.WIRELESS,
                    connected=access_point_name is not None,
                    access_point_name=access_point_name,
                    addresses=addresses,
                )
            else:
                item = NetworkScanResultItem(
                    interface=interface,
                    type=NetworkInterfaceType.WIRED,
                    connected=is_carrier_detected(interface),
                    addresses=addresses,
                )
            result.append(item)

        return result


def test():
    """Manual smoke test: runs a network scanner with Trio and prints every
    network scan result to the standard output.
    """
    import trio

    def print_scan(items: NetworkScanResult):
        # Prints a header, then the repr of each item followed by a separator
        print("Finished network scan.")
        if items:
            print("")
            for entry in items:
                print(repr(entry))
                print("---")
        print("")

    async def main():
        net_scanner = NetworkScanner()
        # The handler stays registered for as long as the scanner is running
        with net_scanner.use_result_handler(print_scan):
            await net_scanner.run()

    trio.run(main)


# Run the manual smoke test when this module is executed as a script
if __name__ == "__main__":
    test()