Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
Size: Mime:
# This file is part of Scapy
# See http://www.secdev.org/projects/scapy for more information
# Copyright (C) Philippe Biondi <phil@secdev.org>
# This program is published under a GPLv2 license

"""
General utility functions.
"""

from __future__ import absolute_import
from __future__ import print_function

from decimal import Decimal

import array
import collections
import decimal
import difflib
import gzip
import os
import random
import re
import socket
import struct
import subprocess
import sys
import tempfile
import threading
import time
import warnings

import scapy.modules.six as six
from scapy.modules.six.moves import range, input, zip_longest

from scapy.config import conf
from scapy.consts import DARWIN, OPENBSD, WINDOWS
from scapy.data import MTU, DLT_EN10MB
from scapy.compat import orb, plain_str, chb, bytes_base64,\
    base64_bytes, hex_bytes, lambda_tuple_converter, bytes_encode
from scapy.error import log_runtime, Scapy_Exception, warning
from scapy.pton_ntop import inet_pton

# Typing imports
from scapy.compat import (
    cast,
    Any,
    AnyStr,
    Callable,
    Dict,
    IO,
    Iterator,
    List,
    Literal,
    Optional,
    TYPE_CHECKING,
    Tuple,
    Type,
    Union,
    overload,
)

if TYPE_CHECKING:
    from scapy.packet import Packet
    from scapy.plist import _PacketIterable, PacketList
    from scapy.supersocket import SuperSocket
    _SuperSocket = SuperSocket
else:
    _SuperSocket = object

_ByteStream = Union[IO[bytes], gzip.GzipFile]

###########
#  Tools  #
###########


def issubtype(x,  # type: Any
              t,  # type: Union[type, str]
              ):
    # type: (...) -> bool
    """issubtype(C, B) -> bool

    Return whether C is a class and if it is a subclass of class B.
    When using a tuple as the second argument issubtype(X, (A, B, ...)),
    is a shortcut for issubtype(X, A) or issubtype(X, B) or ... (etc.).
    """
    if isinstance(t, str):
        return t in (z.__name__ for z in x.__bases__)
    if isinstance(x, type) and issubclass(x, t):
        return True
    return False


_Decimal = Union[Decimal, int]


class EDecimal(Decimal):
    """Extended Decimal

    This implements arithmetic and comparison with float for
    backward compatibility
    """

    def __add__(self, other, context=None):
        # type: (_Decimal, Any) -> EDecimal
        return EDecimal(Decimal.__add__(self, Decimal(other)))

    def __radd__(self, other):
        # type: (_Decimal) -> EDecimal
        return EDecimal(Decimal.__add__(self, Decimal(other)))

    def __sub__(self, other):
        # type: (_Decimal) -> EDecimal
        return EDecimal(Decimal.__sub__(self, Decimal(other)))

    def __rsub__(self, other):
        # type: (_Decimal) -> EDecimal
        return EDecimal(Decimal.__rsub__(self, Decimal(other)))

    def __mul__(self, other):
        # type: (_Decimal) -> EDecimal
        return EDecimal(Decimal.__mul__(self, Decimal(other)))

    def __rmul__(self, other):
        # type: (_Decimal) -> EDecimal
        return EDecimal(Decimal.__mul__(self, Decimal(other)))

    def __truediv__(self, other):
        # type: (_Decimal) -> EDecimal
        return EDecimal(Decimal.__truediv__(self, Decimal(other)))

    def __floordiv__(self, other):
        # type: (_Decimal) -> EDecimal
        return EDecimal(Decimal.__floordiv__(self, Decimal(other)))

    if sys.version_info >= (3,):
        def __divmod__(self, other):
            # type: (_Decimal) -> Tuple[EDecimal, EDecimal]
            r = Decimal.__divmod__(self, Decimal(other))
            return EDecimal(r[0]), EDecimal(r[1])
    else:
        def __div__(self, other):
            # type: (_Decimal) -> EDecimal
            return EDecimal(Decimal.__div__(self, Decimal(other)))

        def __rdiv__(self, other):
            # type: (_Decimal) -> EDecimal
            return EDecimal(Decimal.__rdiv__(self, Decimal(other)))

    def __mod__(self, other):
        # type: (_Decimal) -> EDecimal
        return EDecimal(Decimal.__mod__(self, Decimal(other)))

    def __rmod__(self, other):
        # type: (_Decimal) -> EDecimal
        return EDecimal(Decimal.__rmod__(self, Decimal(other)))

    def __pow__(self, other, modulo=None):
        # type: (_Decimal, Optional[_Decimal]) -> EDecimal
        return EDecimal(Decimal.__pow__(self, Decimal(other), modulo))

    def __eq__(self, other):
        # type: (Any) -> bool
        return super(EDecimal, self).__eq__(other) or float(self) == other

    def normalize(self, precision):  # type: ignore
        # type: (int) -> EDecimal
        with decimal.localcontext() as ctx:
            ctx.prec = precision
            return EDecimal(super(EDecimal, self).normalize(ctx))


@overload
def get_temp_file(keep, autoext, fd):
    # type: (bool, str, Literal[True]) -> IO[bytes]
    pass


@overload
def get_temp_file(keep=False, autoext="", fd=False):  # noqa: F811
    # type: (bool, str, Literal[False]) -> str
    pass


def get_temp_file(keep=False, autoext="", fd=False):  # noqa: F811
    # type: (bool, str, bool) -> Union[IO[bytes], str]
    """Creates a temporary file.

    :param keep: If False, automatically delete the file when Scapy exits.
    :param autoext: Suffix to add to the generated file name.
    :param fd: If True, this returns a file-like object with the temporary
               file opened. If False (default), this returns a file path.
    """
    f = tempfile.NamedTemporaryFile(prefix="scapy", suffix=autoext,
                                    delete=False)
    if not keep:
        conf.temp_files.append(f.name)

    if fd:
        return f
    else:
        # Close the file so something else can take it.
        f.close()
        return f.name


def get_temp_dir(keep=False):
    # type: (bool) -> str
    """Creates a temporary file, and returns its name.

    :param keep: If False (default), the directory will be recursively
                 deleted when Scapy exits.
    :return: A full path to a temporary directory.
    """

    dname = tempfile.mkdtemp(prefix="scapy")

    if not keep:
        conf.temp_files.append(dname)

    return dname


def sane(x, color=False):
    # type: (AnyStr, bool) -> str
    r = ""
    for i in x:
        j = orb(i)
        if (j < 32) or (j >= 127):
            if color:
                r += conf.color_theme.not_printable(".")
            else:
                r += "."
        else:
            r += chr(j)
    return r


@conf.commands.register
def restart():
    # type: () -> None
    """Restarts scapy"""
    if not conf.interactive or not os.path.isfile(sys.argv[0]):
        raise OSError("Scapy was not started from console")
    if WINDOWS:
        try:
            res_code = subprocess.call([sys.executable] + sys.argv)
        except KeyboardInterrupt:
            res_code = 1
        finally:
            os._exit(res_code)
    os.execv(sys.executable, [sys.executable] + sys.argv)


def lhex(x):
    # type: (Any) -> str
    from scapy.volatile import VolatileValue
    if isinstance(x, VolatileValue):
        return repr(x)
    if type(x) in six.integer_types:
        return hex(x)
    elif isinstance(x, tuple):
        return "(%s)" % ", ".join(map(lhex, x))
    elif isinstance(x, list):
        return "[%s]" % ", ".join(map(lhex, x))
    else:
        return str(x)


@conf.commands.register
def hexdump(p, dump=False):
    # type: (Union[Packet, AnyStr], bool) -> Optional[str]
    """Build a tcpdump like hexadecimal view

    :param p: a Packet
    :param dump: define if the result must be printed or returned in a variable
    :return: a String only when dump=True
    """
    s = ""
    x = bytes_encode(p)
    x_len = len(x)
    i = 0
    while i < x_len:
        s += "%04x  " % i
        for j in range(16):
            if i + j < x_len:
                s += "%02X " % orb(x[i + j])
            else:
                s += "   "
        s += " %s\n" % sane(x[i:i + 16], color=True)
        i += 16
    # remove trailing \n
    s = s[:-1] if s.endswith("\n") else s
    if dump:
        return s
    else:
        print(s)
        return None


@conf.commands.register
def linehexdump(p, onlyasc=0, onlyhex=0, dump=False):
    # type: (Union[Packet, AnyStr], int, int, bool) -> Optional[str]
    """Build an equivalent view of hexdump() on a single line

    Note that setting both onlyasc and onlyhex to 1 results in a empty output

    :param p: a Packet
    :param onlyasc: 1 to display only the ascii view
    :param onlyhex: 1 to display only the hexadecimal view
    :param dump: print the view if False
    :return: a String only when dump=True
    """
    s = ""
    s = hexstr(p, onlyasc=onlyasc, onlyhex=onlyhex, color=not dump)
    if dump:
        return s
    else:
        print(s)
        return None


@conf.commands.register
def chexdump(p, dump=False):
    # type: (Union[Packet, AnyStr], bool) -> Optional[str]
    """Build a per byte hexadecimal representation

    Example:
        >>> chexdump(IP())
        0x45, 0x00, 0x00, 0x14, 0x00, 0x01, 0x00, 0x00, 0x40, 0x00, 0x7c, 0xe7, 0x7f, 0x00, 0x00, 0x01, 0x7f, 0x00, 0x00, 0x01  # noqa: E501

    :param p: a Packet
    :param dump: print the view if False
    :return: a String only if dump=True
    """
    x = bytes_encode(p)
    s = ", ".join("%#04x" % orb(x) for x in x)
    if dump:
        return s
    else:
        print(s)
        return None


@conf.commands.register
def hexstr(p, onlyasc=0, onlyhex=0, color=False):
    # type: (Union[Packet, AnyStr], int, int, bool) -> str
    """Build a fancy tcpdump like hex from bytes."""
    x = bytes_encode(p)
    s = []
    if not onlyasc:
        s.append(" ".join("%02X" % orb(b) for b in x))
    if not onlyhex:
        s.append(sane(x, color=color))
    return "  ".join(s)


