Learn more  » Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

agriconnect / aiohttp   python

Repository URL to install this package:

/ web_log.py

import datetime
import functools
import logging
import os
import re
from collections import namedtuple
from typing import Callable, Dict, Iterable, List, Tuple  # noqa

from .abc import AbstractAccessLogger
from .web_request import BaseRequest
from .web_response import StreamResponse

KeyMethod = namedtuple('KeyMethod', 'key method')


class AccessLogger(AbstractAccessLogger):
    """Helper object to log access.

    Usage:
        log = logging.getLogger("spam")
        log_format = "%a %{User-Agent}i"
        access_logger = AccessLogger(log, log_format)
        access_logger.log(request, response, time)

    Format:
        %%  The percent sign
        %a  Remote IP-address (IP-address of proxy if using reverse proxy)
        %t  Time when the request was started to process
        %P  The process ID of the child that serviced the request
        %r  First line of request
        %s  Response status code
        %b  Size of response in bytes, including HTTP headers
        %T  Time taken to serve the request, in seconds
        %Tf Time taken to serve the request, in seconds with floating fraction
            in .06f format
        %D  Time taken to serve the request, in microseconds
        %{FOO}i  request.headers['FOO']
        %{FOO}o  response.headers['FOO']
        %{FOO}e  os.environ['FOO']

    """
    LOG_FORMAT_MAP = {
        'a': 'remote_address',
        't': 'request_start_time',
        'P': 'process_id',
        'r': 'first_request_line',
        's': 'response_status',
        'b': 'response_size',
        'T': 'request_time',
        'Tf': 'request_time_frac',
        'D': 'request_time_micro',
        'i': 'request_header',
        'o': 'response_header',
    }

    LOG_FORMAT = '%a %t "%r" %s %b "%{Referer}i" "%{User-Agent}i"'
    FORMAT_RE = re.compile(r'%(\{([A-Za-z0-9\-_]+)\}([ioe])|[atPrsbOD]|Tf?)')
    CLEANUP_RE = re.compile(r'(%[^s])')
    _FORMAT_CACHE = {}  # type: Dict[str, Tuple[str, List[KeyMethod]]]

    def __init__(self, logger: logging.Logger,
                 log_format: str=LOG_FORMAT) -> None:
        """Initialise the logger.

        logger is a logger object to be used for logging.
        log_format is a string with apache compatible log format description.

        """
        super().__init__(logger, log_format=log_format)

        _compiled_format = AccessLogger._FORMAT_CACHE.get(log_format)
        if not _compiled_format:
            _compiled_format = self.compile_format(log_format)
            AccessLogger._FORMAT_CACHE[log_format] = _compiled_format

        self._log_format, self._methods = _compiled_format

    def compile_format(self, log_format: str) -> Tuple[str, List[KeyMethod]]:
        """Translate log_format into form usable by modulo formatting

        All known atoms will be replaced with %s
        Also methods for formatting of those atoms will be added to
        _methods in appropriate order

        For example we have log_format = "%a %t"
        This format will be translated to "%s %s"
        Also contents of _methods will be
        [self._format_a, self._format_t]
        These method will be called and results will be passed
        to translated string format.

        Each _format_* method receive 'args' which is list of arguments
        given to self.log

        Exceptions are _format_e, _format_i and _format_o methods which
        also receive key name (by functools.partial)

        """
        # list of (key, method) tuples, we don't use an OrderedDict as users
        # can repeat the same key more than once
        methods = list()

        for atom in self.FORMAT_RE.findall(log_format):
            if atom[1] == '':
                format_key1 = self.LOG_FORMAT_MAP[atom[0]]
                m = getattr(AccessLogger, '_format_%s' % atom[0])
                key_method = KeyMethod(format_key1, m)
            else:
                format_key2 = (self.LOG_FORMAT_MAP[atom[2]], atom[1])
                m = getattr(AccessLogger, '_format_%s' % atom[2])
                key_method = KeyMethod(format_key2,
                                       functools.partial(m, atom[1]))

            methods.append(key_method)

        log_format = self.FORMAT_RE.sub(r'%s', log_format)
        log_format = self.CLEANUP_RE.sub(r'%\1', log_format)
        return log_format, methods

    @staticmethod
    def _format_i(key: str,
                  request: BaseRequest,
                  response: StreamResponse,
                  time: float) -> str:
        if request is None:
            return '(no headers)'

        # suboptimal, make istr(key) once
        return request.headers.get(key, '-')

    @staticmethod
    def _format_o(key: str,
                  request: BaseRequest,
                  response: StreamResponse,
                  time: float) -> str:
        # suboptimal, make istr(key) once
        return response.headers.get(key, '-')

    @staticmethod
    def _format_a(request: BaseRequest,
                  response: StreamResponse,
                  time: float) -> str:
        if request is None:
            return '-'
        ip = request.remote
        return ip if ip is not None else '-'

    @staticmethod
    def _format_t(request: BaseRequest,
                  response: StreamResponse,
                  time: float) -> str:
        now = datetime.datetime.utcnow()
        start_time = now - datetime.timedelta(seconds=time)
        return start_time.strftime('[%d/%b/%Y:%H:%M:%S +0000]')

    @staticmethod
    def _format_P(request: BaseRequest,
                  response: StreamResponse,
                  time: float) -> str:
        return "<%s>" % os.getpid()

    @staticmethod
    def _format_r(request: BaseRequest,
                  response: StreamResponse,
                  time: float) -> str:
        if request is None:
            return '-'
        return '%s %s HTTP/%s.%s' % (request.method, request.path_qs,
                                     request.version.major,
                                     request.version.minor)

    @staticmethod
    def _format_s(request: BaseRequest,
                  response: StreamResponse,
                  time: float) -> int:
        return response.status

    @staticmethod
    def _format_b(request: BaseRequest,
                  response: StreamResponse,
                  time: float) -> int:
        return response.body_length

    @staticmethod
    def _format_T(request: BaseRequest,
                  response: StreamResponse,
                  time: float) -> str:
        return str(round(time))

    @staticmethod
    def _format_Tf(request: BaseRequest,
                   response: StreamResponse,
                   time: float) -> str:
        return '%06f' % time

    @staticmethod
    def _format_D(request: BaseRequest,
                  response: StreamResponse,
                  time: float) -> str:
        return str(round(time * 1000000))

    def _format_line(self,
                     request: BaseRequest,
                     response: StreamResponse,
                     time: float) -> Iterable[Tuple[str,
                                                    Callable[[BaseRequest,
                                                              StreamResponse,
                                                              float],
                                                             str]]]:
        return [(key, method(request, response, time))
                for key, method in self._methods]

    def log(self,
            request: BaseRequest,
            response: StreamResponse,
            time: float) -> None:
        try:
            fmt_info = self._format_line(request, response, time)

            values = list()
            extra = dict()
            for key, value in fmt_info:
                values.append(value)

                if key.__class__ is str:
                    extra[key] = value
                else:
                    k1, k2 = key
                    dct = extra.get(k1, {})
                    dct[k2] = value  # type: ignore
                    extra[k1] = dct  # type: ignore

            self.logger.info(self._log_format % tuple(values), extra=extra)
        except Exception:
            self.logger.exception("Error in logging")