Repository URL to install this package:
|
Version:
0.4.139 ▾
|
lib-py-b2b
/
util.py
|
|---|
from json import JSONEncoder
import decimal
import sys
import os
import collections
from os import environ
import six
import logging
from datetime import datetime
from dateutil.tz import UTC
from collections import Mapping, defaultdict
# Pick the log level from the LOG_LEVEL environment variable when it is set,
# otherwise fall back to INFO.
if 'LOG_LEVEL' in environ:
    log_level = logging.getLevelName(environ['LOG_LEVEL'])
else:
    log_level = logging.INFO

# Configure the root logger once at import time; every logger in the process
# that propagates to the root inherits this format and level.
logging.basicConfig(
    level=log_level,
    datefmt='%d-%m-%Y:%H:%M:%S',
    format='%(asctime)s,%(msecs)d %(levelname)-8s [%(filename)s:%(lineno)d] %(message)s',
)

# Module-level logger shared by the helpers in this file.
logger = logging.getLogger('lib-b2b-util')
class UtilityEncoder(JSONEncoder):
    """JSON encoder that additionally serializes ``Decimal`` and ``datetime``.

    Decimals holding a whole number are emitted as ints, every other decimal
    as a float, and datetimes as ISO-8601 strings.
    """

    def default(self, o):
        """Return a JSON-serializable stand-in for ``o``.

        Args:
            o: The object the standard encoder could not serialize.

        Returns:
            An ``int`` or ``float`` for a ``Decimal``, an ISO-8601 string for
            a ``datetime``.

        Raises:
            TypeError: Raised by ``JSONEncoder.default`` for any other type.
        """
        if isinstance(o, decimal.Decimal):
            # Decimal's % keeps the sign of the dividend (-1.5 % 1 == -0.5),
            # so a "> 0" test would misclassify negative fractions as whole
            # numbers and truncate them. Compare against zero with != instead.
            if o % 1 != 0:
                return float(o)
            return int(o)
        if isinstance(o, datetime):
            return o.isoformat()
        return super().default(o)
def datetime_handler(x):
    """Serialize a ``datetime`` to its ISO-8601 string.

    Args:
        x: The value to serialize.

    Returns:
        ``x.isoformat()`` when ``x`` is a ``datetime``.

    Raises:
        TypeError: If ``x`` is not a ``datetime``.
    """
    # Reject everything that is not a datetime up front.
    if not isinstance(x, datetime):
        raise TypeError("Unknown type")
    return x.isoformat()
def group_by(exp, lst):
    """Group the items of ``lst`` by the key that ``exp`` computes for each.

    Args:
        exp: Callable mapping an item to its (hashable) group key.
        lst: Iterable of items to group.

    Returns:
        A plain ``dict`` mapping each key to the list of items that produced
        it, with items kept in their original order.
    """
    buckets = {}
    for item in lst:
        key = exp(item)
        # Create the bucket the first time a key shows up.
        if key not in buckets:
            buckets[key] = []
        buckets[key].append(item)
    return buckets
def count_by(exp, lst):
    """Count how many items of ``lst`` fall under each key computed by ``exp``.

    Args:
        exp: Callable mapping an item to its (hashable) key.
        lst: Iterable of items to count.

    Returns:
        A ``defaultdict(int)`` mapping each key to its number of occurrences.
    """
    tally = defaultdict(int)
    for key in map(exp, lst):
        tally[key] = tally[key] + 1
    return tally
def path_fix():
    """Append the ``lib/vendor`` directory next to this file to ``sys.path``.

    The search path is printed before and after the change.
    """
    module_dir = os.path.dirname(os.path.realpath(__file__))
    # Vendored packages are expected under <this file's directory>/lib/vendor.
    vendor_dir = os.path.join(module_dir, "lib", "vendor")
    print(sys.path)
    sys.path.append(vendor_dir)
    print(sys.path)
