Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
lib-py-b2b / util.py
Size: Mime:
import collections
import decimal
import logging
import os
import sys
from collections import defaultdict
from collections.abc import Mapping, Sequence
from datetime import datetime, timezone
from json import JSONEncoder
from os import environ

import six
from dateutil.tz import UTC

# Resolve the log level from the LOG_LEVEL environment variable (case-insensitive).
# logging.getLevelName() returns an int for a known level name but a
# "Level <name>" string for anything else, and logging.basicConfig() rejects
# such a string with ValueError. Fall back to INFO instead of failing at import.
log_level = logging.getLevelName(environ.get('LOG_LEVEL', 'INFO').strip().upper())
if not isinstance(log_level, int):
    log_level = logging.INFO
logging.basicConfig(format='%(asctime)s,%(msecs)d %(levelname)-8s [%(filename)s:%(lineno)d] %(message)s',
                    datefmt='%d-%m-%Y:%H:%M:%S',
                    level=log_level)
# Shared logger used by the helpers in this module.
logger = logging.getLogger('lib-b2b-util')


class UtilityEncoder(JSONEncoder):
    """JSON encoder that also serializes ``decimal.Decimal`` and ``datetime``.

    Decimals holding an integral value are emitted as ``int``; every other
    decimal (fractions of either sign, and non-finite values) is passed to
    ``float()``. Datetimes are emitted via ``datetime.isoformat()``.
    """

    def default(self, o):
        """Return a JSON-serializable stand-in for *o*.

        Args:
            o: The object the base encoder could not serialize.

        Returns:
            An ``int`` or ``float`` for a ``Decimal``, or an ISO-format string
            for a ``datetime``.

        Raises:
            TypeError: Raised by ``JSONEncoder.default`` for any other type.
        """
        if isinstance(o, decimal.Decimal):
            # Decimal's ``%`` keeps the sign of the dividend, so testing
            # ``o % 1 > 0`` would truncate negative fractions (-1.5 -> -1).
            # Compare against the integral value instead. Non-finite values
            # can never be converted to int, so they go straight to float().
            if o.is_finite() and o == o.to_integral_value():
                return int(o)
            return float(o)
        if isinstance(o, datetime):
            return o.isoformat()
        return super().default(o)


def datetime_handler(x):
    """Return the ISO-format string for a ``datetime``.

    Args:
        x: The value to convert.

    Returns:
        ``x.isoformat()`` when *x* is a ``datetime``.

    Raises:
        TypeError: If *x* is not a ``datetime``.
    """
    if not isinstance(x, datetime):
        raise TypeError("Unknown type")
    return x.isoformat()


def group_by(exp, lst):
    """Bucket the items of *lst* by the key computed with *exp*.

    Args:
        exp: Callable applied to each item; its result is the bucket key.
        lst: Iterable of items to group.

    Returns:
        A plain ``dict`` mapping each key to the list of items that produced
        it, with items kept in their original order.
    """
    buckets = {}
    for item in lst:
        key = exp(item)
        if key in buckets:
            buckets[key].append(item)
        else:
            buckets[key] = [item]
    return buckets


def count_by(exp, lst):
    """Count how many items of *lst* map to each key computed with *exp*.

    Args:
        exp: Callable applied to each item; its result is the counter key.
        lst: Iterable of items to count.

    Returns:
        A ``defaultdict(int)`` mapping each key to its number of occurrences.
    """
    tally = defaultdict(int)
    for key in map(exp, lst):
        tally[key] = tally[key] + 1
    return tally


def path_fix():
    """Make the ``lib/vendor`` directory next to this file importable.

    Appends ``<directory of this file>/lib/vendor`` to ``sys.path``. The call
    is idempotent: the entry is added only if it is not already present, so
    repeated calls do not grow ``sys.path``. The search path is reported at
    DEBUG level on the ``lib-b2b-util`` logger rather than printed to stdout.
    """
    log = logging.getLogger('lib-b2b-util')
    here = os.path.dirname(os.path.realpath(__file__))
    site_pkgs = os.path.join(here, "lib", "vendor")
    log.debug("sys.path before path_fix: %s", sys.path)
    if site_pkgs not in sys.path:
        sys.path.append(site_pkgs)
    log.debug("sys.path after path_fix: %s", sys.path)


