# Source listing captured from a package repository page.
# Package: lib-py-b2b
# Version: 0.4.198
# File:    lib-py-b2b / util.py
import collections
import decimal
import logging
import os
import sys
from collections import defaultdict
from collections.abc import Mapping, Sequence
from datetime import datetime, date
from json import JSONEncoder
from os import environ

import six
from dateutil.parser import parse
from dateutil.tz import UTC
# Module-wide logging setup, executed at import time.
# The threshold comes from the LOG_LEVEL environment variable when it is set
# (translated through logging.getLevelName); otherwise INFO is used.
if 'LOG_LEVEL' in environ:
    log_level = logging.getLevelName(environ['LOG_LEVEL'])
else:
    log_level = logging.INFO
# Configure the root logger: timestamp with milliseconds, padded level name,
# originating file and line number, then the message itself.
logging.basicConfig(
    level=log_level,
    datefmt='%d-%m-%Y:%H:%M:%S',
    format='%(asctime)s,%(msecs)d %(levelname)-8s [%(filename)s:%(lineno)d] %(message)s',
)
# Logger used by the helpers defined in this module.
logger = logging.getLogger(__name__)
class UtilityEncoder(JSONEncoder):
    """JSON encoder that also understands ``Decimal``, ``datetime`` and ``date``.

    * ``decimal.Decimal`` values with no fractional part are emitted as JSON
      integers; every other ``Decimal`` is emitted as a float.
    * ``datetime`` / ``date`` values are emitted as ISO-8601 strings.
    * Anything else is delegated to ``JSONEncoder.default`` (which raises
      ``TypeError`` for unsupported types).
    """

    def default(self, o):
        """Return a JSON-serializable stand-in for ``o``."""
        if isinstance(o, decimal.Decimal):
            # Compare against the rounded-to-integer value instead of using
            # ``o % 1 > 0``: the Decimal remainder keeps the sign of the
            # dividend, so negative fractions (e.g. -1.5) would be treated as
            # whole numbers and truncated, and ``%`` raises InvalidOperation
            # for large exponents and for NaN/Infinity.
            if o.is_finite() and o == o.to_integral_value():
                return int(o)
            # Fractional and non-finite values (NaN, +/-Infinity) become floats.
            return float(o)
        if isinstance(o, (datetime, date)):
            return o.isoformat()
        return super().default(o)
def datetime_handler(x):
    """Serialize a ``datetime`` to its ISO-8601 string.

    Args:
        x: the object to serialize.

    Returns:
        ``x.isoformat()`` when ``x`` is a ``datetime`` instance.

    Raises:
        TypeError: for any other type (including plain ``date`` objects).
    """
    # Guard clause: reject everything that is not a datetime.
    if not isinstance(x, datetime):
        raise TypeError("Unknown type")
    return x.isoformat()
def group_by(exp, lst):
    """Bucket the items of ``lst`` by the key computed with ``exp``.

    Args:
        exp: callable taking one item and returning a hashable key.
        lst: iterable of items to group.

    Returns:
        A plain ``dict`` mapping each key to the list of items that produced
        it, in the order the items were encountered.
    """
    grouped = {}
    for item in lst:
        key = exp(item)
        bucket = grouped.get(key)
        if bucket is None:
            # First item seen for this key: start a new bucket.
            grouped[key] = [item]
        else:
            bucket.append(item)
    return grouped
def count_by(exp, lst):
    """Count how many items of ``lst`` map to each key computed with ``exp``.

    Args:
        exp: callable taking one item and returning a hashable key.
        lst: iterable of items to count.

    Returns:
        A ``defaultdict(int)`` mapping each key to its number of occurrences.
    """
    counts = defaultdict(int)
    for key in map(exp, lst):
        counts[key] = counts[key] + 1
    return counts
def path_fix():
    """Append the bundled ``lib/vendor`` directory to ``sys.path``.

    The directory is resolved relative to the real location of this file.
    ``sys.path`` is printed before and after the change. Each call appends
    the directory again; no duplicate check is made.
    """
    module_dir = os.path.dirname(os.path.realpath(__file__))
    vendor_dir = os.path.join(module_dir, "lib", "vendor")
    # Show the search path before and after adding the vendored packages.
    print(sys.path)
    sys.path.append(vendor_dir)
    print(sys.path)