def repr_hex(s):
    # type: (bytes) -> str
    """ Convert provided bitstring to a simple string of hex digits """
    return "".join("%02x" % orb(x) for x in s)


@conf.commands.register
def hexdiff(a, b, autojunk=False):
    # type: (Union[Packet, AnyStr], Union[Packet, AnyStr], bool) -> None
    """
    Show differences between 2 binary strings, Packets...

    For the autojunk parameter, see
    https://docs.python.org/3.8/library/difflib.html#difflib.SequenceMatcher

    :param a:
    :param b: The binary strings, packets... to compare
    :param autojunk: Setting it to True will likely increase the comparison
        speed a lot on big byte strings, but will reduce accuracy (will tend
        to miss insertion and see replacements instead for instance).
    """

    # Compare the strings using difflib

    xb = bytes_encode(a)
    yb = bytes_encode(b)

    sm = difflib.SequenceMatcher(a=xb, b=yb, autojunk=autojunk)
    xarr = [xb[i:i + 1] for i in range(len(xb))]
    yarr = [yb[i:i + 1] for i in range(len(yb))]

    backtrackx = []
    backtracky = []
    for opcode in sm.get_opcodes():
        typ, x0, x1, y0, y1 = opcode
        if typ == 'delete':
            backtrackx += xarr[x0:x1]
            backtracky += [b''] * (x1 - x0)
        elif typ == 'insert':
            backtrackx += [b''] * (y1 - y0)
            backtracky += yarr[y0:y1]
        elif typ in ['equal', 'replace']:
            backtrackx += xarr[x0:x1]
            backtracky += yarr[y0:y1]

    if autojunk:
        # Some lines may have been considered as junk. Check the sizes
        lbx = len(backtrackx)
        lby = len(backtracky)
        backtrackx += [b''] * (max(lbx, lby) - lbx)
        backtracky += [b''] * (max(lbx, lby) - lby)

    # Print the diff

    x = y = i = 0
    colorize = {0: lambda x: x,
                -1: conf.color_theme.left,
                1: conf.color_theme.right}

    dox = 1
    doy = 0
    btx_len = len(backtrackx)
    while i < btx_len:
        linex = backtrackx[i:i + 16]
        liney = backtracky[i:i + 16]
        xx = sum(len(k) for k in linex)
        yy = sum(len(k) for k in liney)
        if dox and not xx:
            dox = 0
            doy = 1
        if dox and linex == liney:
            doy = 1

        if dox:
            xd = y
            j = 0
            while not linex[j]:
                j += 1
                xd -= 1
            print(colorize[doy - dox]("%04x" % xd), end=' ')
            x += xx
            line = linex
        else:
            print("    ", end=' ')
        if doy:
            yd = y
            j = 0
            while not liney[j]:
                j += 1
                yd -= 1
            print(colorize[doy - dox]("%04x" % yd), end=' ')
            y += yy
            line = liney
        else:
            print("    ", end=' ')

        print(" ", end=' ')

        cl = ""
        for j in range(16):
            if i + j < btx_len:
                if line[j]:
                    col = colorize[(linex[j] != liney[j]) * (doy - dox)]
                    print(col("%02X" % orb(line[j])), end=' ')
                    if linex[j] == liney[j]:
                        cl += sane(line[j], color=True)
                    else:
                        cl += col(sane(line[j]))
                else:
                    print("  ", end=' ')
                    cl += " "
            else:
                print("  ", end=' ')
            if j == 7:
                print("", end=' ')

        print(" ", cl)

        if doy or not yy:
            doy = 0
            dox = 1
            i += 16
        else:
            if yy:
                dox = 0
                doy = 1
            else:
                i += 16


if struct.pack("H", 1) == b"\x00\x01":  # big endian
    checksum_endian_transform = lambda chk: chk  # type: Callable[[int], int]
else:
    checksum_endian_transform = lambda chk: ((chk >> 8) & 0xff) | chk << 8


def checksum(pkt):
    # type: (bytes) -> int
    if len(pkt) % 2 == 1:
        pkt += b"\0"
    s = sum(array.array("H", pkt))
    s = (s >> 16) + (s & 0xffff)
    s += s >> 16
    s = ~s
    return checksum_endian_transform(s) & 0xffff


def _fletcher16(charbuf):
    # type: (bytes) -> Tuple[int, int]
    # This is based on the GPLed C implementation in Zebra <http://www.zebra.org/>  # noqa: E501
    c0 = c1 = 0
    for char in charbuf:
        c0 += orb(char)
        c1 += c0

    c0 %= 255
    c1 %= 255
    return (c0, c1)


@conf.commands.register
def fletcher16_checksum(binbuf):
    # type: (bytes) -> int
    """Calculates Fletcher-16 checksum of the given buffer.

       Note:
       If the buffer contains the two checkbytes derived from the Fletcher-16 checksum  # noqa: E501
       the result of this function has to be 0. Otherwise the buffer has been corrupted.  # noqa: E501
    """
    (c0, c1) = _fletcher16(binbuf)
    return (c1 << 8) | c0


@conf.commands.register
def fletcher16_checkbytes(binbuf, offset):
    # type: (bytes, int) -> bytes
    """Calculates the Fletcher-16 checkbytes returned as 2 byte binary-string.

       Including the bytes into the buffer (at the position marked by offset) the  # noqa: E501
       global Fletcher-16 checksum of the buffer will be 0. Thus it is easy to verify  # noqa: E501
       the integrity of the buffer on the receiver side.

       For details on the algorithm, see RFC 2328 chapter 12.1.7 and RFC 905 Annex B.  # noqa: E501
    """

    # This is based on the GPLed C implementation in Zebra <http://www.zebra.org/>  # noqa: E501
    if len(binbuf) < offset:
        raise Exception("Packet too short for checkbytes %d" % len(binbuf))

    binbuf = binbuf[:offset] + b"\x00\x00" + binbuf[offset + 2:]
    (c0, c1) = _fletcher16(binbuf)

    x = ((len(binbuf) - offset - 1) * c0 - c1) % 255

    if (x <= 0):
        x += 255

    y = 510 - c0 - x

    if (y > 255):
        y -= 255
    return chb(x) + chb(y)


def mac2str(mac):
    # type: (str) -> bytes
    return b"".join(chb(int(x, 16)) for x in plain_str(mac).split(':'))


def valid_mac(mac):
    # type: (str) -> bool
    try:
        return len(mac2str(mac)) == 6
    except ValueError:
        pass
    return False


def str2mac(s):
    # type: (bytes) -> str
    if isinstance(s, str):
        return ("%02x:" * 6)[:-1] % tuple(map(ord, s))
    return ("%02x:" * 6)[:-1] % tuple(s)


def randstring(length):
    # type: (int) -> bytes
    """
    Returns a random string of length (length >= 0)
    """
    return b"".join(struct.pack('B', random.randint(0, 255))
                    for _ in range(length))


def zerofree_randstring(length):
    # type: (int) -> bytes
    """
    Returns a random string of length (length >= 0) without zero in it.
    """
    return b"".join(struct.pack('B', random.randint(1, 255))
                    for _ in range(length))


def strxor(s1, s2):
    # type: (bytes, bytes) -> bytes
    """
    Returns the binary XOR of the 2 provided strings s1 and s2. s1 and s2
    must be of same length.
    """
    return b"".join(map(lambda x, y: chb(orb(x) ^ orb(y)), s1, s2))


def strand(s1, s2):
    # type: (bytes, bytes) -> bytes
    """
    Returns the binary AND of the 2 provided strings s1 and s2. s1 and s2
    must be of same length.
    """
    return b"".join(map(lambda x, y: chb(orb(x) & orb(y)), s1, s2))


# Workaround bug 643005 : https://sourceforge.net/tracker/?func=detail&atid=105470&aid=643005&group_id=5470  # noqa: E501
try:
    socket.inet_aton("255.255.255.255")
except socket.error:
    def inet_aton(ip_string):
        # type: (str) -> bytes
        if ip_string == "255.255.255.255":
            return b"\xff" * 4
        else:
            return socket.inet_aton(ip_string)
else:
    inet_aton = socket.inet_aton

inet_ntoa = socket.inet_ntoa


def atol(x):
    # type: (str) -> int
    try:
        ip = inet_aton(x)
    except socket.error:
        ip = inet_aton(socket.gethostbyname(x))
    return cast(int, struct.unpack("!I", ip)[0])


def valid_ip(addr):
    # type: (str) -> bool
    try:
        addr = plain_str(addr)
    except UnicodeDecodeError:
        return False
    try:
        atol(addr)
    except (OSError, ValueError, socket.error):
        return False
    return True


def valid_net(addr):
    # type: (str) -> bool
    try:
        addr = plain_str(addr)
    except UnicodeDecodeError:
        return False
    if '/' in addr:
        ip, mask = addr.split('/', 1)
        return valid_ip(ip) and mask.isdigit() and 0 <= int(mask) <= 32
    return valid_ip(addr)


def valid_ip6(addr):
    # type: (str) -> bool
    try:
        addr = plain_str(addr)
    except UnicodeDecodeError:
        return False
    try:
        inet_pton(socket.AF_INET6, addr)
    except socket.error:
        try:
            socket.getaddrinfo(addr, None, socket.AF_INET6)[0][4][0]
        except socket.error:
            return False
    return True


def valid_net6(addr):
    # type: (str) -> bool
    try:
        addr = plain_str(addr)
    except UnicodeDecodeError:
        return False
    if '/' in addr:
        ip, mask = addr.split('/', 1)
        return valid_ip6(ip) and mask.isdigit() and 0 <= int(mask) <= 128
    return valid_ip6(addr)


def ltoa(x):
    # type: (int) -> str
    return inet_ntoa(struct.pack("!I", x & 0xffffffff))


def itom(x):
    # type: (int) -> int
    return (0xffffffff00000000 >> x) & 0xffffffff


