Repository URL to install this package:
|
Version:
0.2.102 ▾
|
from datetime import date
import calendar
from snsql._ast.tokens import *
import datetime
"""
date/time processing expressions
"""
def parse_datetime(val):
    """Parse an ISO-format string into a date, time, or datetime.

    Values that are already ``datetime.date``, ``datetime.time`` or
    ``datetime.datetime`` instances are returned unchanged.  Anything else
    is tried, in order, as a date, then a time, then a full datetime; the
    first parser that accepts the value wins.

    Returns:
        The parsed object, or ``None`` when ``val`` is ``None`` or none of
        the parsers accept it.
    """
    if val is None:
        return None
    if isinstance(val, (datetime.date, datetime.time, datetime.datetime)):
        return val
    parsers = (
        datetime.date.fromisoformat,
        datetime.time.fromisoformat,
        datetime.datetime.fromisoformat,
    )
    for parse in parsers:
        try:
            return parse(val)
        except (TypeError, ValueError):
            # ValueError: the string is not in this ISO format.
            # TypeError: val is not a string at all.
            # Anything else (e.g. KeyboardInterrupt) must propagate.
            continue
    return None
class CurrentTimeFunction(SqlExpr):
    """AST node for the SQL ``CURRENT_TIME`` keyword."""

    def children(self):
        """Return the single keyword token that makes up this node."""
        keyword = Token("CURRENT_TIME")
        return [keyword]

    def evaluate(self, bindings):
        """Return the time-of-day part of ``datetime.now()``; ``bindings`` is unused."""
        now = datetime.datetime.now()
        return now.time()

    def symbol(self, relations):
        """Return a new instance; there are no sub-expressions to resolve."""
        return CurrentTimeFunction()
class CurrentDateFunction(SqlExpr):
    """AST node for the SQL ``CURRENT_DATE`` keyword."""

    def children(self):
        """Return the single keyword token that makes up this node."""
        keyword = Token("CURRENT_DATE")
        return [keyword]

    def evaluate(self, bindings):
        """Return the date part of ``datetime.now()``; ``bindings`` is unused."""
        now = datetime.datetime.now()
        return now.date()

    def symbol(self, relations):
        """Return a new instance; there are no sub-expressions to resolve."""
        return CurrentDateFunction()
class CurrentTimestampFunction(SqlExpr):
    """AST node for the SQL ``CURRENT_TIMESTAMP`` keyword."""

    def children(self):
        """Return the single keyword token that makes up this node."""
        keyword = Token("CURRENT_TIMESTAMP")
        return [keyword]

    def evaluate(self, bindings):
        """Return ``datetime.now()``; ``bindings`` is unused."""
        now = datetime.datetime.now()
        return now

    def symbol(self, relations):
        """Return a new instance; there are no sub-expressions to resolve."""
        return CurrentTimestampFunction()
class DayNameFunction(SqlExpr):
    """AST node for ``DAYNAME(expression)``."""

    def __init__(self, expression):
        self.expression = expression

    def children(self):
        """Return the tokens and sub-expression of this node, in order."""
        return [Token('DAYNAME'), Token('('), self.expression, Token(')')]

    def evaluate(self, bindings):
        """Return the English weekday name for the evaluated expression.

        String values are parsed with ``datetime.datetime.fromisoformat``.

        Returns:
            An entry of ``calendar.day_name``, or ``None`` when the
            expression evaluates to ``None``.

        Raises:
            ValueError: if the value is not a date/datetime and cannot be
                parsed into one.
        """
        exp = self.expression.evaluate(bindings)
        if exp is None:
            return None
        if isinstance(exp, str):
            try:
                exp = datetime.datetime.fromisoformat(exp)
            except ValueError:
                # Leave exp as the raw string; the type check below then
                # reports it in the ValueError message.
                pass
        if isinstance(exp, (datetime.date, datetime.datetime)):
            return calendar.day_name[exp.weekday()]
        raise ValueError(f"Unable to get day name for: {str(exp)}")

    def symbol(self, relations):
        """Return a copy whose sub-expression has had ``symbol(relations)`` applied.

        The sub-expression is resolved here so that this node behaves like
        the other single-argument date functions in this module.
        """
        return DayNameFunction(self.expression.symbol(relations))