def is_sequence(obj):
    """Return True if *obj* is a non-empty, non-string sequence.

    Args:
        obj: The object to inspect.

    Returns:
        ``True`` when *obj* is truthy, is a ``collections.abc.Sequence`` and
        is not a ``str``. Any falsy input (``None``, an empty list or tuple,
        ...) returns ``False``.
    """
    if not obj:
        return False
    # ``collections.Sequence`` was removed in Python 3.10; the ABC lives in
    # ``collections.abc``. On Python 3 the only text type is ``str``.
    return isinstance(obj, Sequence) and not isinstance(obj, str)


def as_decimal(value, desc):
    """Convert *value* to ``decimal.Decimal`` via its string form.

    Args:
        value: The value to convert; ``str(value)`` is what gets parsed.
        desc: Label for the value, used only in the error log message.

    Returns:
        The parsed ``Decimal``, or ``Decimal('0.0')`` when conversion fails.
    """
    try:
        return decimal.Decimal(str(value))
    except Exception:
        # Deliberately best-effort: any failure is logged and replaced by zero.
        # Use lazy %-style arguments with matching placeholders; passing the
        # exception as an extra argument with no placeholder makes the logging
        # module fail while formatting the record.
        logger.exception("[%s] should have been a decimal, but contained [%s]", desc, value)
        return decimal.Decimal('0.0')


def safe_str(data_obj, field: str, default_value=None):
    """Return ``data_obj[field]`` when present and truthy, else *default_value*.

    Args:
        data_obj: Container supporting ``in`` and item access.
        field: Key to look up.
        default_value: Returned when the key is absent or its value is falsy.

    Returns:
        The stored value, or *default_value*.
    """
    if field not in data_obj:
        return default_value
    if not data_obj[field]:
        return default_value
    return data_obj[field]


def ascii_safe_str(data_obj, property_path: str, max_length: int = -1, default_value=None):
    """Resolve a dotted path on *data_obj* and drop non-ASCII characters from it.

    Args:
        data_obj: Root object. When it is a ``Mapping`` every path segment is
            looked up as a key; otherwise every segment is read with
            ``getattr``.
        property_path: Dot-separated path, e.g. ``"customer.name"``.
        max_length: When greater than zero, the result is truncated to this
            many characters. Zero or negative means no truncation.
        default_value: Returned when *data_obj* is falsy, and used in place of
            any segment that cannot be resolved.

    Returns:
        The resolved string with non-ASCII characters removed and optionally
        truncated. A falsy resolved value (``None``, ``""``, ...) is returned
        unchanged. A truthy *default_value* goes through the same
        ASCII-stripping and truncation as a resolved value.

    Raises:
        AttributeError: If the resolved value is truthy but has no ``encode``
            method (i.e. it is not a string).
    """
    if not data_obj:
        return default_value
    value = data_obj
    segments = property_path.split(".")
    if isinstance(data_obj, Mapping):
        for attr in segments:
            if not isinstance(value, Mapping):
                # A missing or non-mapping intermediate node cannot be
                # descended into; treat the whole path as unresolved instead
                # of raising AttributeError on ``.get``.
                value = default_value
                break
            value = value.get(attr, default_value)
    else:
        for attr in segments:
            value = getattr(value, attr, default_value)
    if not value:
        return value
    reencoded = value.encode('ascii', errors="ignore").decode()
    if max_length > 0:
        reencoded = reencoded[:max_length]
    return reencoded