class ContextManagerSubprocess(object):
    """
    Context manager that eases checking for unknown command, without
    crashing.

    Example:
    >>> with ContextManagerSubprocess("tcpdump"):
    >>>     subprocess.Popen(["tcpdump", "--version"])
    ERROR: Could not execute tcpdump, is it installed?

    """

    def __init__(self, prog, suppress=True):
        # type: (str, bool) -> None
        self.prog = prog
        self.suppress = suppress

    def __enter__(self):
        # type: () -> None
        pass

    def __exit__(self,
                 exc_type,  # type: Optional[type]
                 exc_value,  # type: Optional[Exception]
                 traceback,  # type: Optional[Any]
                 ):
        # type: (...) -> Optional[bool]
        if exc_value is None or exc_type is None:
            return None
        # Errored
        if isinstance(exc_value, EnvironmentError):
            msg = "Could not execute %s, is it installed?" % self.prog
        else:
            msg = "%s: execution failed (%s)" % (
                self.prog,
                exc_type.__class__.__name__
            )
        if not self.suppress:
            raise exc_type(msg)
        log_runtime.error(msg, exc_info=True)
        return True  # Suppress the exception


class ContextManagerCaptureOutput(object):
    """
    Context manager that intercept the console's output.

    Example:
    >>> with ContextManagerCaptureOutput() as cmco:
    ...     print("hey")
    ...     assert cmco.get_output() == "hey"
    """

    def __init__(self):
        # type: () -> None
        self.result_export_object = ""
        try:
            import mock  # noqa: F401
        except Exception:
            raise ImportError("The mock module needs to be installed !")

    def __enter__(self):
        # type: () -> ContextManagerCaptureOutput
        import mock

        def write(s, decorator=self):
            # type: (str, ContextManagerCaptureOutput) -> None
            decorator.result_export_object += s
        mock_stdout = mock.Mock()
        mock_stdout.write = write
        self.bck_stdout = sys.stdout
        sys.stdout = mock_stdout
        return self

    def __exit__(self, *exc):
        # type: (*Any) -> Literal[False]
        sys.stdout = self.bck_stdout
        return False

    def get_output(self, eval_bytes=False):
        # type: (bool) -> str
        if self.result_export_object.startswith("b'") and eval_bytes:
            return plain_str(eval(self.result_export_object))
        return self.result_export_object


def do_graph(
    graph,  # type: str
    prog=None,  # type: Optional[str]
    format=None,  # type: Optional[str]
    target=None,  # type: Optional[Union[IO[bytes], str]]
    type=None,  # type: Optional[str]
    string=None,  # type: Optional[bool]
    options=None  # type: Optional[List[str]]
):
    # type: (...) -> Optional[str]
    """Processes graph description using an external software.
    This method is used to convert a graphviz format to an image.

    :param graph: GraphViz graph description
    :param prog: which graphviz program to use
    :param format: output type (svg, ps, gif, jpg, etc.), passed to dot's "-T"
        option
    :param string: if not None, simply return the graph string
    :param target: filename or redirect. Defaults pipe to Imagemagick's
        display program
    :param options: options to be passed to prog
    """

    if format is None:
        format = "svg"
    if string:
        return graph
    if type is not None:
        warnings.warn(
            "type is deprecated, and was renamed format",
            DeprecationWarning
        )
        format = type
    if prog is None:
        prog = conf.prog.dot
    start_viewer = False
    if target is None:
        if WINDOWS:
            target = get_temp_file(autoext="." + format)
            start_viewer = True
        else:
            with ContextManagerSubprocess(conf.prog.display):
                target = subprocess.Popen([conf.prog.display],
                                          stdin=subprocess.PIPE).stdin
    if format is not None:
        format = "-T%s" % format
    if isinstance(target, str):
        if target.startswith('|'):
            target = subprocess.Popen(target[1:].lstrip(), shell=True,
                                      stdin=subprocess.PIPE).stdin
        elif target.startswith('>'):
            target = open(target[1:].lstrip(), "wb")
        else:
            target = open(os.path.abspath(target), "wb")
    target = cast(IO[bytes], target)
    proc = subprocess.Popen(
        "\"%s\" %s %s" % (prog, options or "", format or ""),
        shell=True, stdin=subprocess.PIPE, stdout=target,
        stderr=subprocess.PIPE
    )
    _, stderr = proc.communicate(bytes_encode(graph))
    if proc.returncode != 0:
        raise OSError(
            "GraphViz call failed (is it installed?):\n" +
            plain_str(stderr)
        )
    try:
        target.close()
    except Exception:
        pass
    if start_viewer:
        # Workaround for file not found error: We wait until tempfile is written.  # noqa: E501
        waiting_start = time.time()
        while not os.path.exists(target.name):
            time.sleep(0.1)
            if time.time() - waiting_start > 3:
                warning("Temporary file '%s' could not be written. Graphic will not be displayed.", tempfile)  # noqa: E501
                break
        else:
            if conf.prog.display == conf.prog._default:
                os.startfile(target.name)  # type: ignore
            else:
                with ContextManagerSubprocess(conf.prog.display):
                    subprocess.Popen([conf.prog.display, target.name])
    return None


_TEX_TR = {
    "{": "{\\tt\\char123}",
    "}": "{\\tt\\char125}",
    "\\": "{\\tt\\char92}",
    "^": "\\^{}",
    "$": "\\$",
    "#": "\\#",
    "_": "\\_",
    "&": "\\&",
    "%": "\\%",
    "|": "{\\tt\\char124}",
    "~": "{\\tt\\char126}",
    "<": "{\\tt\\char60}",
    ">": "{\\tt\\char62}",
}


def tex_escape(x):
    # type: (str) -> str
    s = ""
    for c in x:
        s += _TEX_TR.get(c, c)
    return s


def colgen(*lstcol,  # type: Any
           **kargs  # type: Any
           ):
    # type: (...) -> Iterator[Any]
    """Returns a generator that mixes provided quantities forever
    trans: a function to convert the three arguments into a color. lambda x,y,z:(x,y,z) by default"""  # noqa: E501
    if len(lstcol) < 2:
        lstcol *= 2
    trans = kargs.get("trans", lambda x, y, z: (x, y, z))
    while True:
        for i in range(len(lstcol)):
            for j in range(len(lstcol)):
                for k in range(len(lstcol)):
                    if i != j or j != k or k != i:
                        yield trans(lstcol[(i + j) % len(lstcol)], lstcol[(j + k) % len(lstcol)], lstcol[(k + i) % len(lstcol)])  # noqa: E501


def incremental_label(label="tag%05i", start=0):
    # type: (str, int) -> Iterator[str]
    while True:
        yield label % start
        start += 1


def binrepr(val):
    # type: (int) -> str
    return bin(val)[2:]


def long_converter(s):
    # type: (str) -> int
    return int(s.replace('\n', '').replace(' ', ''), 16)

#########################
#    Enum management    #
#########################


class EnumElement:
    def __init__(self, key, value):
        # type: (str, int) -> None
        self._key = key
        self._value = value

    def __repr__(self):
        # type: () -> str
        return "<%s %s[%r]>" % (self.__dict__.get("_name", self.__class__.__name__), self._key, self._value)  # noqa: E501

    def __getattr__(self, attr):
        # type: (str) -> Any
        return getattr(self._value, attr)

    def __str__(self):
        # type: () -> str
        return self._key

    def __bytes__(self):
        # type: () -> bytes
        return bytes_encode(self.__str__())

    def __hash__(self):
        # type: () -> int
        return self._value

    def __int__(self):
        # type: () -> int
        return int(self._value)

    def __eq__(self, other):
        # type: (Any) -> bool
        return self._value == int(other)

    def __neq__(self, other):
        # type: (Any) -> bool
        return not self.__eq__(other)


class Enum_metaclass(type):
    element_class = EnumElement

    def __new__(cls, name, bases, dct):
        # type: (Any, str, Any, Dict[str, Any]) -> Any
        rdict = {}
        for k, v in six.iteritems(dct):
            if isinstance(v, int):
                v = cls.element_class(k, v)
                dct[k] = v
                rdict[v] = k
        dct["__rdict__"] = rdict
        return super(Enum_metaclass, cls).__new__(cls, name, bases, dct)

    def __getitem__(self, attr):
        # type: (int) -> Any
        return self.__rdict__[attr]  # type: ignore

    def __contains__(self, val):
        # type: (int) -> bool
        return val in self.__rdict__  # type: ignore

    def get(self, attr, val=None):
        # type: (str, Optional[Any]) -> Any
        return self.__rdict__.get(attr, val)  # type: ignore

    def __repr__(self):
        # type: () -> str
        return "<%s>" % self.__dict__.get("name", self.__name__)


###################
#  Object saving  #
###################


def export_object(obj):
    # type: (Any) -> None
    import zlib
    print(bytes_base64(zlib.compress(six.moves.cPickle.dumps(obj, 2), 9)))


def import_object(obj=None):
    # type: (Optional[str]) -> Any
    import zlib
    if obj is None:
        obj = sys.stdin.read()
    return six.moves.cPickle.loads(zlib.decompress(base64_bytes(obj.strip())))  # noqa: E501


def save_object(fname, obj):
    # type: (str, Any) -> None
    """Pickle a Python object"""

    fd = gzip.open(fname, "wb")
    six.moves.cPickle.dump(obj, fd)
    fd.close()


def load_object(fname):
    # type: (str) -> Any
    """unpickle a Python object"""
    return six.moves.cPickle.load(gzip.open(fname, "rb"))


@conf.commands.register
def corrupt_bytes(data, p=0.01, n=None):
    # type: (str, float, Optional[int]) -> bytes
    """
    Corrupt a given percentage (at least one byte) or number of bytes
    from a string
    """
    s = array.array("B", bytes_encode(data))
    s_len = len(s)
    if n is None:
        n = max(1, int(s_len * p))
    for i in random.sample(range(s_len), n):
        s[i] = (s[i] + random.randint(1, 255)) % 256
    return s.tostring() if six.PY2 else s.tobytes()


