Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
Size: Mime:
from datetime import date
import calendar

from snsql._ast.tokens import *
import datetime

"""
    date/time processing expressions
"""
def parse_datetime(val):
    if val is None:
        return None
    if isinstance(val, (datetime.date, datetime.time, datetime.datetime)):
        return val
    pass
    parsed = None
    try:
        parsed = datetime.date.fromisoformat(val)
    except:
        pass
    if not parsed:
        try:
            parsed = datetime.time.fromisoformat(val)
        except:
            pass
    if not parsed:
        try:
            parsed = datetime.datetime.fromisoformat(val)
        except:
            pass
    return parsed

class CurrentTimeFunction(SqlExpr):
    def children(self):
        return [Token("CURRENT_TIME")]
    def evaluate(self, bindings):
        return datetime.datetime.now().time()
    def symbol(self, relations):
        return CurrentTimeFunction()

class CurrentDateFunction(SqlExpr):
    def children(self):
        return [Token("CURRENT_DATE")]
    def evaluate(self, bindings):
        return datetime.datetime.now().date()
    def symbol(self, relations):
        return CurrentDateFunction()

class CurrentTimestampFunction(SqlExpr):
    def children(self):
        return [Token("CURRENT_TIMESTAMP")]
    def evaluate(self, bindings):
        return datetime.datetime.now()
    def symbol(self, relations):
        return CurrentTimestampFunction()

class DayNameFunction(SqlExpr):
    def __init__(self, expression):
        self.expression = expression
    def children(self):
        return [Token('DAYNAME'), Token('('), self.expression, Token(')')]
    def evaluate(self, bindings):
        exp = self.expression.evaluate(bindings)
        if exp is None:
            return None
        if isinstance(exp, str):
            try:
                exp = datetime.datetime.fromisoformat(exp)
            except:
                pass
        if isinstance(exp, (datetime.date, datetime.datetime)):
            return calendar.day_name[exp.weekday()]
        else:
            raise ValueError(f"Unable to get day name for: {str(exp)}")
    def symbol(self, relations):
        return DayNameFunction(self.expression)

class ExtractFunction(SqlExpr):
    def __init__(self, date_part, expression):
        self.date_part = date_part
        self.expression = expression

    def children(self):
        return [
            Token("EXTRACT"),
            Token("("),
            Token(str(self.date_part).upper()),
            Token('FROM'),
            self.expression,
            Token(")")
        ]

    def evaluate(self, bindings):
        exp = self.expression.evaluate(bindings)
        if exp is None:
            return None
        if isinstance(exp, str):
            parsed = parse_datetime(exp)
        else:
            parsed = exp
        if not isinstance(parsed, (datetime.date, datetime.time, datetime.datetime)):
            raise ValueError(f"Got unknown date/time format: {exp}")
        if self.date_part == 'weekday':
            if isinstance(parsed, datetime.time):
                return None
            return parsed.weekday()
        if self.date_part == 'day':
            if isinstance(parsed, datetime.time):
                return None
            return parsed.day
        elif self.date_part == 'month':
            if isinstance(parsed, datetime.time):
                return None
            return parsed.month
        elif self.date_part == 'year':
            if isinstance(parsed, datetime.time):
                return None
            return parsed.year
        elif self.date_part == 'hour':
            if isinstance(parsed, datetime.date) and not isinstance(parsed, datetime.datetime):
                return 0
            return parsed.hour
        elif self.date_part == 'minute':
            if isinstance(parsed, datetime.date) and not isinstance(parsed, datetime.datetime):
                return 0
            return parsed.minute
        elif self.date_part == 'second':
            if isinstance(parsed, datetime.date) and not isinstance(parsed, datetime.datetime):
                return 0
            return parsed.second
        elif self.date_part == 'microsecond':
            if isinstance(parsed, datetime.date) and not isinstance(parsed, datetime.datetime):
                return 0
            return parsed.microsecond
        elif self.date_part == 'epoch':
            return parsed.strftime('%s')
        else:
            raise ValueError(f"Unknown date part requested: {self.date_part}")

    def symbol(self, relations):
        return ExtractFunction(self.expression.symbol(relations), self.date_part)

    def __str__(self):
        return (
            f"EXTRACT( {str(str(self.date_part).upper())} FROM {str(self.expression)} )"
        )

