Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
Size: Mime:
# This file is part of Scapy
# See http://www.secdev.org/projects/scapy for more information
# Copyright (C) Philippe Biondi <phil@secdev.org>
# This program is published under a GPLv2 license

"""
Sessions: decode flow of packets when sniffing
"""

from collections import defaultdict
from scapy.compat import raw
from scapy.config import conf
from scapy.packet import NoPayload, Packet
from scapy.plist import PacketList

# Typing imports
from scapy.compat import (
    Any,
    Callable,
    DefaultDict,
    Dict,
    List,
    Optional,
    Tuple,
    cast
)


class DefaultSession(object):
    """Default session: no stream decoding"""

    def __init__(
            self,
            prn=None,  # type: Optional[Callable[[Packet], Any]]
            store=False,  # type: bool
            supersession=None,  # type: Optional[DefaultSession]
            *args,  # type: Any
            **karg  # type: Any
    ):
        # type: (...) -> None
        self.__prn = prn
        self.__store = store
        self.lst = []  # type: List[Packet]
        self.__count = 0
        self._supersession = supersession
        if self._supersession:
            self._supersession.prn = self.__prn
            self._supersession.store = self.__store
            self.__store = False
            self.__prn = None

    @property
    def store(self):
        # type: () -> bool
        return self.__store

    @store.setter
    def store(self, val):
        # type: (bool) -> None
        if self._supersession:
            self._supersession.store = val
        else:
            self.__store = val

    @property
    def prn(self):
        # type: () -> Optional[Callable[[Packet], Any]]
        return self.__prn

    @prn.setter
    def prn(self, f):
        # type: (Optional[Any]) -> None
        if self._supersession:
            self._supersession.prn = f
        else:
            self.__prn = f

    @property
    def count(self):
        # type: () -> int
        if self._supersession:
            return self._supersession.count
        else:
            return self.__count

    def toPacketList(self):
        # type: () -> PacketList
        if self._supersession:
            return PacketList(self._supersession.lst, "Sniffed")
        else:
            return PacketList(self.lst, "Sniffed")

    def on_packet_received(self, pkt):
        # type: (Optional[Packet]) -> None
        """DEV: entry point. Will be called by sniff() for each
        received packet (that passes the filters).
        """
        if not pkt:
            return
        if isinstance(pkt, list):
            for p in pkt:
                DefaultSession.on_packet_received(self, p)
            return
        self.__count += 1
        if self.store:
            self.lst.append(pkt)
        if self.prn:
            result = self.prn(pkt)
            if result is not None:
                print(result)


class IPSession(DefaultSession):
    """Defragment IP packets 'on-the-flow'.

    Usage:
    >>> sniff(session=IPSession)
    """

    def __init__(self, *args, **kwargs):
        # type: (*Any, **Any) -> None
        DefaultSession.__init__(self, *args, **kwargs)
        self.fragments = defaultdict(list)  # type: DefaultDict[Tuple[Any, ...], List[Packet]]  # noqa: E501

    def _ip_process_packet(self, packet):
        # type: (Packet) -> Optional[Packet]
        from scapy.layers.inet import _defrag_list, IP
        if IP not in packet:
            return packet
        ip = packet[IP]
        packet._defrag_pos = 0
        if ip.frag != 0 or ip.flags.MF:
            uniq = (ip.id, ip.src, ip.dst, ip.proto)
            self.fragments[uniq].append(packet)
            if not ip.flags.MF:  # end of frag
                try:
                    if self.fragments[uniq][0].frag == 0:
                        # Has first fragment (otherwise ignore)
                        defrag = []  # type: List[Packet]
                        _defrag_list(self.fragments[uniq], defrag, [])
                        defragmented_packet = defrag[0]
                        defragmented_packet = defragmented_packet.__class__(
                            raw(defragmented_packet)
                        )
                        return defragmented_packet
                finally:
                    del self.fragments[uniq]
            return None
        else:
            return packet

    def on_packet_received(self, pkt):
        # type: (Optional[Packet]) -> None
        if not pkt:
            return None
        DefaultSession.on_packet_received(
            self,
            self._ip_process_packet(pkt)
        )


class StringBuffer(object):
    """StringBuffer is an object used to re-order data received during
    a TCP transmission.

    Each TCP fragment contains a sequence number, which marks
    (relatively to the first sequence number) the index of the data contained
    in the fragment.

    If a TCP fragment is missed, this class will fill the missing space with
    zeros.
    """
    def __init__(self):
        # type: () -> None
        self.content = bytearray(b"")
        self.content_len = 0
        self.incomplete = []  # type: List[Tuple[int, int]]

    def append(self, data, seq):
        # type: (bytes, int) -> None
        data_len = len(data)
        seq = seq - 1
        if seq + data_len > self.content_len:
            self.content += b"\x00" * (seq - self.content_len + data_len)
            # If data was missing, mark it.
            self.incomplete.append((self.content_len, seq))
            self.content_len = seq + data_len
            assert len(self.content) == self.content_len
        # XXX removes empty space marker.
        # for ifrag in self.incomplete:
        #     if [???]:
        #         self.incomplete.remove([???])
        memoryview(self.content)[seq:seq + data_len] = data  # type: ignore

    def full(self):
        # type: () -> bool
        # Should only be true when all missing data was filled up,
        # (or there never was missing data)
        return True  # XXX

    def clear(self):
        # type: () -> None
        self.__init__()  # type: ignore

    def __bool__(self):
        # type: () -> bool
        return bool(self.content_len)
    __nonzero__ = __bool__

    def __len__(self):
        # type: () -> int
        return self.content_len

    def __bytes__(self):
        # type: () -> bytes
        return bytes(self.content)

    def __str__(self):
        # type: () -> str
        return cast(str, self.__bytes__())