class ExtractFunction(SqlExpr):
    """AST node for ``EXTRACT(date_part FROM expression)``."""

    def __init__(self, date_part, expression):
        self.date_part = date_part
        self.expression = expression

    def children(self):
        """Return the tokens and sub-expression of this node, in order."""
        return [
            Token("EXTRACT"),
            Token("("),
            Token(str(self.date_part).upper()),
            Token('FROM'),
            self.expression,
            Token(")"),
        ]

    def evaluate(self, bindings):
        """Extract ``self.date_part`` from the evaluated expression.

        String values are parsed with ``parse_datetime``.

        Returns:
            * ``None`` when the expression evaluates to ``None``, or when a
              calendar part (weekday/day/month/year) is requested from a
              ``datetime.time`` value.
            * ``0`` when a clock part (hour/minute/second/microsecond) is
              requested from a plain ``datetime.date`` value.
            * For ``'epoch'``, the string produced by ``strftime('%s')``.
            * Otherwise the requested component.

        Raises:
            ValueError: if the value is not (and cannot be parsed into) a
                date/time/datetime, or if the date part is not recognised.
        """
        exp = self.expression.evaluate(bindings)
        if exp is None:
            return None
        parsed = parse_datetime(exp) if isinstance(exp, str) else exp
        if not isinstance(parsed, (datetime.date, datetime.time, datetime.datetime)):
            raise ValueError(f"Got unknown date/time format: {exp}")

        # A bare time has no calendar fields; a bare date has no clock fields.
        # datetime.datetime subclasses datetime.date, hence the second test.
        time_only = isinstance(parsed, datetime.time)
        date_only = isinstance(parsed, datetime.date) and not isinstance(
            parsed, datetime.datetime
        )

        part = self.date_part
        if part == 'weekday':
            return None if time_only else parsed.weekday()
        if part == 'day':
            return None if time_only else parsed.day
        if part == 'month':
            return None if time_only else parsed.month
        if part == 'year':
            return None if time_only else parsed.year
        if part == 'hour':
            return 0 if date_only else parsed.hour
        if part == 'minute':
            return 0 if date_only else parsed.minute
        if part == 'second':
            return 0 if date_only else parsed.second
        if part == 'microsecond':
            return 0 if date_only else parsed.microsecond
        if part == 'epoch':
            # NOTE(review): '%s' is not a directive documented by Python; it
            # is passed to the platform's C strftime and is not available
            # everywhere. The result is a string. Kept as-is for
            # compatibility with existing callers.
            return parsed.strftime('%s')
        raise ValueError(f"Unknown date part requested: {self.date_part}")

    def symbol(self, relations):
        """Return a copy whose sub-expression has had ``symbol(relations)`` applied."""
        # Argument order must match __init__(date_part, expression).
        return ExtractFunction(self.date_part, self.expression.symbol(relations))

    def __str__(self):
        return f"EXTRACT( {str(self.date_part).upper()} FROM {self.expression!s} )"
class FromUnixTimeFunction(SqlExpr):
    """AST node for ``FROM_UNIXTIME(unix_timestamp[, formatDate])``."""

    def __init__(self, unix_timestamp, formatDate):
        self.unix_timestamp = unix_timestamp
        self.formatDate = formatDate

    def children(self):
        """Return the tokens and sub-expressions of this node, in order.

        The comma and format argument are emitted only when ``formatDate``
        is truthy.
        """
        start = [Token("FROM_UNIXTIME"), Token("("), self.unix_timestamp]
        end = [Token(")")]
        middle = [] if not self.formatDate else [Token(","), self.formatDate]
        return start + middle + end

    def evaluate(self, bindings):
        """Not supported; always raises ``NotImplementedError``."""
        raise NotImplementedError("Cannot evaluate FROM_UNIXTIME expression")

    def symbol(self, relations):
        """Return a copy whose sub-expressions have had ``symbol(relations)`` applied.

        ``formatDate`` is optional (see ``children``), so a ``None`` format
        is carried over unchanged instead of being dereferenced.
        """
        format_symbol = (
            None if self.formatDate is None else self.formatDate.symbol(relations)
        )
        return FromUnixTimeFunction(
            self.unix_timestamp.symbol(relations),
            format_symbol,
        )