class FromUnixTimeFunction(SqlExpr):
    def __init__(self, unix_timestamp, formatDate):
        self.unix_timestamp = unix_timestamp
        self.formatDate = formatDate

    def children(self):
        start = [Token("FROM_UNIXTIME"), Token("("), self.unix_timestamp]
        end = [Token(")")]
        middle = [] if not self.formatDate else [Token(","), self.formatDate]
        return start + middle + end

    def evaluate(self, bindings):
        raise NotImplementedError("Cannot evaluate FROM_UNIXTIME expression")

    def symbol(self, relations):
        return FromUnixTimeFunction(
            self.unix_timestamp.symbol(relations),
            self.formatDate.symbol(relations),
        )

class UnixTimeStampFunction(SqlExpr):
    def __init__(self, expression):
        self.expression = expression

    def children(self):
        start = [Token("UNIX_TIMESTAMP"), Token("(")]
        end = [Token(")")]
        middle = [] if not self.expression else [self.expression]
        return start + middle + end

    def evaluate(self, bindings):
        raise NotImplementedError("Cannot evaluate UNIX_TIMESTAMP expression")

    def symbol(self, relations):
        return UnixTimeStampFunction(
            self.expression.symbol(relations),
        )

class DateFormatFunction(SqlExpr):
    def __init__(self, expression, format):
        self.expression = expression
        self.format = format

    def children(self):
        return [
            Token("DATE_FORMAT"),
            Token("("),
            self.expression,
            Token(","),
            self.format,
            Token(")"),
        ]

    def evaluate(self, bindings):
        raise NotImplementedError("Cannot evaluate DATE_FORMAT expression")

    def symbol(self, relations):
        return DateFormatFunction(
            self.expression.symbol(relations),
            self.format.symbol(relations),
        )

class FormatDateFunction(SqlExpr):
    def __init__(self, format, expression):
        self.format = format
        self.expression = expression

    def children(self):
        return [
            Token("FORMAT_DATE"),
            Token("("),
            self.format,
            Token(","),
            self.expression,
            Token(")"),
        ]

    def evaluate(self, bindings):
        raise NotImplementedError("Cannot evaluate FORMAT_DATE expression")

    def symbol(self, relations):
        return FormatDateFunction(
            self.format.symbol(relations),
            self.expression.symbol(relations),
        )

class QuarterFunction(SqlExpr):
    def __init__(self, expression):
        self.expression = expression

    def children(self):
        return [
            Token("QUARTER"),
            Token("("),
            self.expression,
            Token(")"),
        ]

    def evaluate(self, bindings):
        raise NotImplementedError("Cannot evaluate QUARTER expression")

    def symbol(self, relations):
        return QuarterFunction(
            self.expression.symbol(relations),
        )

class DowFunction(SqlExpr):
    def __init__(self, token, expression):
        self.token = token
        self.expression = expression

    def children(self):
        return [
            self.token,
            Token("("),
            self.expression,
            Token(")"),
        ]

    def evaluate(self, bindings):
        raise NotImplementedError("Cannot evaluate DOW expression")

    def symbol(self, relations):
        return DowFunction(
            self.expression.symbol(relations),
        )

class DatetimeDiffFunction(SqlExpr):
    def __init__(self, order_date, ship_date, date_part):
        self.order_date = order_date
        self.ship_date = ship_date
        self.date_part = date_part

    def children(self):
        return [
            Token("DATETIME_DIFF"),
            Token("("),
            self.order_date,
            Token(","),
            self.ship_date,
            Token(","),
            Token(str(self.date_part).upper()),
            Token(")"),
        ]

    def evaluate(self, bindings):
        raise NotImplementedError("Cannot evaluate DATETIME_DIFF expression")

    def symbol(self, relations):
        return DatetimeDiffFunction(
            self.order_date.symbol(relations),
            self.ship_date.symbol(relations),
            self.date_part,
        )

class DatePartFunction(SqlExpr):
    def __init__(self, date_part, interval):
        self.date_part = date_part
        self.interval = interval

    def children(self):
        return [
            Token("DATEPART"),
            Token("("),
            Token(str(self.date_part).upper()),
            Token(","),
            self.interval,
            Token(")"),
        ]

    def evaluate(self, bindings):
        raise NotImplementedError("Cannot evaluate DATETPART expression")

    def symbol(self, relations):
        return DatePartFunction(
            self.date_part,
            self.interval.symbol(relations),
        )

class DateFunction(SqlExpr):
    def __init__(self, expression):
        self.expression = expression

    def children(self):
        return [
            Token("DATE"),
            Token("("),
            self.expression,
            Token(")"),
        ]

    def evaluate(self, bindings):
        raise NotImplementedError("Cannot evaluate DATE expression")

    def symbol(self, relations):
        return DateFunction(
            self.expression.symbol(relations),
        )