class TCPSession(IPSession):
    """A Session that matches seq/ack packets together to dissect
    special protocols, such as HTTP.

    DEV: implement a class-function `tcp_reassemble` in your Packet class::

        @classmethod
        def tcp_reassemble(cls, data, metadata):
            # data = the reassembled data from the same request/flow
            # metadata = empty dictionary, that can be used to store data
            [...]
            # If the packet is available, return it. Otherwise don't.
            # Whenever you return a packet, the buffer will be discarded.
            return pkt
            # Otherwise, maybe store stuff in metadata, and return None,
            # as you need additional data.
            return None

    For more details and a real example, see:
    https://scapy.readthedocs.io/en/latest/usage.html#how-to-use-tcpsession-to-defragment-tcp-packets

    :param app: Whether the socket is on application layer = has no TCP
                layer. This is used for instance if you are using a native
                TCP socket. Default to False
    """

    fmt = ('TCP {IP:%IP.src%}{IPv6:%IPv6.src%}:%r,TCP.sport% > ' +
           '{IP:%IP.dst%}{IPv6:%IPv6.dst%}:%r,TCP.dport%')

    def __init__(self, app=False, *args, **kwargs):
        # type: (bool, *Any, **Any) -> None
        super(TCPSession, self).__init__(*args, **kwargs)
        self.app = app
        if app:
            self.data = b""
            self.metadata = {}  # type: Dict[str, Any]
        else:
            # The StringBuffer() is used to build a global
            # string from fragments and their seq nulber
            self.tcp_frags = defaultdict(
                lambda: (StringBuffer(), {})
            )  # type: DefaultDict[str, Tuple[StringBuffer, Dict[str, Any]]]

    def _process_packet(self, pkt):
        # type: (Packet) -> Optional[Packet]
        """Process each packet: matches the TCP seq/ack numbers
        to follow the TCP streams, and orders the fragments.
        """
        if self.app:
            # Special mode: Application layer. Use on top of TCP
            pay_class = pkt.__class__
            if not hasattr(pay_class, "tcp_reassemble"):
                # Being on top of TCP, we have no way of knowing
                # when a packet ends.
                return pkt
            self.data += bytes(pkt)
            pkt = pay_class.tcp_reassemble(self.data, self.metadata)
            if pkt:
                self.data = b""
                self.metadata = {}
                return pkt
            return None

        from scapy.layers.inet import IP, TCP
        if not pkt or TCP not in pkt:
            return pkt
        pay = pkt[TCP].payload
        if isinstance(pay, (NoPayload, conf.padding_layer)):
            return pkt
        new_data = pay.original
        # Match packets by a uniqute TCP identifier
        seq = pkt[TCP].seq
        ident = pkt.sprintf(self.fmt)
        data, metadata = self.tcp_frags[ident]
        # Let's guess which class is going to be used
        if "pay_class" not in metadata:
            pay_class = pay.__class__
            if hasattr(pay_class, "tcp_reassemble"):
                tcp_reassemble = pay_class.tcp_reassemble
            else:
                # We can't know for sure when a packet ends.
                # Ignore.
                return pkt
            metadata["pay_class"] = pay_class
            metadata["tcp_reassemble"] = tcp_reassemble
        else:
            tcp_reassemble = metadata["tcp_reassemble"]
        # Get a relative sequence number for a storage purpose
        relative_seq = metadata.get("relative_seq", None)
        if relative_seq is None:
            relative_seq = metadata["relative_seq"] = seq - 1
        seq = seq - relative_seq
        # Add the data to the buffer
        # Note that this take care of retransmission packets.
        data.append(new_data, seq)
        # Check TCP FIN or TCP RESET
        if pkt[TCP].flags.F or pkt[TCP].flags.R:
            metadata["tcp_end"] = True

        # In case any app layer protocol requires it,
        # allow the parser to inspect TCP PSH flag
        if pkt[TCP].flags.P:
            metadata["tcp_psh"] = True
        # XXX TODO: check that no empty space is missing in the buffer.
        # XXX Currently, if a TCP fragment was missing, we won't notice it.
        packet = None  # type: Optional[Packet]
        if data.full():
            # Reassemble using all previous packets
            packet = tcp_reassemble(bytes(data), metadata)
        # Stack the result on top of the previous frames
        if packet:
            data.clear()
            metadata.clear()
            del self.tcp_frags[ident]
            pay.underlayer.remove_payload()
            if IP in pkt:
                pkt[IP].len = None
                pkt[IP].chksum = None
            return pkt / packet
        return None

    def on_packet_received(self, pkt):
        # type: (Optional[Packet]) -> None
        """Hook to the Sessions API: entry point of the dissection.
        This will defragment IP if necessary, then process to
        TCP reassembly.
        """
        if not pkt:
            return None
        # First, defragment IP if necessary
        pkt = self._ip_process_packet(pkt)
        if not pkt:
            return None
        # Now handle TCP reassembly
        pkt = self._process_packet(pkt)
        DefaultSession.on_packet_received(self, pkt)