class UnixTimeStampFunction(SqlExpr):
    """AST node for ``UNIX_TIMESTAMP([expression])``."""

    def __init__(self, expression):
        self.expression = expression

    def children(self):
        """Return the tokens and sub-expression of this node, in order.

        The argument is emitted only when ``expression`` is truthy.
        """
        start = [Token("UNIX_TIMESTAMP"), Token("(")]
        end = [Token(")")]
        middle = [] if not self.expression else [self.expression]
        return start + middle + end

    def evaluate(self, bindings):
        """Not supported; always raises ``NotImplementedError``."""
        raise NotImplementedError("Cannot evaluate UNIX_TIMESTAMP expression")

    def symbol(self, relations):
        """Return a copy whose sub-expression has had ``symbol(relations)`` applied.

        ``expression`` is optional (see ``children``), so a ``None``
        argument is carried over unchanged instead of being dereferenced.
        """
        if self.expression is None:
            return UnixTimeStampFunction(None)
        return UnixTimeStampFunction(
            self.expression.symbol(relations),
        )
class DateFormatFunction(SqlExpr):
    """AST node for ``DATE_FORMAT(expression, format)``."""

    def __init__(self, expression, format):
        self.expression = expression
        self.format = format

    def children(self):
        """Return the tokens and sub-expressions of this node, in order."""
        opening = [Token("DATE_FORMAT"), Token("(")]
        arguments = [self.expression, Token(","), self.format]
        closing = [Token(")")]
        return opening + arguments + closing

    def evaluate(self, bindings):
        """Not supported; always raises ``NotImplementedError``."""
        raise NotImplementedError("Cannot evaluate DATE_FORMAT expression")

    def symbol(self, relations):
        """Return a copy whose sub-expressions have had ``symbol(relations)`` applied."""
        resolved_expression = self.expression.symbol(relations)
        resolved_format = self.format.symbol(relations)
        return DateFormatFunction(resolved_expression, resolved_format)
class FormatDateFunction(SqlExpr):
    """AST node for ``FORMAT_DATE(format, expression)``."""

    def __init__(self, format, expression):
        self.format = format
        self.expression = expression

    def children(self):
        """Return the tokens and sub-expressions of this node, in order."""
        opening = [Token("FORMAT_DATE"), Token("(")]
        arguments = [self.format, Token(","), self.expression]
        closing = [Token(")")]
        return opening + arguments + closing

    def evaluate(self, bindings):
        """Not supported; always raises ``NotImplementedError``."""
        raise NotImplementedError("Cannot evaluate FORMAT_DATE expression")

    def symbol(self, relations):
        """Return a copy whose sub-expressions have had ``symbol(relations)`` applied."""
        resolved_format = self.format.symbol(relations)
        resolved_expression = self.expression.symbol(relations)
        return FormatDateFunction(resolved_format, resolved_expression)
class QuarterFunction(SqlExpr):
    """AST node for ``QUARTER(expression)``."""

    def __init__(self, expression):
        self.expression = expression

    def children(self):
        """Return the tokens and sub-expression of this node, in order."""
        opening = [Token("QUARTER"), Token("(")]
        closing = [Token(")")]
        return opening + [self.expression] + closing

    def evaluate(self, bindings):
        """Not supported; always raises ``NotImplementedError``."""
        raise NotImplementedError("Cannot evaluate QUARTER expression")

    def symbol(self, relations):
        """Return a copy whose sub-expression has had ``symbol(relations)`` applied."""
        resolved = self.expression.symbol(relations)
        return QuarterFunction(resolved)