def is_sequence(obj):
    """Return True if ``obj`` is a non-empty sequence other than a string.

    Args:
        obj: The object to inspect.

    Returns:
        True if ``obj`` is truthy, is a ``collections.abc.Sequence`` and is
        not a ``str``; False otherwise. Empty sequences and ``None`` yield
        False because of the truthiness check.
    """
    # The ABC aliases in ``collections`` (collections.Sequence) were removed
    # in Python 3.10; the supported location is ``collections.abc``.
    from collections.abc import Sequence

    if not obj:
        return False
    # On Python 3, six.string_types is exactly (str,), so checking against
    # ``str`` is equivalent.
    return isinstance(obj, Sequence) and not isinstance(obj, str)
def as_decimal(value, desc):
    """Convert ``value`` to a ``Decimal``, falling back to zero on failure.

    Args:
        value: The value to convert; it is passed through ``str()`` first.
        desc: Label for the value, used only in the error log message.

    Returns:
        ``decimal.Decimal(str(value))``, or ``Decimal('0.0')`` when the
        conversion fails (the failure is logged, not raised).
    """
    try:
        return decimal.Decimal(str(value))
    except Exception:
        # Deliberately best-effort: callers get a zero instead of an error.
        # The exception must not be passed as a positional logging argument
        # without a matching placeholder, or logging fails while formatting
        # the record. Use lazy %-args and attach the traceback via exc_info.
        logger.error(
            "[%s] should have been a decimal, but contained [%s]",
            desc,
            value,
            exc_info=True,
        )
        return decimal.Decimal('0.0')
def safe_str(data_obj, field: str, default_value=None):
    """Return ``data_obj[field]`` when present and truthy, else the default.

    Args:
        data_obj: Container supporting ``in`` and item access.
        field: Key to look up.
        default_value: Returned when the key is absent or its value is falsy.

    Returns:
        The stored value, or ``default_value``.
    """
    if field not in data_obj:
        return default_value
    stored = data_obj[field]
    # Falsy values (None, '', 0, ...) are replaced by the default as well.
    if stored:
        return stored
    return default_value
def ascii_safe_str(data_obj, property_path: str, max_length: int = -1, default_value=None):
    """Look up a dotted path in ``data_obj`` and return the value as ASCII text.

    When ``data_obj`` is a mapping each path segment is resolved with
    ``.get``; otherwise each segment is resolved with ``getattr``. Lookup
    stops at the first segment that cannot be resolved and ``default_value``
    is used as the value from there on.

    Args:
        data_obj: Mapping or object to read from.
        property_path: Dot-separated path, e.g. ``"address.city"``.
        max_length: When positive, the result is truncated to this many
            characters.
        default_value: Value used when ``data_obj`` is falsy or a path
            segment is missing.

    Returns:
        ``default_value`` unchanged when ``data_obj`` is falsy. Otherwise the
        resolved value: a truthy value is stripped of non-ASCII characters
        and optionally truncated; a falsy value is returned as-is. The
        resolved value is expected to be a ``str`` (anything else without an
        ``encode`` method raises ``AttributeError``).
    """
    # collections.Mapping was removed in Python 3.10; use collections.abc.
    from collections.abc import Mapping

    if not data_obj:
        return default_value

    missing = object()  # sentinel: distinguishes "absent" from a stored None
    by_key = isinstance(data_obj, Mapping)
    value = data_obj
    for attr in property_path.split("."):
        if by_key:
            # An intermediate value may be None or a scalar with no ``get``;
            # treat that as a missing segment instead of raising.
            lookup = getattr(value, "get", None)
            found = lookup(attr, missing) if lookup is not None else missing
        else:
            found = getattr(value, attr, missing)
        if found is missing:
            value = default_value
            break
        value = found

    if not value:
        return value
    reencoded = value.encode('ascii', errors="ignore").decode()
    if max_length > 0:
        reencoded = reencoded[:max_length]
    return reencoded