@conf.commands.register
def corrupt_bits(data, p=0.01, n=None):
    # type: (str, float, Optional[int]) -> bytes
    """
    Flip a given percentage (at least one bit) or number of bits
    from a string
    """
    s = array.array("B", bytes_encode(data))
    s_len = len(s) * 8
    if n is None:
        n = max(1, int(s_len * p))
    for i in random.sample(range(s_len), n):
        s[i // 8] ^= 1 << (i % 8)
    return s.tostring() if six.PY2 else s.tobytes()


#############################
#  pcap capture file stuff  #
#############################

@conf.commands.register
def wrpcap(filename,  # type: Union[IO[bytes], str]
           pkt,  # type: _PacketIterable
           *args,  # type: Any
           **kargs  # type: Any
           ):
    # type: (...) -> None
    """Write a list of packets to a pcap file

    :param filename: the name of the file to write packets to, or an open,
        writable file-like object. The file descriptor will be
        closed at the end of the call, so do not use an object you
        do not want to close (e.g., running wrpcap(sys.stdout, [])
        in interactive mode will crash Scapy).
    :param gz: set to 1 to save a gzipped capture
    :param linktype: force linktype value
    :param endianness: "<" or ">", force endianness
    :param sync: do not bufferize writes to the capture file
    """
    with PcapWriter(filename, *args, **kargs) as fdesc:
        fdesc.write(pkt)


@conf.commands.register
def rdpcap(filename, count=-1):
    # type: (Union[IO[bytes], str], int) -> PacketList
    """Read a pcap or pcapng file and return a packet list

    :param count: read only <count> packets
    """
    # Rant: Our complicated use of metaclasses and especially the
    # __call__ function is, of course, not supported by MyPy.
    # One day we should simplify this mess and use a much simpler
    # layout that will actually be supported and properly dissected.
    with PcapReader(filename) as fdesc:  # type: ignore
        return fdesc.read_all(count=count)


# NOTE: Type hinting
# Mypy doesn't understand the following metaclass, and thinks each
# constructor (PcapReader...) needs 3 arguments each. To avoid this,
# we add a fake (=None) to the last 2 arguments then force the value
# to not be None in the signature and pack the whole thing in an ignore.
# This allows to not have # type: ignore every time we call those
# constructors.

class PcapReader_metaclass(type):
    """Metaclass for (Raw)Pcap(Ng)Readers"""

    def __new__(cls, name, bases, dct):
        # type: (Any, str, Any, Dict[str, Any]) -> Any
        """The `alternative` class attribute is declared in the PcapNg
        variant, and set here to the Pcap variant.

        """
        newcls = super(PcapReader_metaclass, cls).__new__(
            cls, name, bases, dct
        )
        if 'alternative' in dct:
            dct['alternative'].alternative = newcls
        return newcls

    def __call__(cls, filename):  # type: ignore
        # type: (Union[IO[bytes], str]) -> Any
        """Creates a cls instance, use the `alternative` if that
        fails.

        """
        i = cls.__new__(cls, cls.__name__, cls.__bases__, cls.__dict__)
        filename, fdesc, magic = cls.open(filename)
        if not magic:
            raise Scapy_Exception(
                "No data could be read!"
            )
        try:
            i.__init__(filename, fdesc, magic)
            return i
        except (Scapy_Exception, EOFError):
            pass

        if "alternative" in cls.__dict__:
            cls = cls.__dict__["alternative"]
            i = cls.__new__(cls, cls.__name__, cls.__bases__, cls.__dict__)
            try:
                i.__init__(filename, fdesc, magic)
                return i
            except (Scapy_Exception, EOFError):
                pass

        raise Scapy_Exception("Not a supported capture file")

    @staticmethod
    def open(fname  # type: Union[IO[bytes], str]
             ):
        # type: (...) -> Tuple[str, _ByteStream, bytes]
        """Open (if necessary) filename, and read the magic."""
        if isinstance(fname, str):
            filename = fname
            try:
                fdesc = gzip.open(filename, "rb")  # type: _ByteStream
                magic = fdesc.read(4)
            except IOError:
                fdesc = open(filename, "rb")
                magic = fdesc.read(4)
        else:
            fdesc = fname
            filename = getattr(fdesc, "name", "No name")
            magic = fdesc.read(4)
        return filename, fdesc, magic


@six.add_metaclass(PcapReader_metaclass)
class RawPcapReader:
    """A stateful pcap reader. Each packet is returned as a string"""

    nonblocking_socket = True
    PacketMetadata = collections.namedtuple("PacketMetadata",
                                            ["sec", "usec", "wirelen", "caplen"])  # noqa: E501

    def __init__(self, filename, fdesc=None, magic=None):  # type: ignore
        # type: (str, _ByteStream, bytes) -> None
        self.filename = filename
        self.f = fdesc
        if magic == b"\xa1\xb2\xc3\xd4":  # big endian
            self.endian = ">"
            self.nano = False
        elif magic == b"\xd4\xc3\xb2\xa1":  # little endian
            self.endian = "<"
            self.nano = False
        elif magic == b"\xa1\xb2\x3c\x4d":  # big endian, nanosecond-precision
            self.endian = ">"
            self.nano = True
        elif magic == b"\x4d\x3c\xb2\xa1":  # little endian, nanosecond-precision  # noqa: E501
            self.endian = "<"
            self.nano = True
        else:
            raise Scapy_Exception(
                "Not a pcap capture file (bad magic: %r)" % magic
            )
        hdr = self.f.read(20)
        if len(hdr) < 20:
            raise Scapy_Exception("Invalid pcap file (too short)")
        vermaj, vermin, tz, sig, snaplen, linktype = struct.unpack(
            self.endian + "HHIIII", hdr
        )
        self.linktype = linktype
        self.snaplen = snaplen

    def __iter__(self):
        # type: () -> RawPcapReader
        return self

    def next(self):
        # type: () -> Packet
        """
        implement the iterator protocol on a set of packets in a pcap file
        """
        try:
            return self.read_packet()
        except EOFError:
            raise StopIteration
    __next__ = next

    def _read_packet(self, size=MTU):
        # type: (int) -> Tuple[bytes, RawPcapReader.PacketMetadata]
        """return a single packet read from the file as a tuple containing
        (pkt_data, pkt_metadata)

        raise EOFError when no more packets are available
        """
        hdr = self.f.read(16)
        if len(hdr) < 16:
            raise EOFError
        sec, usec, caplen, wirelen = struct.unpack(self.endian + "IIII", hdr)
        return (self.f.read(caplen)[:size],
                RawPcapReader.PacketMetadata(sec=sec, usec=usec,
                                             wirelen=wirelen, caplen=caplen))

    def read_packet(self, size=MTU):
        # type: (int) -> Packet
        return cast(
            Packet,
            self._read_packet()[0]
        )

    def dispatch(self,
                 callback  # type: Callable[[Tuple[bytes, RawPcapReader.PacketMetadata]], Any]  # noqa: E501
                 ):
        # type: (...) -> None
        """call the specified callback routine for each packet read

        This is just a convenience function for the main loop
        that allows for easy launching of packet processing in a
        thread.
        """
        for p in self:
            callback(p)

    def read_all(self, count=-1):
        # type: (int) -> PacketList
        res = self._read_all(count)
        from scapy import plist
        return plist.PacketList(res, name=os.path.basename(self.filename))

    def _read_all(self, count=-1):
        # type: (int) -> List[Packet]
        """return a list of all packets in the pcap file
        """
        res = []  # type: List[Packet]
        while count != 0:
            count -= 1
            try:
                p = self.read_packet()  # type: Packet
            except EOFError:
                break
            res.append(p)
        return res

    def recv(self, size=MTU):
        # type: (int) -> bytes
        """ Emulate a socket
        """
        return self._read_packet(size=size)[0]

    def fileno(self):
        # type: () -> int
        return self.f.fileno()

    def close(self):
        # type: () -> Optional[Any]
        return self.f.close()

    def __exit__(self, exc_type, exc_value, tracback):
        # type: (Optional[Any], Optional[Any], Optional[Any]) -> None
        self.close()

    # emulate SuperSocket
    @staticmethod
    def select(sockets,  # type: List[SuperSocket]
               remain=None,  # type: Optional[float]
               ):
        # type: (...) -> List[SuperSocket]
        return sockets


class PcapReader(RawPcapReader, _SuperSocket):
    def __init__(self, filename, fdesc=None, magic=None):  # type: ignore
        # type: (str, IO[bytes], bytes) -> None
        RawPcapReader.__init__(self, filename, fdesc, magic)
        try:
            self.LLcls = conf.l2types.num2layer[
                self.linktype
            ]  # type: Type[Packet]
        except KeyError:
            warning("PcapReader: unknown LL type [%i]/[%#x]. Using Raw packets" % (self.linktype, self.linktype))  # noqa: E501
            if conf.raw_layer is None:
                # conf.raw_layer is set on import
                import scapy.packet  # noqa: F401
            self.LLcls = conf.raw_layer

    def __enter__(self):
        # type: () -> PcapReader
        return self

    def read_packet(self, size=MTU):
        # type: (int) -> Packet
        rp = super(PcapReader, self)._read_packet(size=size)
        if rp is None:
            raise EOFError
        s, pkt_info = rp

        try:
            p = self.LLcls(s)  # type: Packet
        except KeyboardInterrupt:
            raise
        except Exception:
            if conf.debug_dissector:
                from scapy.sendrecv import debug
                debug.crashed_on = (self.LLcls, s)
                raise
            if conf.raw_layer is None:
                # conf.raw_layer is set on import
                import scapy.packet  # noqa: F401
            p = conf.raw_layer(s)
        power = Decimal(10) ** Decimal(-9 if self.nano else -6)
        p.time = EDecimal(pkt_info.sec + power * pkt_info.usec)
        p.wirelen = pkt_info.wirelen
        return p

    def recv(self, size=MTU):
        # type: (int) -> Packet
        return self.read_packet(size=size)


class RawPcapNgReader(RawPcapReader):
    """A stateful pcapng reader. Each packet is returned as
    bytes.

    """

    alternative = RawPcapReader  # type: Type[Any]

    PacketMetadata = collections.namedtuple("PacketMetadata",
                                            ["linktype", "tsresol",
                                             "tshigh", "tslow", "wirelen"])

    def __init__(self, filename, fdesc=None, magic=None):  # type: ignore
        # type: (str, IO[bytes], bytes) -> None
        self.filename = filename
        self.f = fdesc
        # A list of (linktype, snaplen, tsresol); will be populated by IDBs.
        self.interfaces = []  # type: List[Tuple[int, int, int]]
        self.default_options = {
            "tsresol": 1000000
        }
        self.blocktypes = {
            1: self._read_block_idb,
            2: self._read_block_pkt,
            3: self._read_block_spb,
            6: self._read_block_epb,
        }
        self.endian = "!"  # Will be overwritten by first SHB

        if magic != b"\x0a\x0d\x0d\x0a":  # PcapNg:
            raise Scapy_Exception(
                "Not a pcapng capture file (bad magic: %r)" % magic
            )

        try:
            self._read_block_shb()
        except EOFError:
            raise Scapy_Exception(
                "The first SHB of the pcapng file is malformed !"
            )

    def _read_block(self, size=MTU):
        # type: (int) -> Optional[Tuple[bytes, RawPcapNgReader.PacketMetadata]]  # noqa: E501
        try:
            blocktype = struct.unpack(self.endian + "I", self.f.read(4))[0]
        except struct.error:
            raise EOFError
        if blocktype == 0x0A0D0D0A:
            # This function updates the endianness based on the block content.
            self._read_block_shb()
            return None
        try:
            blocklen = struct.unpack(self.endian + "I", self.f.read(4))[0]
        except struct.error:
            raise EOFError
        if blocklen < 12:
            warning("Invalid block length !")
            raise EOFError
        block = self.f.read(blocklen - 12)
        self._read_block_tail(blocklen)
        return self.blocktypes.get(
            blocktype,
            lambda block, size: None
        )(block, size)

    def _read_block_tail(self, blocklen):
        # type: (int) -> None
        if blocklen % 4:
            pad = self.f.read(-blocklen % 4)
            warning("PcapNg: bad blocklen %d (MUST be a multiple of 4. "
                    "Ignored padding %r" % (blocklen, pad))
        try:
            if blocklen != struct.unpack(self.endian + 'I',
                                         self.f.read(4))[0]:
                raise EOFError("PcapNg: Invalid pcapng block (bad blocklen)")
        except struct.error:
            raise EOFError

    def _read_block_shb(self):
        # type: () -> None
        _blocklen = self.f.read(4)
        endian = self.f.read(4)
        if endian == b"\x1a\x2b\x3c\x4d":
            self.endian = ">"
        elif endian == b"\x4d\x3c\x2b\x1a":
            self.endian = "<"
        else:
            warning("Bad magic in Section Header block (not a pcapng file?)")
            raise EOFError

        blocklen = struct.unpack(self.endian + "I", _blocklen)[0]
        if blocklen < 16:
            warning("Invalid SHB block length!")
            raise EOFError
        options = self.f.read(blocklen - 16)
        self._read_block_tail(blocklen)
        self._read_options(options)

    def _read_packet(self, size=MTU):  # type: ignore
        # type: (int) -> Tuple[bytes, RawPcapNgReader.PacketMetadata]
        """Read blocks until it reaches either EOF or a packet, and
        returns None or (packet, (linktype, sec, usec, wirelen)),
        where packet is a string.

        """
        while True:
            res = self._read_block()
            if res is not None:
                return res

    def _read_options(self, options):
        # type: (bytes) -> Dict[str, int]
        """Section Header Block"""
        opts = self.default_options.copy()
        while len(options) >= 4:
            code, length = struct.unpack(self.endian + "HH", options[:4])
            # PCAP Next Generation (pcapng) Capture File Format
            # 4.2. - Interface Description Block
            # http://xml2rfc.tools.ietf.org/cgi-bin/xml2rfc.cgi?url=https://raw.githubusercontent.com/pcapng/pcapng/master/draft-tuexen-opsawg-pcapng.xml&modeAsFormat=html/ascii&type=ascii#rfc.section.4.2
            if code == 9 and length == 1 and len(options) >= 5:
                tsresol = orb(options[4])
                opts["tsresol"] = (2 if tsresol & 128 else 10) ** (
                    tsresol & 127
                )
            if code == 0:
                if length != 0:
                    warning("PcapNg: invalid option length %d for end-of-option" % length)  # noqa: E501
                break
            if length % 4:
                length += (4 - (length % 4))
            options = options[4 + length:]
        return opts

    def _read_block_idb(self, block, _):
        # type: (bytes, int) -> None
        """Interface Description Block"""
        # 2 bytes LinkType + 2 bytes Reserved
        # 4 bytes Snaplen
        options = self._read_options(block[8:-4])
        try:
            interface = struct.unpack(  # type: ignore
                self.endian + "HxxI",
                block[:8]
            ) + (options["tsresol"],)  # type: Tuple[int, int, int]
        except struct.error:
            warning("PcapNg: IDB is too small %d/8 !" % len(block))
            raise EOFError
        self.interfaces.append(interface)

    def _check_interface_id(self, intid):
        # type: (int) -> None
        """Check the interface id value and raise EOFError if invalid."""
        tmp_len = len(self.interfaces)
        if intid >= tmp_len:
            warning("PcapNg: invalid interface id %d/%d" % (intid, tmp_len))
            raise EOFError

    def _read_block_epb(self, block, size):
        # type: (bytes, int) -> Tuple[bytes, RawPcapNgReader.PacketMetadata]
        """Enhanced Packet Block"""
        try:
            intid, tshigh, tslow, caplen, wirelen = struct.unpack(
                self.endian + "5I",
                block[:20],
            )
        except struct.error:
            warning("PcapNg: EPB is too small %d/20 !" % len(block))
            raise EOFError

        self._check_interface_id(intid)
        return (block[20:20 + caplen][:size],
                RawPcapNgReader.PacketMetadata(linktype=self.interfaces[intid][0],  # noqa: E501
                                               tsresol=self.interfaces[intid][2],  # noqa: E501
                                               tshigh=tshigh,
                                               tslow=tslow,
                                               wirelen=wirelen))

    def _read_block_spb(self, block, size):
        # type: (bytes, int) -> Tuple[bytes, RawPcapNgReader.PacketMetadata]
        """Simple Packet Block"""
        # "it MUST be assumed that all the Simple Packet Blocks have
        # been captured on the interface previously specified in the
        # first Interface Description Block."
        intid = 0
        self._check_interface_id(intid)

        try:
            wirelen, = struct.unpack(self.endian + "I", block[:4])
        except struct.error:
            warning("PcapNg: SPB is too small %d/4 !" % len(block))
            raise EOFError

        caplen = min(wirelen, self.interfaces[intid][1])
        return (block[4:4 + caplen][:size],
                RawPcapNgReader.PacketMetadata(linktype=self.interfaces[intid][0],  # noqa: E501
                                               tsresol=self.interfaces[intid][2],  # noqa: E501
                                               tshigh=None,
                                               tslow=None,
                                               wirelen=wirelen))

    def _read_block_pkt(self, block, size):
        # type: (bytes, int) -> Tuple[bytes, RawPcapNgReader.PacketMetadata]
        """(Obsolete) Packet Block"""
        try:
            intid, drops, tshigh, tslow, caplen, wirelen = struct.unpack(
                self.endian + "HH4I",
                block[:20],
            )
        except struct.error:
            warning("PcapNg: PKT is too small %d/20 !" % len(block))
            raise EOFError

        self._check_interface_id(intid)
        return (block[20:20 + caplen][:size],
                RawPcapNgReader.PacketMetadata(linktype=self.interfaces[intid][0],  # noqa: E501
                                               tsresol=self.interfaces[intid][2],  # noqa: E501
                                               tshigh=tshigh,
                                               tslow=tslow,
                                               wirelen=wirelen))


class PcapNgReader(RawPcapNgReader, _SuperSocket):

    alternative = PcapReader

    def __init__(self, filename, fdesc=None, magic=None):  # type: ignore
        # type: (str, IO[bytes], bytes) -> None
        RawPcapNgReader.__init__(self, filename, fdesc, magic)

    def __enter__(self):
        # type: () -> PcapNgReader
        return self

    def read_packet(self, size=MTU):
        # type: (int) -> Packet
        rp = super(PcapNgReader, self)._read_packet(size=size)
        if rp is None:
            raise EOFError
        s, (linktype, tsresol, tshigh, tslow, wirelen) = rp
        try:
            cls = conf.l2types.num2layer[linktype]  # type: Type[Packet]
            p = cls(s)  # type: Packet
        except KeyboardInterrupt:
            raise
        except Exception:
            if conf.debug_dissector:
                raise
            if conf.raw_layer is None:
                # conf.raw_layer is set on import
                import scapy.packet  # noqa: F401
            p = conf.raw_layer(s)
        if tshigh is not None:
            p.time = EDecimal((tshigh << 32) + tslow) / tsresol
        p.wirelen = wirelen
        return p

    def recv(self, size=MTU):
        # type: (int) -> Packet
        return self.read_packet()


class RawPcapWriter:
    """A stream PCAP writer with more control than wrpcap()"""

    def __init__(self,
                 filename,  # type: Union[IO[bytes], str]
                 linktype=None,  # type: Optional[int]
                 gz=False,  # type: bool
                 endianness="",  # type: str
                 append=False,  # type: bool
                 sync=False,  # type: bool
                 nano=False,  # type: bool
                 snaplen=MTU,  # type: int
                 ):
        # type: (...) -> None
        """
        :param filename: the name of the file to write packets to, or an open,
            writable file-like object.
        :param linktype: force linktype to a given value. If None, linktype is
            taken from the first writer packet
        :param gz: compress the capture on the fly
        :param endianness: force an endianness (little:"<", big:">").
            Default is native
        :param append: append packets to the capture file instead of
            truncating it
        :param sync: do not bufferize writes to the capture file
        :param nano: use nanosecond-precision (requires libpcap >= 1.5.0)

        """

        self.linktype = linktype
        self.snaplen = snaplen
        self.header_present = 0
        self.append = append
        self.gz = gz
        self.endian = endianness
        self.sync = sync
        self.nano = nano
        bufsz = 4096
        if sync:
            bufsz = 0

        if isinstance(filename, str):
            self.filename = filename
            if gz:
                self.f = cast(_ByteStream, gzip.open(
                    filename, append and "ab" or "wb", 9
                ))
            else:
                self.f = open(filename, append and "ab" or "wb", bufsz)
        else:
            self.f = filename
            self.filename = getattr(filename, "name", "No name")

    def fileno(self):
        # type: () -> int
        return self.f.fileno()

    def write_header(self, pkt):
        # type: (Optional[Union[Packet, bytes]]) -> None
        return self._write_header(bytes_encode(pkt))

    def _write_header(self, pkt):
        # type: (Optional[Union[Packet, bytes]]) -> None
        self.header_present = 1

        if self.append:
            # Even if prone to race conditions, this seems to be
            # safest way to tell whether the header is already present
            # because we have to handle compressed streams that
            # are not as flexible as basic files
            if self.gz:
                g = gzip.open(self.filename, "rb")  # type: _ByteStream
            else:
                g = open(self.filename, "rb")
            try:
                if g.read(16):
                    return
            finally:
                g.close()

        self.f.write(struct.pack(self.endian + "IHHIIII", 0xa1b23c4d if self.nano else 0xa1b2c3d4,  # noqa: E501
                                 2, 4, 0, 0, self.snaplen, self.linktype))
        self.f.flush()

    def write(self, pkt):
        # type: (Union[_PacketIterable, bytes]) -> None
        """
        Writes a Packet, a SndRcvList object, or bytes to a pcap file.

        :param pkt: Packet(s) to write (one record for each Packet), or raw
                    bytes to write (as one record).
        :type pkt: iterable[scapy.packet.Packet], scapy.packet.Packet or bytes
        """
        if isinstance(pkt, bytes):
            if not self.header_present:
                self.write_header(pkt)
            self.write_packet(pkt)
        else:
            # Import here to avoid circular dependency
            from scapy.supersocket import IterSocket
            for p in IterSocket(pkt).iter:
                if not self.header_present:
                    self.write_header(p)

                if not isinstance(p, bytes) and \
                        self.linktype != conf.l2types.get(type(p), None):
                    warning("Inconsistent linktypes detected!"
                            " The resulting PCAP file might contain"
                            " invalid packets."
                            )

                self.write_packet(p)

    def write_packet(self,
                     packet,  # type: Union[bytes, Packet]
                     sec=None,  # type: Optional[int]
                     usec=None,  # type: Optional[int]
                     caplen=None,  # type: Optional[int]
                     wirelen=None,  # type: Optional[int]
                     ):
        # type: (...) -> None
        self._write_packet(
            bytes(packet),
            sec=sec, usec=usec,
            caplen=caplen, wirelen=wirelen
        )

    def _write_packet(self,
                      packet,  # type: bytes
                      sec=None,  # type: Optional[int]
                      usec=None,  # type: Optional[int]
                      caplen=None,  # type: Optional[int]
                      wirelen=None  # type: Optional[int]
                      ):
        # type: (...) -> None
        """
        Writes a single packet to the pcap file.

        :param packet: bytes for a single packet
        :type packet: bytes
        :param sec: time the packet was captured, in seconds since epoch. If
                    not supplied, defaults to now.
        :type sec: int or long
        :param usec: If ``nano=True``, then number of nanoseconds after the
                     second that the packet was captured. If ``nano=False``,
                     then the number of microseconds after the second the
                     packet was captured
        :type usec: int or long
        :param caplen: The length of the packet in the capture file. If not
                       specified, uses ``len(packet)``.
        :type caplen: int
        :param wirelen: The length of the packet on the wire. If not
                        specified, uses ``caplen``.
        :type wirelen: int
        :return: None
        :rtype: None
        """
        if caplen is None:
            caplen = len(packet)
        if wirelen is None:
            wirelen = caplen
        if sec is None or usec is None:
            t = time.time()
            it = int(t)
            if sec is None:
                sec = it
                usec = int(round((t - it) *
                                 (1000000000 if self.nano else 1000000)))
            elif usec is None:
                usec = 0

        self.f.write(struct.pack(self.endian + "IIII",
                                 sec, usec, caplen, wirelen))
        self.f.write(packet)
        if self.sync:
            self.f.flush()

    def flush(self):
        # type: () -> Optional[Any]
        return self.f.flush()

    def close(self):
        # type: () -> Optional[Any]
        if not self.header_present:
            self.write_header(None)
        return self.f.close()

    def __enter__(self):
        # type: () -> RawPcapWriter
        return self

    def __exit__(self, exc_type, exc_value, tracback):
        # type: (Optional[Any], Optional[Any], Optional[Any]) -> None
        self.flush()
        self.close()


class PcapWriter(RawPcapWriter):
    """A stream PCAP writer with more control than wrpcap()"""

    def write_header(self, pkt):
        # type: (Optional[Union[Packet, bytes]]) -> None
        if self.linktype is None:
            try:
                if pkt is None or isinstance(pkt, bytes):
                    # Can't guess LL
                    raise KeyError
                self.linktype = conf.l2types.layer2num[
                    pkt.__class__
                ]
            except KeyError:
                warning("PcapWriter: unknown LL type for %s. Using type 1 (Ethernet)", pkt.__class__.__name__)  # noqa: E501
                self.linktype = DLT_EN10MB
        self._write_header(pkt)

    def write_packet(self,
                     packet,  # type: Union[bytes, Packet]
                     sec=None,  # type: Optional[int]
                     usec=None,  # type: Optional[int]
                     caplen=None,  # type: Optional[int]
                     wirelen=None,  # type: Optional[int]
                     ):
        # type: (...) -> None
        """
        Writes a single packet to the pcap file.

        :param packet: Packet, or bytes for a single packet
        :type packet: scapy.packet.Packet or bytes
        :param sec: time the packet was captured, in seconds since epoch. If
                    not supplied, defaults to now.
        :type sec: int or long
        :param usec: If ``nano=True``, then number of nanoseconds after the
                     second that the packet was captured. If ``nano=False``,
                     then the number of microseconds after the second the
                     packet was captured. If ``sec`` is not specified,
                     this value is ignored.
        :type usec: int or long
        :param caplen: The length of the packet in the capture file. If not
                       specified, uses ``len(raw(packet))``.
        :type caplen: int
        :param wirelen: The length of the packet on the wire. If not
                        specified, tries ``packet.wirelen``, otherwise uses
                        ``caplen``.
        :type wirelen: int
        :return: None
        :rtype: None
        """
        if hasattr(packet, "time"):
            if sec is None:
                sec = int(packet.time)  # type: ignore
                usec = int(round((packet.time - sec) *  # type: ignore
                                 (1000000000 if self.nano else 1000000)))
        if usec is None:
            usec = 0

        rawpkt = bytes_encode(packet)
        caplen = len(rawpkt) if caplen is None else caplen

        if wirelen is None:
            if hasattr(packet, "wirelen"):
                wirelen = packet.wirelen  # type: ignore
        if wirelen is None:
            wirelen = caplen

        self._write_packet(
            rawpkt,
            sec=sec, usec=usec,
            caplen=caplen, wirelen=wirelen
        )


@conf.commands.register
def import_hexcap(input_string=None):
    # type: (Optional[str]) -> bytes
    """Imports a tcpdump like hexadecimal view

    e.g: exported via hexdump() or tcpdump or wireshark's "export as hex"

    :param input_string: String containing the hexdump input to parse. If None,
        read from standard input.
    """
    re_extract_hexcap = re.compile(r"^((0x)?[0-9a-fA-F]{2,}[ :\t]{,3}|) *(([0-9a-fA-F]{2} {,2}){,16})")  # noqa: E501
    p = ""
    try:
        if input_string:
            input_function = six.StringIO(input_string).readline
        else:
            input_function = input
        while True:
            line = input_function().strip()
            if not line:
                break
            try:
                p += re_extract_hexcap.match(line).groups()[2]  # type: ignore
            except Exception:
                warning("Parsing error during hexcap")
                continue
    except EOFError:
        pass

    p = p.replace(" ", "")
    return hex_bytes(p)


@conf.commands.register
def wireshark(pktlist, wait=False, **kwargs):
    # type: (List[Packet], bool, **Any) -> Optional[Any]
    """
    Runs Wireshark on a list of packets.

    See :func:`tcpdump` for more parameter description.

    Note: this defaults to wait=False, to run Wireshark in the background.
    """
    return tcpdump(pktlist, prog=conf.prog.wireshark, wait=wait, **kwargs)


@conf.commands.register
def tdecode(
    pktlist,  # type: Union[IO[bytes], None, str, _PacketIterable]
    args=None,  # type: Optional[List[str]]
    **kwargs  # type: Any
):
    # type: (...) -> Any
    """
    Run tshark on a list of packets.

    :param args: If not specified, defaults to ``tshark -V``.

    See :func:`tcpdump` for more parameters.
    """
    if args is None:
        args = ["-V"]
    return tcpdump(pktlist, prog=conf.prog.tshark, args=args, **kwargs)


def _guess_linktype_name(value):
    # type: (int) -> str
    """Guess the DLT name from its value."""
    from scapy.libs.winpcapy import pcap_datalink_val_to_name
    return cast(bytes, pcap_datalink_val_to_name(value)).decode()


def _guess_linktype_value(name):
    # type: (str) -> int
    """Guess the value of a DLT name."""
    from scapy.libs.winpcapy import pcap_datalink_name_to_val
    val = cast(int, pcap_datalink_name_to_val(name.encode()))
    if val == -1:
        warning("Unknown linktype: %s. Using EN10MB", name)
        return DLT_EN10MB
    return val


@conf.commands.register
def tcpdump(
    pktlist=None,  # type: Union[IO[bytes], None, str, _PacketIterable]
    dump=False,  # type: bool
    getfd=False,  # type: bool
    args=None,  # type: Optional[List[str]]
    flt=None,  # type: Optional[str]
    prog=None,  # type: Optional[Any]
    getproc=False,  # type: bool
    quiet=False,  # type: bool
    use_tempfile=None,  # type: Optional[Any]
    read_stdin_opts=None,  # type: Optional[Any]
    linktype=None,  # type: Optional[Any]
    wait=True,  # type: bool
    _suppress=False  # type: bool
):
    # type: (...) -> Any
    """Run tcpdump or tshark on a list of packets.

    When using ``tcpdump`` on OSX (``prog == conf.prog.tcpdump``), this uses a
    temporary file to store the packets. This works around a bug in Apple's
    version of ``tcpdump``: http://apple.stackexchange.com/questions/152682/

    Otherwise, the packets are passed in stdin.

    This function can be explicitly enabled or disabled with the
    ``use_tempfile`` parameter.

    When using ``wireshark``, it will be called with ``-ki -`` to start
    immediately capturing packets from stdin.

    Otherwise, the command will be run with ``-r -`` (which is correct for
    ``tcpdump`` and ``tshark``).

    This can be overridden with ``read_stdin_opts``. This has no effect when
    ``use_tempfile=True``, or otherwise reading packets from a regular file.

    :param pktlist: a Packet instance, a PacketList instance or a list of
        Packet instances. Can also be a filename (as a string), an open
        file-like object that must be a file format readable by
        tshark (Pcap, PcapNg, etc.) or None (to sniff)
    :param flt: a filter to use with tcpdump
    :param dump:    when set to True, returns a string instead of displaying it.
    :param getfd:   when set to True, returns a file-like object to read data
        from tcpdump or tshark from.
    :param getproc: when set to True, the subprocess.Popen object is returned
    :param args:    arguments (as a list) to pass to tshark (example for tshark:
        args=["-T", "json"]).
    :param prog:    program to use (defaults to tcpdump, will work with tshark)
    :param quiet:   when set to True, the process stderr is discarded
    :param use_tempfile: When set to True, always use a temporary file to store
        packets.
        When set to False, pipe packets through stdin.
        When set to None (default), only use a temporary file with
        ``tcpdump`` on OSX.
    :param read_stdin_opts: When set, a list of arguments needed to capture
        from stdin. Otherwise, attempts to guess.
    :param linktype: A custom DLT value or name, to overwrite the default
        values.
    :param wait: If True (default), waits for the process to terminate before
        returning to Scapy. If False, the process will be detached to the
        background. If dump, getproc or getfd is True, these have the same
        effect as ``wait=False``.

    Examples::

        >>> tcpdump([IP()/TCP(), IP()/UDP()])
        reading from file -, link-type RAW (Raw IP)
        16:46:00.474515 IP 127.0.0.1.20 > 127.0.0.1.80: Flags [S], seq 0, win 8192, length 0  # noqa: E501
        16:46:00.475019 IP 127.0.0.1.53 > 127.0.0.1.53: [|domain]

        >>> tcpdump([IP()/TCP(), IP()/UDP()], prog=conf.prog.tshark)
          1   0.000000    127.0.0.1 -> 127.0.0.1    TCP 40 20->80 [SYN] Seq=0 Win=8192 Len=0  # noqa: E501
          2   0.000459    127.0.0.1 -> 127.0.0.1    UDP 28 53->53 Len=0

    To get a JSON representation of a tshark-parsed PacketList(), one can::

        >>> import json, pprint
        >>> json_data = json.load(tcpdump(IP(src="217.25.178.5",
        ...                                  dst="45.33.32.156"),
        ...                               prog=conf.prog.tshark,
        ...                               args=["-T", "json"],
        ...                               getfd=True))
        >>> pprint.pprint(json_data)
        [{u'_index': u'packets-2016-12-23',
          u'_score': None,
          u'_source': {u'layers': {u'frame': {u'frame.cap_len': u'20',
                                              u'frame.encap_type': u'7',
        [...]
                                              },
                                   u'ip': {u'ip.addr': u'45.33.32.156',
                                           u'ip.checksum': u'0x0000a20d',
        [...]
                                           u'ip.ttl': u'64',
                                           u'ip.version': u'4'},
                                   u'raw': u'Raw packet data'}},
          u'_type': u'pcap_file'}]
        >>> json_data[0]['_source']['layers']['ip']['ip.ttl']
        u'64'
    """
    getfd = getfd or getproc
    if prog is None:
        if not conf.prog.tcpdump:
            raise Scapy_Exception(
                "tcpdump is not available"
            )
        prog = [conf.prog.tcpdump]
    elif isinstance(prog, six.string_types):
        prog = [prog]
    else:
        raise ValueError("prog must be a string")

    if linktype is not None:
        if isinstance(linktype, int):
            # Guess name from value
            try:
                linktype_name = _guess_linktype_name(linktype)
            except StopIteration:
                linktype = -1
        else:
            # Guess value from name
            if linktype.startswith("DLT_"):
                linktype = linktype[4:]
            linktype_name = linktype
            try:
                linktype = _guess_linktype_value(linktype)
            except KeyError:
                linktype = -1
        if linktype == -1:
            raise ValueError(
                "Unknown linktype. Try passing its datalink name instead"
            )
        prog += ["-y", linktype_name]

    # Build Popen arguments
    if args is None:
        args = []
    else:
        # Make a copy of args
        args = list(args)

    if flt is not None:
        # Check the validity of the filter
        if linktype is None and isinstance(pktlist, str):
            # linktype is unknown but required. Read it from file
            with PcapReader(pktlist) as rd:
                linktype = rd.linktype
        from scapy.arch.common import compile_filter
        compile_filter(flt, linktype=linktype)
        args.append(flt)

    stdout = subprocess.PIPE if dump or getfd else None
    stderr = open(os.devnull) if quiet else None
    proc = None

    if use_tempfile is None:
        # Apple's tcpdump cannot read from stdin, see:
        # http://apple.stackexchange.com/questions/152682/
        use_tempfile = DARWIN and prog[0] == conf.prog.tcpdump

    if read_stdin_opts is None:
        if prog[0] == conf.prog.wireshark:
            # Start capturing immediately (-k) from stdin (-i -)
            read_stdin_opts = ["-ki", "-"]
        elif prog[0] == conf.prog.tcpdump and not OPENBSD:
            # Capture in packet-buffered mode (-U) from stdin (-r -)
            read_stdin_opts = ["-U", "-r", "-"]
        else:
            read_stdin_opts = ["-r", "-"]
    else:
        # Make a copy of read_stdin_opts
        read_stdin_opts = list(read_stdin_opts)

    if pktlist is None:
        # sniff
        with ContextManagerSubprocess(prog[0], suppress=_suppress):
            proc = subprocess.Popen(
                prog + args,
                stdout=stdout,
                stderr=stderr,
            )
    elif isinstance(pktlist, six.string_types):
        # file
        with ContextManagerSubprocess(prog[0], suppress=_suppress):
            proc = subprocess.Popen(
                prog + ["-r", pktlist] + args,
                stdout=stdout,
                stderr=stderr,
            )
    elif use_tempfile:
        pktlist = cast(Union[IO[bytes], "_PacketIterable"], pktlist)
        tmpfile = get_temp_file(  # type: ignore
            autoext=".pcap",
            fd=True
        )  # type: IO[bytes]
        try:
            tmpfile.writelines(
                iter(lambda: pktlist.read(1048576), b"")  # type: ignore
            )
        except AttributeError:
            pktlist = cast("_PacketIterable", pktlist)
            wrpcap(tmpfile, pktlist, linktype=linktype)
        else:
            tmpfile.close()
        with ContextManagerSubprocess(prog[0], suppress=_suppress):
            proc = subprocess.Popen(
                prog + ["-r", tmpfile.name] + args,
                stdout=stdout,
                stderr=stderr,
            )
    else:
        try:
            pktlist.fileno()  # type: ignore
            # pass the packet stream
            with ContextManagerSubprocess(prog[0], suppress=_suppress):
                proc = subprocess.Popen(
                    prog + read_stdin_opts + args,
                    stdin=pktlist,  # type: ignore
                    stdout=stdout,
                    stderr=stderr,
                )
        except (AttributeError, ValueError):
            # write the packet stream to stdin
            with ContextManagerSubprocess(prog[0], suppress=_suppress):
                proc = subprocess.Popen(
                    prog + read_stdin_opts + args,
                    stdin=subprocess.PIPE,
                    stdout=stdout,
                    stderr=stderr,
                )
            if proc is None:
                # An error has occurred
                return
            try:
                proc.stdin.writelines(  # type: ignore
                    iter(lambda: pktlist.read(1048576), b"")  # type: ignore
                )
            except AttributeError:
                wrpcap(proc.stdin, pktlist, linktype=linktype)  # type: ignore
            except UnboundLocalError:
                # The error was handled by ContextManagerSubprocess
                pass
            else:
                proc.stdin.close()  # type: ignore
    if proc is None:
        # An error has occurred
        return
    if dump:
        data = b"".join(
            iter(lambda: proc.stdout.read(1048576), b"")  # type: ignore
        )
        proc.terminate()
        return data
    if getproc:
        return proc
    if getfd:
        return proc.stdout
    if wait:
        proc.wait()


@conf.commands.register
def hexedit(pktlist):
    # type: (_PacketIterable) -> PacketList
    """Run hexedit on a list of packets, then return the edited packets."""
    f = get_temp_file()
    wrpcap(f, pktlist)
    with ContextManagerSubprocess(conf.prog.hexedit):
        subprocess.call([conf.prog.hexedit, f])
    rpktlist = rdpcap(f)
    os.unlink(f)
    return rpktlist


def get_terminal_width():
    # type: () -> Optional[int]
    """Get terminal width (number of characters) if in a window.

    Notice: this will try several methods in order to
    support as many terminals and OS as possible.
    """
    # Let's first try using the official API
    # (Python 3.3+)
    sizex = None  # type: Optional[int]
    if not six.PY2:
        import shutil
        sizex = shutil.get_terminal_size(fallback=(0, 0))[0]
        if sizex != 0:
            return sizex
    # Backups / Python 2.7
    if WINDOWS:
        from ctypes import windll, create_string_buffer  # type: ignore
        # http://code.activestate.com/recipes/440694-determine-size-of-console-window-on-windows/
        h = windll.kernel32.GetStdHandle(-12)
        csbi = create_string_buffer(22)
        res = windll.kernel32.GetConsoleScreenBufferInfo(h, csbi)
        if res:
            (bufx, bufy, curx, cury, wattr,
             left, top, right, bottom, maxx, maxy) = struct.unpack("hhhhHhhhhhh", csbi.raw)  # noqa: E501
            sizex = right - left + 1
            # sizey = bottom - top + 1
            return sizex
        return sizex
    else:
        # We have various methods
        # COLUMNS is set on some terminals
        try:
            sizex = int(os.environ['COLUMNS'])
        except Exception:
            pass
        if sizex:
            return sizex
        # We can query TIOCGWINSZ
        try:
            import fcntl
            import termios
            s = struct.pack('HHHH', 0, 0, 0, 0)
            x = fcntl.ioctl(1, termios.TIOCGWINSZ, s)
            sizex = struct.unpack('HHHH', x)[1]
        except IOError:
            pass
        return sizex


def pretty_list(rtlst,  # type: List[Tuple[Union[str, List[str]], ...]]
                header,  # type: List[Tuple[str, ...]]
                sortBy=0,  # type: int
                borders=False,  # type: bool
                ):
    # type: (...) -> str
    """
    Pretty list to fit the terminal, and add header.

    :param rtlst: a list of tuples. each tuple contains a value which can
        be either a string or a list of string.
    :param sortBy: the column id (starting with 0) which whill be used for
        ordering
    :param borders: whether to put borders on the table or not
    """
    if borders:
        _space = "|"
    else:
        _space = "  "
    cols = len(header[0])
    # Windows has a fat terminal border
    _spacelen = len(_space) * (cols - 1) + int(WINDOWS)
    _croped = False
    # Sort correctly
    rtlst.sort(key=lambda x: x[sortBy])
    # Resolve multi-values
    for i, line in enumerate(rtlst):
        ids = []  # type: List[int]
        values = []  # type: List[Union[str, List[str]]]
        for j, val in enumerate(line):
            if isinstance(val, list):
                ids.append(j)
                values.append(val or " ")
        if values:
            del rtlst[i]
            k = 0
            for ex_vals in zip_longest(*values, fillvalue=" "):
                if k:
                    extra_line = [" "] * cols
                else:
                    extra_line = list(line)  # type: ignore
                for j, h in enumerate(ids):
                    extra_line[h] = ex_vals[j]
                rtlst.insert(i + k, tuple(extra_line))
                k += 1
    rtslst = cast(List[Tuple[str, ...]], rtlst)
    # Append tag
    rtslst = header + rtslst
    # Detect column's width
    colwidth = [max(len(y) for y in x) for x in zip(*rtslst)]
    # Make text fit in box (if required)
    width = get_terminal_width()
    if conf.auto_crop_tables and width:
        width = width - _spacelen
        while sum(colwidth) > width:
            _croped = True
            # Needs to be cropped
            # Get the longest row
            i = colwidth.index(max(colwidth))
            # Get all elements of this row
            row = [len(x[i]) for x in rtslst]
            # Get biggest element of this row: biggest of the array
            j = row.index(max(row))
            # Re-build column tuple with the edited element
            t = list(rtslst[j])
            t[i] = t[i][:-2] + "_"
            rtslst[j] = tuple(t)
            # Update max size
            row[j] = len(t[i])
            colwidth[i] = max(row)
    if _croped:
        log_runtime.info("Table cropped to fit the terminal (conf.auto_crop_tables==True)")  # noqa: E501
    # Generate padding scheme
    fmt = _space.join(["%%-%ds" % x for x in colwidth])
    # Append separation line if needed
    if borders:
        rtslst.insert(1, tuple("-" * x for x in colwidth))
    # Compile
    return "\n".join(fmt % x for x in rtslst)


def __make_table(
    yfmtfunc,  # type: Callable[[int], str]
    fmtfunc,  # type: Callable[[int], str]
    endline,  # type: str
    data,  # type: List[Tuple[Packet, Packet]]
    fxyz,  # type: Callable[[Packet, Packet], Tuple[Any, Any, Any]]
    sortx=None,  # type: Optional[Callable[[str], Tuple[Any, ...]]]
    sorty=None,  # type: Optional[Callable[[str], Tuple[Any, ...]]]
    seplinefunc=None,  # type: Optional[Callable[[int, List[int]], str]]
    dump=False  # type: bool
):
    # type: (...) -> Optional[str]
    """Core function of the make_table suite, which generates the table"""
    vx = {}  # type: Dict[str, int]
    vy = {}  # type: Dict[str, Optional[int]]
    vz = {}  # type: Dict[Tuple[str, str], str]
    vxf = {}  # type: Dict[str, str]

    # Python 2 backward compatibility
    fxyz = lambda_tuple_converter(fxyz)

    tmp_len = 0
    for e in data:
        xx, yy, zz = [str(s) for s in fxyz(*e)]
        tmp_len = max(len(yy), tmp_len)
        vx[xx] = max(vx.get(xx, 0), len(xx), len(zz))
        vy[yy] = None
        vz[(xx, yy)] = zz

    vxk = list(vx)
    vyk = list(vy)
    if sortx:
        vxk.sort(key=sortx)
    else:
        try:
            vxk.sort(key=int)
        except Exception:
            try:
                vxk.sort(key=atol)
            except Exception:
                vxk.sort()
    if sorty:
        vyk.sort(key=sorty)
    else:
        try:
            vyk.sort(key=int)
        except Exception:
            try:
                vyk.sort(key=atol)
            except Exception:
                vyk.sort()

    s = ""
    if seplinefunc:
        sepline = seplinefunc(tmp_len, [vx[x] for x in vxk])
        s += sepline + "\n"

    fmt = yfmtfunc(tmp_len)
    s += fmt % ""
    s += ' '
    for x in vxk:
        vxf[x] = fmtfunc(vx[x])
        s += vxf[x] % x
        s += ' '
    s += endline + "\n"
    if seplinefunc:
        s += sepline + "\n"
    for y in vyk:
        s += fmt % y
        s += ' '
        for x in vxk:
            s += vxf[x] % vz.get((x, y), "-")
            s += ' '
        s += endline + "\n"
    if seplinefunc:
        s += sepline + "\n"

    if dump:
        return s
    else:
        print(s, end="")
        return None


def make_table(*args, **kargs):
    # type: (*Any, **Any) -> Optional[Any]
    return __make_table(
        lambda l: "%%-%is" % l,
        lambda l: "%%-%is" % l,
        "",
        *args,
        **kargs
    )


def make_lined_table(*args, **kargs):
    # type: (*Any, **Any) -> Optional[str]
    return __make_table(  # type: ignore
        lambda l: "%%-%is |" % l,
        lambda l: "%%-%is |" % l,
        "",
        *args,
        seplinefunc=lambda a, x: "+".join(
            '-' * (y + 2) for y in [a - 1] + x + [-2]
        ),
        **kargs
    )


def make_tex_table(*args, **kargs):
    # type: (*Any, **Any) -> Optional[str]
    return __make_table(  # type: ignore
        lambda l: "%s",
        lambda l: "& %s",
        "\\\\",
        *args,
        seplinefunc=lambda a, x: "\\hline",
        **kargs
    )

####################
#   WHOIS CLIENT   #
####################


def whois(ip_address):
    # type: (str) -> bytes
    """Whois client for Python"""
    whois_ip = str(ip_address)
    try:
        query = socket.gethostbyname(whois_ip)
    except Exception:
        query = whois_ip
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.connect(("whois.ripe.net", 43))
    s.send(query.encode("utf8") + b"\r\n")
    answer = b""
    while True:
        d = s.recv(4096)
        answer += d
        if not d:
            break
    s.close()
    ignore_tag = b"remarks:"
    # ignore all lines starting with the ignore_tag
    lines = [line for line in answer.split(b"\n") if not line or (line and not line.startswith(ignore_tag))]  # noqa: E501
    # remove empty lines at the bottom
    for i in range(1, len(lines)):
        if not lines[-i].strip():
            del lines[-i]
        else:
            break
    return b"\n".join(lines[3:])

#######################
#   PERIODIC SENDER   #
#######################


class PeriodicSenderThread(threading.Thread):
    def __init__(self, sock, pkt, interval=0.5):
        # type: (Any, _PacketIterable, float) -> None
        """ Thread to send packets periodically

        Args:
            sock: socket where packet is sent periodically
            pkt: packet or list of packets to send
            interval: interval between two packets
        """
        if not isinstance(pkt, list):
            self._pkts = [cast("Packet", pkt)]  # type: _PacketIterable
        else:
            self._pkts = pkt
        self._socket = sock
        self._stopped = threading.Event()
        self._interval = interval
        threading.Thread.__init__(self)

    def run(self):
        # type: () -> None
        while not self._stopped.is_set():
            for p in self._pkts:
                self._socket.send(p)
                time.sleep(self._interval)
                if self._stopped.is_set():
                    break

    def stop(self):
        # type: () -> None
        self._stopped.set()


class SingleConversationSocket(object):
    def __init__(self, o):
        # type: (Any) -> None
        self._inner = o
        self._tx_mutex = threading.RLock()

    @property
    def __dict__(self):  # type: ignore
        return self._inner.__dict__

    def __getattr__(self, name):
        # type: (str) -> Any
        return getattr(self._inner, name)

    def sr1(self, *args, **kargs):
        # type: (*Any, **Any) -> Any
        with self._tx_mutex:
            return self._inner.sr1(*args, **kargs)

    def sr(self, *args, **kargs):
        # type: (*Any, **Any) -> Any
        with self._tx_mutex:
            return self._inner.sr(*args, **kargs)

    def send(self, x):
        # type: (Packet) -> Any
        with self._tx_mutex:
            return self._inner.send(x)