class DowFunction(SqlExpr):
    """AST node for a day-of-week style call ``<token>(expression)``.

    The function keyword itself is supplied by the caller as ``token``.
    """

    def __init__(self, token, expression):
        self.token = token
        self.expression = expression

    def children(self):
        """Return the tokens and sub-expression of this node, in order."""
        return [
            self.token,
            Token("("),
            self.expression,
            Token(")"),
        ]

    def evaluate(self, bindings):
        """Not supported; always raises ``NotImplementedError``."""
        raise NotImplementedError("Cannot evaluate DOW expression")

    def symbol(self, relations):
        """Return a copy whose sub-expression has had ``symbol(relations)`` applied."""
        # __init__ requires (token, expression); the keyword token is
        # carried over unchanged.
        return DowFunction(
            self.token,
            self.expression.symbol(relations),
        )
class DatetimeDiffFunction(SqlExpr):
    """AST node for ``DATETIME_DIFF(order_date, ship_date, date_part)``."""

    def __init__(self, order_date, ship_date, date_part):
        self.order_date = order_date
        self.ship_date = ship_date
        self.date_part = date_part

    def children(self):
        """Return the tokens and sub-expressions of this node, in order.

        ``date_part`` is rendered as an upper-cased token.
        """
        opening = [Token("DATETIME_DIFF"), Token("(")]
        arguments = [
            self.order_date,
            Token(","),
            self.ship_date,
            Token(","),
            Token(str(self.date_part).upper()),
        ]
        closing = [Token(")")]
        return opening + arguments + closing

    def evaluate(self, bindings):
        """Not supported; always raises ``NotImplementedError``."""
        raise NotImplementedError("Cannot evaluate DATETIME_DIFF expression")

    def symbol(self, relations):
        """Return a copy whose date sub-expressions have had ``symbol(relations)`` applied.

        ``date_part`` is passed through unchanged.
        """
        resolved_order = self.order_date.symbol(relations)
        resolved_ship = self.ship_date.symbol(relations)
        return DatetimeDiffFunction(resolved_order, resolved_ship, self.date_part)
class DatePartFunction(SqlExpr):
    """AST node for ``DATEPART(date_part, interval)``."""

    def __init__(self, date_part, interval):
        self.date_part = date_part
        self.interval = interval

    def children(self):
        """Return the tokens and sub-expression of this node, in order.

        ``date_part`` is rendered as an upper-cased token.
        """
        return [
            Token("DATEPART"),
            Token("("),
            Token(str(self.date_part).upper()),
            Token(","),
            self.interval,
            Token(")"),
        ]

    def evaluate(self, bindings):
        """Not supported; always raises ``NotImplementedError``."""
        # Message previously misspelled the function name as "DATETPART".
        raise NotImplementedError("Cannot evaluate DATEPART expression")

    def symbol(self, relations):
        """Return a copy whose ``interval`` has had ``symbol(relations)`` applied.

        ``date_part`` is passed through unchanged.
        """
        return DatePartFunction(
            self.date_part,
            self.interval.symbol(relations),
        )
class DateFunction(SqlExpr):
    """AST node for ``DATE(expression)``."""

    def __init__(self, expression):
        self.expression = expression

    def children(self):
        """Return the tokens and sub-expression of this node, in order."""
        opening = [Token("DATE"), Token("(")]
        closing = [Token(")")]
        return opening + [self.expression] + closing

    def evaluate(self, bindings):
        """Not supported; always raises ``NotImplementedError``."""
        raise NotImplementedError("Cannot evaluate DATE expression")

    def symbol(self, relations):
        """Return a copy whose sub-expression has had ``symbol(relations)`` applied."""
        resolved = self.expression.symbol(relations)
        return DateFunction(resolved)