def parse_dt(dt: str):
    """Parse a date/time string with ``dateutil.parser.parse``.

    When the string ends with ``'00:00Z'`` every occurrence of ``'00:00Z'``
    is first replaced by ``'00:00'`` (i.e. the ``Z`` is dropped) before
    parsing.

    NOTE(review): this presumably targets values such as ``...+00:00Z`` that
    carry both an offset and a ``Z``; a value like ``...T12:00:00Z`` also
    matches and loses its ``Z`` -- confirm that is intended.

    Args:
        dt: the textual date/time.

    Returns:
        Whatever ``parse`` returns for the (possibly adjusted) string.
    """
    normalized = dt.replace('00:00Z', '00:00') if dt.endswith('00:00Z') else dt
    return parse(normalized)
def is_sequence(obj):
    """Return True if ``obj`` is a non-empty, non-string sequence.

    Args:
        obj: the object to inspect.

    Returns:
        ``False`` for any falsy input (``None``, empty list/tuple, ``""``...)
        and for ``str`` instances; otherwise ``True`` exactly when ``obj`` is
        a ``collections.abc.Sequence`` (list, tuple, range, bytes, ...).
    """
    if not obj:
        return False
    # ``collections.Sequence`` was removed in Python 3.10; the ABC lives in
    # ``collections.abc``. On Python 3 ``six.string_types`` is just ``(str,)``,
    # so a plain ``str`` check is equivalent.
    return isinstance(obj, Sequence) and not isinstance(obj, str)
def as_decimal(value, desc):
    """Convert ``value`` to ``decimal.Decimal``, falling back to zero.

    Args:
        value: anything whose ``str()`` form may be a decimal literal.
        desc: human-readable name of the field, used only in the log message.

    Returns:
        ``Decimal(str(value))`` on success; ``Decimal('0.0')`` when the
        conversion fails (the failure is logged at ERROR level).
    """
    try:
        return decimal.Decimal(str(value))
    except Exception as ve:  # deliberate best-effort: never propagate
        # Hand the values over as logging arguments with matching
        # placeholders. The previous call passed the exception as an extra
        # argument to a message without placeholders, which made the logging
        # module fail to format the record and lose the message.
        logger.error("[%s] should have been a decimal, but contained [%s]: %s", desc, value, ve)
        return decimal.Decimal('0.0')
def safe_str(data_obj, field: str, default_value=None):
    """Return ``data_obj[field]`` when present and truthy, else the default.

    Args:
        data_obj: container supporting ``in`` and item access.
        field: key to look up.
        default_value: returned when the key is absent or its value is falsy.

    Returns:
        The stored value, or ``default_value``.
    """
    if field not in data_obj:
        return default_value
    # A falsy stored value (None, "", 0, ...) also yields the default.
    return data_obj[field] or default_value
def ascii_safe_str(data_obj, property_path: str, max_length: int = -1, default_value=None):
    """Resolve a dotted path on ``data_obj`` and return it as ASCII-only text.

    Each path segment is looked up with ``.get`` when the current object is a
    ``Mapping`` and with ``getattr`` otherwise, so mappings and plain objects
    may be mixed along the path.

    Args:
        data_obj: mapping or object to read from. A falsy ``data_obj`` yields
            ``default_value`` immediately.
        property_path: segments separated by ``"."`` (e.g. ``"a.b.c"``).
        max_length: when greater than zero, the result is truncated to this
            many characters.
        default_value: used when a segment is missing, or when a segment
            before the last one resolves to ``None``.

    Returns:
        The resolved value with every non-ASCII character dropped (and
        truncated if requested). A falsy resolved value (``None``, ``""``,
        ``0`` ...) is returned unchanged. A truthy ``default_value`` goes
        through the same ASCII/truncation step as a resolved value.
    """
    if not data_obj:
        return default_value
    missing = object()  # sentinel: distinguishes "absent" from a stored None
    attrs = property_path.split(".")
    last_index = len(attrs) - 1
    value = data_obj
    for index, attr in enumerate(attrs):
        if isinstance(value, Mapping):
            value = value.get(attr, missing)
        else:
            value = getattr(value, attr, missing)
        # Stop as soon as the path cannot be followed any further instead of
        # calling .get()/getattr() on the default value or on None.
        if value is missing or (value is None and index < last_index):
            value = default_value
            break
    if not value:
        return value
    # Non-string values (numbers, etc.) have no .encode(); stringify first.
    text = value if isinstance(value, str) else str(value)
    reencoded = text.encode('ascii', errors="ignore").decode()
    if max_length > 0:
        reencoded = reencoded[:max_length]
    return reencoded