def convert_date_to_utc_seconds(value):
    """Return the time of day carried by ``value`` as seconds since midnight.

    Args:
        value: Either a ``datetime`` (converted to UTC first) or a
            colon-separated string whose first three fields are hours,
            minutes and seconds.

    Returns:
        The number of seconds since midnight as an ``int``, or ``None`` when
        ``value`` is neither a ``datetime`` nor a ``str``.
    """
    if isinstance(value, datetime):
        utc_value = value.astimezone(UTC)
        hours, minutes, seconds = utc_value.hour, utc_value.minute, utc_value.second
        return hours * 60 * 60 + minutes * 60 + seconds

    if isinstance(value, str):
        fields = value.split(':')
        # Fields are converted left to right; only the first three are used.
        return int(fields[0]) * 60 * 60 + int(fields[1]) * 60 + int(fields[2])

    return None
def is_time_between(begin_time, end_time, check_time=None):
    """Tell whether ``check_time`` lies within the window ``begin_time``..``end_time``.

    Both bounds are inclusive. When ``begin_time`` is not earlier than
    ``end_time`` the window is treated as wrapping past midnight.

    Args:
        begin_time: Start of the window (comparable time-of-day value).
        end_time: End of the window.
        check_time: The time to test; when not given, the current UTC time
            of day is used.

    Returns:
        True if ``check_time`` is inside the window, False otherwise.
    """
    if not check_time:
        # datetime.utcnow() is deprecated since Python 3.12. Taking .time()
        # of an aware UTC datetime gives the same naive wall-clock time.
        from datetime import timezone
        check_time = datetime.now(timezone.utc).time()
    if begin_time < end_time:
        return begin_time <= check_time <= end_time
    # Window crosses midnight, e.g. 22:00 -> 06:00.
    return check_time >= begin_time or check_time <= end_time
class DictQuery(dict):
    """A ``dict`` whose ``get`` resolves a delimiter-separated path through nested values."""

    def get(self, path, default=None, delimiter="/"):
        """Walk ``path`` one key at a time and return the value reached.

        Args:
            path: Keys joined by ``delimiter``, e.g. ``"order/items/sku"``.
            default: Passed as the fallback to every individual ``get`` call.
            delimiter: Separator used to split ``path`` into keys.

        Returns:
            The value reached at the end of the walk. If that value is a
            list, its first element is returned (``None`` for an empty
            list). ``None`` is also returned if any exception is raised
            while walking.
        """
        keys = path.split(delimiter)
        val = None
        try:
            for key in keys:
                if val:
                    if isinstance(val, list):
                        # Apply the key to every element; falsy elements map to None.
                        val = [v.get(key, default) if v else None for v in val]
                    else:
                        val = val.get(key, default)
                else:
                    # No truthy value yet: look the key up in this dict
                    # itself, bypassing this override.
                    val = dict.get(self, key, default)
                # Stop walking as soon as the current value is falsy.
                if not val:
                    break
            # NOTE(review): the source this was recovered from had lost its
            # indentation; this list check is assumed to run after the loop
            # rather than inside it -- confirm against the packaged file.
            if isinstance(val, list):
                if len(val) > 0:
                    return val[0]
                else:
                    return None
        except Exception as e:
            # Any failure during the walk (for example a value with no
            # ``get`` method) is swallowed and reported as None.
            return None
        return val
def get_size(obj, seen=None):
    """Recursively add up ``sys.getsizeof`` for ``obj`` and what it references.

    Args:
        obj: The object to measure.
        seen: Set of ``id()`` values already counted; created on the first
            call and shared with the recursive calls.

    Returns:
        The accumulated size in bytes, or 0 if ``obj`` was already counted.
    """
    total = sys.getsizeof(obj)
    visited = set() if seen is None else seen

    marker = id(obj)
    if marker in visited:
        return 0
    # Record the object before descending so self-referential structures
    # terminate instead of recursing forever.
    visited.add(marker)

    if isinstance(obj, dict):
        # Values first, then keys.
        for value in obj.values():
            total += get_size(value, visited)
        for key in obj.keys():
            total += get_size(key, visited)
    elif hasattr(obj, '__dict__'):
        total += get_size(obj.__dict__, visited)
    elif hasattr(obj, '__iter__') and not isinstance(obj, (str, bytes, bytearray)):
        for element in obj:
            total += get_size(element, visited)
    return total