def convert_date_to_utc_seconds(value):
    """Convert a time of day to a number of seconds since midnight.

    Args:
        value: Either a ``datetime`` (converted to UTC with ``astimezone``
            before its hour/minute/second are read) or a colon-separated
            ``"H:M:S"`` string (parsed as-is, with no timezone conversion).

    Returns:
        ``hours * 3600 + minutes * 60 + seconds`` as an ``int``, or ``None``
        when *value* is neither a ``datetime`` nor a ``str``.

    Raises:
        ValueError: If a string field is not an integer.
        IndexError: If a string has fewer than three colon-separated fields.
    """
    if isinstance(value, datetime):
        utc_value = value.astimezone(UTC)
        hours, minutes, seconds = utc_value.hour, utc_value.minute, utc_value.second
    elif isinstance(value, str):
        fields = value.split(':')
        # Only the first three fields are used; extra fields are ignored.
        hours = int(fields[0])
        minutes = int(fields[1])
        seconds = int(fields[2])
    else:
        return None
    return hours * 3600 + minutes * 60 + seconds


def is_time_between(begin_time, end_time, check_time=None):
    """Report whether *check_time* falls inside the window [begin_time, end_time].

    Args:
        begin_time: Start of the window (inclusive).
        end_time: End of the window (inclusive).
        check_time: Time to test. Defaults to the current UTC time of day.

    Returns:
        ``True`` if *check_time* is within the window. When *begin_time* is
        not earlier than *end_time* the window is treated as crossing
        midnight, so equal bounds match every time of day.
    """
    if check_time is None:
        # ``datetime.utcnow()`` is deprecated; take an aware "now" in UTC and
        # read its time of day, which ``.time()`` returns without tzinfo.
        check_time = datetime.now(timezone.utc).time()
    if begin_time < end_time:
        return begin_time <= check_time <= end_time
    # Window crosses midnight, e.g. 22:00 -> 02:00.
    return check_time >= begin_time or check_time <= end_time


class DictQuery(dict):
    """A ``dict`` whose ``get`` accepts a delimiter-separated path of keys.

    Note that ``get`` replaces ``dict.get``: a key that itself contains the
    delimiter is always interpreted as a path.
    """

    def get(self, path, default=None, delimiter="/"):
        """Look up a nested value by path, e.g. ``q.get("order/customer/name")``.

        Traversal stops early as soon as a falsy value is reached, and that
        value is what gets returned. When an intermediate value is a list, the
        next key is looked up on every truthy element of it, and the first
        element of the resulting list is returned.

        Args:
            path: Keys joined by *delimiter*.
            default: Value used for missing keys; also returned when the path
                cannot be traversed or resolves to an empty list.
            delimiter: Separator between the keys in *path*.

        Returns:
            The resolved value, the first element if the result is a list, or
            *default*.
        """
        keys = path.split(delimiter)
        try:
            val = dict.get(self, keys[0], default)
            for key in keys[1:]:
                if not val:
                    break
                if isinstance(val, list):
                    val = [v.get(key, default) if v else None for v in val]
                else:
                    val = val.get(key, default)
        except Exception:
            # Best-effort lookup: a node that cannot be descended into (for
            # example a string in the middle of the path) yields the caller's
            # default rather than an unconditional None.
            return default
        if isinstance(val, list):
            return val[0] if val else default
        return val


def get_size(obj, seen=None):
    """Return the recursive ``sys.getsizeof`` total of *obj* and what it holds.

    Args:
        obj: The object to measure.
        seen: Set of ``id()`` values already counted; callers normally leave
            this as ``None`` and it is threaded through the recursion.

    Returns:
        The summed size in bytes. An object already recorded in *seen*
        contributes 0, so shared and self-referential objects count once.
    """
    total = sys.getsizeof(obj)
    visited = set() if seen is None else seen
    marker = id(obj)
    if marker in visited:
        return 0
    # Record the object before descending so that reference cycles terminate.
    visited.add(marker)
    if isinstance(obj, dict):
        for value in obj.values():
            total += get_size(value, visited)
        for key in obj.keys():
            total += get_size(key, visited)
    elif hasattr(obj, '__dict__'):
        total += get_size(obj.__dict__, visited)
    elif hasattr(obj, '__iter__') and not isinstance(obj, (str, bytes, bytearray)):
        # Strings and byte buffers are iterable but are measured as a whole.
        for member in obj:
            total += get_size(member, visited)
    return total