def convert_date_to_utc_seconds(value):
    """Return the time of day of ``value`` as a number of seconds.

    Args:
        value: either a ``datetime`` (converted to UTC with ``astimezone``
            before reading hour/minute/second) or a string whose first three
            ``':'``-separated fields are read as hours, minutes and seconds.

    Returns:
        ``hours * 3600 + minutes * 60 + seconds`` as an int, or ``None`` when
        ``value`` is neither a ``datetime`` nor a ``str``.
    """
    if isinstance(value, datetime):
        utc_value = value.astimezone(UTC)
        hours, minutes, seconds = utc_value.hour, utc_value.minute, utc_value.second
    elif isinstance(value, str):
        fields = value.split(':')
        hours, minutes, seconds = int(fields[0]), int(fields[1]), int(fields[2])
    else:
        return None
    return hours * 60 * 60 + minutes * 60 + seconds * 1
def is_time_between(begin_time, end_time, check_time=None):
    """Tell whether ``check_time`` lies inside the ``begin_time``..``end_time`` window.

    Args:
        begin_time: start of the window (inclusive).
        end_time: end of the window (inclusive).
        check_time: the time to test; when not given (or falsy) the current
            UTC time of day is used.

    Returns:
        True when the time is inside the window. A window whose begin is not
        earlier than its end is treated as wrapping past midnight.
    """
    moment = check_time or datetime.utcnow().time()
    same_day_window = begin_time < end_time
    if same_day_window:
        return begin_time <= moment <= end_time
    # Window crosses midnight: late enough today or early enough tomorrow.
    return moment >= begin_time or moment <= end_time
class DictQuery(dict):
    """``dict`` subclass whose ``get`` follows a delimiter-separated path."""

    def get(self, path, default=None, delimiter="/"):
        """Look up a nested value such as ``"a/b/c"``.

        Args:
            path: keys joined by ``delimiter``.
            default: passed to every individual ``.get`` call along the path.
            delimiter: separator between the keys of ``path``.

        Returns:
            The value found at the end of the path. Traversal stops early at
            the first falsy intermediate value, which is then the result.
            When the current value is a list, the next key is fetched from
            each truthy element (falsy elements give ``None``); if the final
            result is a list, only its first element is returned (``None``
            for an empty list). Any exception raised while walking the path
            yields ``None``.
        """
        segments = path.split(delimiter)
        try:
            current = None
            for position, segment in enumerate(segments):
                if position == 0:
                    # The first key is read from this dictionary itself.
                    current = dict.get(self, segment, default)
                elif isinstance(current, list):
                    current = [entry.get(segment, default) if entry else None
                               for entry in current]
                else:
                    current = current.get(segment, default)
                if not current:
                    break
            if isinstance(current, list):
                return current[0] if len(current) > 0 else None
            return current
        except Exception:
            return None
def get_size(obj, seen=None):
    """Recursively add up ``sys.getsizeof`` for ``obj`` and what it contains.

    Args:
        obj: the object to measure.
        seen: set of ``id()`` values already counted; created on the first
            call and shared with the recursive calls.

    Returns:
        The shallow size of ``obj`` plus the sizes of the values and keys of
        a dict, of the instance ``__dict__`` of other objects, or of the items
        of other iterables (``str``, ``bytes`` and ``bytearray`` are not
        iterated). An object that was already counted contributes 0.
    """
    total = sys.getsizeof(obj)
    if seen is None:
        seen = set()
    marker = id(obj)
    if marker in seen:
        return 0
    # Register the object before descending so that self-referential
    # structures terminate instead of recursing forever.
    seen.add(marker)
    if isinstance(obj, dict):
        total += sum(get_size(value, seen) for value in obj.values())
        total += sum(get_size(key, seen) for key in obj.keys())
    elif hasattr(obj, '__dict__'):
        total += get_size(obj.__dict__, seen)
    elif hasattr(obj, '__iter__') and not isinstance(obj, (str, bytes, bytearray)):
        total += sum(get_size(item, seen) for item in obj)
    return total