Why Gemfury? Push, build, and install RubyGems, npm packages, Python packages, Maven artifacts, PHP packages, Go Modules, Debian packages, RPM packages, and NuGet packages.

Repository URL to install this package:

Details    
smartnoise-sql / snsql / metadata.py
Size: Mime:
import yaml
import io
from os import path
import warnings
import re

from snsql.sql.reader.base import NameCompare

# implements spec at https://docs.smartnoise.org/en/stable/sql/metadata.html

class Metadata:
    """Information about a collection of tabular data sources."""

    def __init__(self, tables, engine=None, compare=None, dbname=None):
        """Instantiate a metadata object with information about tabular data sources

        :param tables: A list of Table descriptions
        :param engine: The name of the database engine used to query these tables.  Used for engine-
            specific name escaping and comparison.  Set to None to use default semantics.
        :param compare: A NameCompare object used to compare table names.  Set to None to use comparison rules
            associated with engine.
        :param dbname: The name of the database.  Used to match 3-part object names like dbname.schema.table.  Set to None to match any database name.
        """
        self.m_tables = {t.table_name(): t for t in tables}
        self.engine = engine if engine is not None else "Unknown"
        self.compare = NameCompare.get_name_compare(engine) if compare is None else compare
        # Normalize a falsy dbname ('' or None) to None so matching treats it as "any database".
        self.dbname = dbname if dbname else None

    def __getitem__(self, tablename):
        """Look up a table by name, which may be qualified as schema.table or db.schema.table.

        :param tablename: The (possibly qualified) table name from a query.
        :returns: The matching Table, or None if no table matches.
        :raises ValueError: If the name matches more than one table.
        """
        schema_name = ""
        dbname = ""

        # Split on '.' while keeping quoted ("...") and bracketed ([...]) identifiers intact.
        regex = re.compile(r'''((?:[^."\[\]]|"[^"]*"|\[[^\[]*\])+)''')
        parts = [p for p in regex.split(tablename) if p not in ('.', '')]
        # inferred from the query
        if len(parts) == 3:
            dbname, schema_name, tablename = parts
        elif len(parts) == 2:
            schema_name, tablename = parts

        matched_tables = []
        # matching with tables in the metadata
        for table in self.m_tables.values():
            # check if table name matches
            if not self.compare.identifier_match(tablename, table.name):
                continue

            # When a schema was given, the candidate must have a matching schema;
            # a candidate with no schema cannot match a schema-qualified name.
            if schema_name and not (
                table.schema and self.compare.schema_match(schema_name, table.schema)
            ):
                continue

            # check if dbname matches (only when both sides specify one)
            if dbname and self.dbname and not self.compare.identifier_match(dbname, self.dbname):
                continue

            # all checks passed; attach our comparer so column lookups use engine rules
            table.compare = self.compare
            matched_tables.append(table)

        if not matched_tables:
            return None

        if len(matched_tables) > 1:
            # More than one table matched in the metadata: ambiguous reference.
            qualif_name_list = [p for p in (dbname, schema_name, tablename) if p]
            name = ".".join(qualif_name_list)
            raise ValueError(f"Asked table {name} is ambiguous. Available tables are: {list(self.m_tables.keys())}")

        return matched_tables[0]

    def __str__(self):
        desc = f"{self.dbname if self.dbname else '(no database name)'} using engine {self.engine}\n"
        return desc + "\n".join(str(t) for t in self.m_tables.values())

    def tables(self):
        """Return the list of Table objects in this collection."""
        return list(self.m_tables.values())

    def __iter__(self):
        # Must return an iterator, not a list; iter() satisfies the protocol.
        return iter(self.tables())

    @staticmethod
    def from_file(file):
        """Load the metadata about this collection from a YAML file"""
        ys = CollectionYamlLoader(file)
        return ys.read_file()

    @staticmethod
    def from_dict(schema_dict):
        """Load the metadata from a dict object"""
        ys = CollectionYamlLoader("dummy")
        return ys._create_metadata_object(schema_dict)

    @classmethod
    def from_(cls, val):
        """Coerce val (Metadata, path/stream, or dict) into a Metadata instance."""
        if isinstance(val, Metadata):
            return val
        elif isinstance(val, (str, io.IOBase)):
            return cls.from_file(val)
        elif isinstance(val, dict):
            return cls.from_dict(val)
        else:
            raise ValueError(f"Metadata needs to be string, dictionary, or Metadata.  Got {str(type(val))}")

    def to_file(self, file, collection_name):
        """Save collection metadata to a YAML file"""
        ys = CollectionYamlLoader(file)
        ys.write_file(self, collection_name)

    def to_dict(self):
        """Serialize this collection to a nested dict: {engine?, dbname: {schema: {table: {...}}}}."""
        meta_dict = {}
        if self.engine != 'Unknown':
            meta_dict["engine"] = self.engine
        tables = {}
        for table in self.tables():
            tdict = table.to_dict()
            # Tables with no schema are grouped under the empty-string schema key.
            schemaname = table.schema if table.schema is not None else ''
            if schemaname not in tables:
                tables[schemaname] = {}
            tables[schemaname][table.name] = tdict
        dbname = '' if self.dbname is None else self.dbname
        meta_dict[dbname] = tables
        return meta_dict




"""
    Common attributes for a table or a view
"""

class Table:
    """Information about a single tabular data source"""
    def __init__(
        self,
        schema,
        name,
        columns,
        *ignore,
        rowcount=0,
        rows_exact=None,
        row_privacy=False,
        max_ids=1,
        sample_max_ids=True,
        clamp_counts=False,
        clamp_columns=True,
        use_dpsu=False,
        censor_dims=True,
        is_public=False,
    ):
        """Instantiate information about a tabular data source.

        :param schema: The schema is the SQL-92 schema used for disambiguating table names.  See
            documentation for more information about schema search path and resolution.
        :param name: The table name that will be used by SQL queries to reference data
            in this table.
        :param columns: A list of Column objects with information about each column in the table.
        :param rowcount: The rough number of rows in this table.  Should not be the exact number, and does not need to be accurate
        :param rows_exact: The exact number of rows, if known; None otherwise.
        :param row_privacy: True if each row belongs to exactly one individual.
        :param max_ids: Maximum number of rows a single individual can contribute.
        :param is_public: True if this table is not privacy-sensitive.
        :raises ValueError: If clamp_columns is set and a numeric column declares a
            sensitivity but no bounds (the two settings conflict).
        """
        self.schema = schema
        self.name = name
        self.rowcount = rowcount
        self.row_privacy = row_privacy
        self.max_ids = max_ids
        self.sample_max_ids = sample_max_ids
        self.rows_exact = rows_exact
        self.use_dpsu = use_dpsu
        self.clamp_counts = clamp_counts
        self.clamp_columns = clamp_columns
        self.censor_dims = censor_dims
        self.is_public = is_public

        self.m_columns = {c.name: c for c in columns}
        # Comparer is attached later by Metadata.__getitem__ with engine-specific rules.
        self.compare = None

        if clamp_columns:
            for col in self.m_columns.values():
                if col.typename() in ("int", "float") and (col.lower is None or col.upper is None):
                    if col.sensitivity is not None:
                        raise ValueError(
                            f"Column {col.name} has sensitivity and no bounds, but table specifies clamp_columns. "
                            "clamp_columns should be False, or bounds should be provided."
                        )

    def __getitem__(self, colname):
        """Return the column matching colname using engine comparison rules, or None."""
        for col in self.m_columns.values():
            if self.compare is None:
                # the database will attach the engine-specific comparer usually
                self.compare = NameCompare()
            if self.compare.identifier_match(colname, col.name):
                return col
        return None

    def __str__(self):
        tablename = f"{self.schema}.{self.name}" if self.schema else f"{self.name}"
        if self.is_public:
            tablename += " (public)"
        return (
            "  "
            + tablename
            + " ["
            + str(self.rowcount)
            + " rows]\n    "
            + "\n    ".join(str(col) for col in self.m_columns.values())
        )

    def key_cols(self):
        """Return the columns flagged as private identifiers."""
        return [col for col in self.m_columns.values() if col.is_key]

    def columns(self):
        """Return the list of column objects for this table."""
        return list(self.m_columns.values())

    def __iter__(self):
        # Must return an iterator, not a list; iter() satisfies the protocol.
        return iter(self.columns())

    def table_name(self):
        """Return the qualified name, schema.name, or just name when schema is blank."""
        return (self.schema + "." if len(self.schema.strip()) > 0 else "") + self.name

    def to_dict(self):
        """Serialize table-level options and each column to a dict (omitting falsy counts)."""
        tdict = {}
        if self.rowcount:
            tdict["rowcount"] = self.rowcount
        if self.rows_exact:
            tdict["rows_exact"] = self.rows_exact
        if self.row_privacy:
            tdict["row_privacy"] = self.row_privacy
        tdict["max_ids"] = self.max_ids
        tdict["sample_max_ids"] = self.sample_max_ids
        tdict["clamp_counts"] = self.clamp_counts
        tdict["clamp_columns"] = self.clamp_columns
        tdict["use_dpsu"] = self.use_dpsu
        tdict["censor_dims"] = self.censor_dims
        tdict["is_public"] = self.is_public

        for col in self.columns():
            tdict[col.name] = col.to_dict()
        return tdict


class String:
    """A column with string data.

    :param name: The column name used in queries.
    :param card: Approximate cardinality (count of distinct values); None if unknown.
    :param is_key: True if this column is a private identifier.
    :param nullable: True if the column may contain NULLs.
    :param missing_value: Value substituted for NULLs; setting it forces nullable=False.
    :param possible_values: Optional exhaustive list of values the column can take.
    """
    def __init__(
        self,
        name,
        *ignore,
        card=None,
        is_key=False,
        nullable=True,
        missing_value=None,
        possible_values=None,
    ):
        self.name = name
        self.card = card
        self.is_key = is_key
        self.nullable = nullable
        self.missing_value = missing_value
        if self.missing_value is not None:
            # A substitution value means NULLs never surface, so the column is not nullable.
            self.nullable = False
        # Avoid the shared-mutable-default pitfall: each instance gets its own list.
        self.possible_values = possible_values if possible_values is not None else []

    def __str__(self):
        return ("*" if self.is_key else "") + str(self.name) + " (card: " + str(self.card) + ")"

    def typename(self):
        return "string"

    @property
    def unbounded(self):
        # A string column with unknown cardinality is treated as unbounded.
        return self.card is None

    def to_dict(self):
        """Serialize to a dict, omitting unset/default-valued attributes."""
        col_dict = {
            "name": self.name,
            "type": "string",
        }
        if self.card:
            col_dict["card"] = self.card
        if self.is_key:
            col_dict["is_key"] = self.is_key
        if self.nullable:
            col_dict["nullable"] = self.nullable
        if self.missing_value is not None:
            col_dict["missing_value"] = self.missing_value
        if self.possible_values != []:
            col_dict['possible_values'] = self.possible_values
        return col_dict

class Boolean:
    """A column holding True/False values."""

    def __init__(
        self,
        name,
        *ignore,
        is_key=False,
        nullable=True,
        missing_value=None
    ):
        self.name = name
        self.is_key = is_key
        self.nullable = nullable
        self.missing_value = missing_value
        # A declared missing-value replacement means NULLs never surface.
        if missing_value is not None:
            self.nullable = False
        # A boolean domain always has exactly these two values.
        self.possible_values = [True, False]

    def __str__(self):
        prefix = "*" if self.is_key else ""
        return prefix + str(self.name) + " (boolean)"

    def typename(self):
        return "boolean"

    @property
    def unbounded(self):
        # The boolean domain is finite, so it is never unbounded.
        return False

    def to_dict(self):
        """Serialize to a dict, omitting unset/default-valued attributes."""
        out = {"name": self.name, "type": "boolean"}
        if self.is_key:
            out["is_key"] = self.is_key
        if self.nullable:
            out["nullable"] = self.nullable
        if self.missing_value is not None:
            out["missing_value"] = self.missing_value
        return out

class DateTime:
    """A date/time column.

    :param name: The column name used in queries.
    :param is_key: True if this column is a private identifier.
    :param nullable: True if the column may contain NULLs.
    :param missing_value: Value substituted for NULLs; setting it forces nullable=False.
    :param possible_values: Optional exhaustive list of values the column can take.
    """
    def __init__(
        self,
        name,
        *ignore,
        is_key=False,
        nullable=True,
        missing_value=None,
        possible_values=None,
    ):
        self.name = name
        self.is_key = is_key
        self.nullable = nullable
        self.missing_value = missing_value
        if self.missing_value is not None:
            # A substitution value means NULLs never surface, so the column is not nullable.
            self.nullable = False
        # Avoid the shared-mutable-default pitfall: each instance gets its own list.
        self.possible_values = possible_values if possible_values is not None else []

    def __str__(self):
        return ("*" if self.is_key else "") + str(self.name) + " (datetime)"

    def typename(self):
        return "datetime"

    @property
    def unbounded(self):
        # Datetimes have no declared bounds in this metadata, so always unbounded.
        return True

    def to_dict(self):
        """Serialize to a dict, omitting unset/default-valued attributes."""
        tdict = {
            "name": self.name,
            "type": "datetime",
        }
        if self.is_key:
            tdict["is_key"] = self.is_key
        if self.nullable:
            tdict["nullable"] = self.nullable
        if self.missing_value is not None:
            tdict["missing_value"] = self.missing_value
        if self.possible_values != []:
            tdict["possible_values"] = self.possible_values
        return tdict

class Int:
    """A column with integer data.

    :param name: The column name used in queries.
    :param lower: Lower clamping bound, or None if unbounded.
    :param upper: Upper clamping bound, or None if unbounded.
    :param is_key: True if this column is a private identifier.
    :param sensitivity: Explicit sensitivity override; warns if it conflicts with bounds.
    :param nullable: True if the column may contain NULLs.
    :param missing_value: Value substituted for NULLs; setting it forces nullable=False.
    :param possible_values: Optional exhaustive list of values the column can take.
    """
    def __init__(
        self,
        name,
        *ignore,
        lower=None,
        upper=None,
        is_key=False,
        sensitivity=None,
        nullable=True,
        missing_value=None,
        possible_values=None,
    ):
        self.name = name
        self.lower = lower
        self.upper = upper
        self.is_key = is_key
        self.sensitivity = sensitivity
        self.nullable = nullable
        self.missing_value = missing_value
        if self.missing_value is not None:
            # A substitution value means NULLs never surface, so the column is not nullable.
            self.nullable = False
        # Avoid the shared-mutable-default pitfall: each instance gets its own list.
        self.possible_values = possible_values if possible_values is not None else []
        if self.lower is not None and self.upper is not None and self.sensitivity is not None:
            # Warn when the explicit sensitivity matches neither plausible
            # bounds-derived sensitivity (max |bound| or the bound width).
            sens_a = max(abs(self.lower), abs(self.upper))
            sens_b = abs(self.upper - self.lower)
            if self.sensitivity != sens_a and self.sensitivity != sens_b:
                warnings.warn(
                    f"Sensitivity for {self.name} will override sensitivity derived from bounds"
                )

    def __str__(self):
        bounds = "unbounded" if self.unbounded else str(self.lower) + "," + str(self.upper)
        return ("*" if self.is_key else "") + str(self.name) + " [int] (" + bounds + ")"

    def typename(self):
        return "int"

    @property
    def unbounded(self):
        return self.lower is None or self.upper is None

    def to_dict(self):
        """Serialize to a dict, omitting unset attributes.

        Uses `is not None` checks so legitimate zero values (e.g. lower=0)
        are preserved rather than silently dropped by truthiness tests.
        """
        col_dict = {
            "name": self.name,
            "type": "int",
        }
        if self.is_key:
            col_dict["is_key"] = self.is_key
        if self.nullable:
            col_dict["nullable"] = self.nullable
        if self.lower is not None:
            col_dict["lower"] = self.lower
        if self.upper is not None:
            col_dict["upper"] = self.upper
        if self.sensitivity is not None:
            col_dict["sensitivity"] = self.sensitivity
        if self.missing_value is not None:
            col_dict["missing_value"] = self.missing_value
        if self.possible_values != []:
            col_dict["possible_values"] = self.possible_values
        return col_dict


class Float:
    """A floating point column.

    :param name: The column name used in queries.
    :param lower: Lower clamping bound, or None if unbounded.
    :param upper: Upper clamping bound, or None if unbounded.
    :param is_key: True if this column is a private identifier.
    :param sensitivity: Explicit sensitivity override; warns if it conflicts with bounds.
    :param nullable: True if the column may contain NULLs.
    :param missing_value: Value substituted for NULLs; setting it forces nullable=False.
    :param possible_values: Optional exhaustive list of values the column can take.
    """
    def __init__(
        self,
        name,
        *ignore,
        lower=None,
        upper=None,
        is_key=False,
        sensitivity=None,
        nullable=True,
        missing_value=None,
        possible_values=None,
    ):
        self.name = name
        self.lower = lower
        self.upper = upper
        self.is_key = is_key
        self.sensitivity = sensitivity
        self.nullable = nullable
        self.missing_value = missing_value
        if self.missing_value is not None:
            # Consistency fix: every other column type treats a declared
            # missing_value as "NULLs never surface", so Float does too.
            self.nullable = False
        # Avoid the shared-mutable-default pitfall: each instance gets its own list.
        self.possible_values = possible_values if possible_values is not None else []

        if self.lower is not None and self.upper is not None and self.sensitivity is not None:
            # Warn when the explicit sensitivity matches neither plausible
            # bounds-derived sensitivity (max |bound| or the bound width).
            sens_a = max(abs(self.lower), abs(self.upper))
            sens_b = abs(self.upper - self.lower)
            if self.sensitivity != sens_a and self.sensitivity != sens_b:
                warnings.warn(
                    f"Sensitivity for {self.name} will override sensitivity derived from bounds"
                )

    def __str__(self):
        bounds = "unbounded" if self.unbounded else str(self.lower) + "," + str(self.upper)
        return ("*" if self.is_key else "") + str(self.name) + " [float] (" + bounds + ")"

    def typename(self):
        return "float"

    @property
    def unbounded(self):
        return self.lower is None or self.upper is None

    def to_dict(self):
        """Serialize to a dict, omitting unset attributes.

        Uses `is not None` checks so legitimate zero values (e.g. lower=0.0)
        are preserved rather than silently dropped by truthiness tests.
        """
        col_dict = {
            "name": self.name,
            "type": "float",
        }
        if self.is_key:
            col_dict["is_key"] = self.is_key
        if self.nullable:
            col_dict["nullable"] = self.nullable
        if self.lower is not None:
            col_dict["lower"] = self.lower
        if self.upper is not None:
            col_dict["upper"] = self.upper
        if self.sensitivity is not None:
            col_dict["sensitivity"] = self.sensitivity
        if self.missing_value is not None:
            col_dict["missing_value"] = self.missing_value
        if self.possible_values != []:
            col_dict["possible_values"] = self.possible_values
        return col_dict

class Unknown:
    """Column is unknown type.  Will be ignored.  May not be used in queries.

    :param name: The column name.
    :param possible_values: Optional exhaustive list of values the column can take.
    """
    def __init__(self, name, possible_values=None):
        self.name = name
        # Unknown columns can never be private identifiers.
        self.is_key = False
        # Avoid the shared-mutable-default pitfall: each instance gets its own list.
        self.possible_values = possible_values if possible_values is not None else []

    def __str__(self):
        return str(self.name) + " (unknown) " + str(self.possible_values)

    def typename(self):
        return "unknown"

    # NOTE(review): unlike the other column classes, unbounded is a plain method
    # here, not a property; kept as-is to avoid breaking callers that call it.
    def unbounded(self):
        return True

    def to_dict(self):
        """Serialize to a dict, omitting unset attributes."""
        col_dict = {
            "name": self.name,
            "type": "unknown",
        }
        if self.is_key:
            col_dict["is_key"] = self.is_key
        if self.possible_values != []:
            col_dict["possible_values"] = self.possible_values
        return col_dict

class CollectionYamlLoader:
    """Reads and writes Metadata collections as YAML.

    ``file`` may be a filesystem path or an open stream; streams are
    supported for reading only.
    """
    def __init__(self, file):
        self.file = file

    def read_file(self):
        """Load YAML from the path or stream and build a Metadata object.

        :raises ValueError: If the path does not exist or the YAML is malformed.
        """
        if isinstance(self.file, io.IOBase):
            c_s = yaml.safe_load(self.file)
        else:
            if not path.exists(self.file):
                raise ValueError(f"Unable to load metadata path {self.file}")
            with open(self.file, "r") as stream:
                c_s = yaml.safe_load(stream)
        return self._create_metadata_object(c_s)

    def _create_metadata_object(self, c_s):
        """Build a Metadata object from the parsed YAML dict.

        The top level must have exactly one collection key, optionally
        alongside an ``engine`` key.
        """
        if not hasattr(c_s, "keys"):
            raise ValueError("Metadata must be a YAML dictionary")
        keys = list(c_s.keys())
        engine = "Unknown"

        if len(keys) == 0:
            raise ValueError("No collections in YAML file!")

        if len(keys) == 1:
            collection = keys[0]
        elif len(keys) > 2:
            raise ValueError("Please include only one collection per config file: " + str(keys))
        else:  # we have two keys; one should be engine
            if "engine" not in keys:
                raise ValueError("Please include only one collection per config file: " + str(keys))
            engine = c_s["engine"]
            collection = [k for k in keys if k != "engine"][0]

        db = c_s[collection]

        # Collection maps schema -> table -> table description.
        tables = []
        for schema, schema_tables in db.items():
            for table, t in schema_tables.items():
                tables.append(self.load_table(schema, table, t))

        return Metadata(tables, engine, dbname=collection)

    def load_table(self, schema, table, t):
        """Build a Table from its YAML dict, applying spec defaults for absent options."""
        rowcount = int(t["rows"]) if "rows" in t else 0
        rows_exact = int(t["rows_exact"]) if "rows_exact" in t else None
        row_privacy = bool(t["row_privacy"]) if "row_privacy" in t else False
        max_ids = int(t["max_ids"]) if "max_ids" in t else 1
        sample_max_ids = bool(t["sample_max_ids"]) if "sample_max_ids" in t else True
        use_dpsu = bool(t["use_dpsu"]) if "use_dpsu" in t else False
        clamp_counts = bool(t["clamp_counts"]) if "clamp_counts" in t else False
        clamp_columns = bool(t["clamp_columns"]) if "clamp_columns" in t else True
        censor_dims = bool(t["censor_dims"]) if "censor_dims" in t else True
        is_public = bool(t["is_public"]) if "is_public" in t else False

        # Every key that is not a recognized table option is a column.
        option_keys = {
            "rows",
            "rows_exact",
            "row_privacy",
            "max_ids",
            "sample_max_ids",
            "clamp_counts",
            "clamp_columns",
            "use_dpsu",
            "censor_dims",
            "is_public",
        }
        columns = [
            self.load_column(cn, t[cn]) for cn in t.keys() if cn not in option_keys
        ]

        return Table(
            schema,
            table,
            columns,
            rowcount=rowcount,
            rows_exact=rows_exact,
            row_privacy=row_privacy,
            max_ids=max_ids,
            sample_max_ids=sample_max_ids,
            clamp_counts=clamp_counts,
            clamp_columns=clamp_columns,
            use_dpsu=use_dpsu,
            censor_dims=censor_dims,
            is_public=is_public,
        )

    def load_column(self, column, c):
        """Construct the typed column object described by the YAML dict ``c``.

        :raises ValueError: If ``c["type"]`` is not a recognized column type.
        """
        lower = float(c["lower"]) if "lower" in c else None
        upper = float(c["upper"]) if "upper" in c else None
        # The YAML spec spells the key flag "private_id".
        is_key = bool(c["private_id"]) if "private_id" in c else False
        nullable = bool(c["nullable"]) if "nullable" in c else True
        missing_value = c["missing_value"] if "missing_value" in c else None
        sensitivity = c["sensitivity"] if "sensitivity" in c else None
        possible_values = c["possible_values"] if "possible_values" in c else []
        card = int(c["cardinality"]) if "cardinality" in c else 0

        if c["type"] == "boolean":
            return Boolean(column, is_key=is_key, nullable=nullable, missing_value=missing_value)
        elif c["type"] == "datetime":
            return DateTime(column, is_key=is_key, nullable=nullable, missing_value=missing_value, possible_values=possible_values)
        elif c["type"] == "int":
            return Int(
                column,
                lower=int(lower) if lower is not None else None,
                upper=int(upper) if upper is not None else None,
                is_key=is_key,
                nullable=nullable,
                missing_value=missing_value,
                sensitivity=sensitivity,
                possible_values=possible_values,
            )
        elif c["type"] == "float":
            return Float(
                column,
                lower=lower,
                upper=upper,
                is_key=is_key,
                nullable=nullable,
                missing_value=missing_value,
                sensitivity=sensitivity,
                possible_values=possible_values,
            )
        elif c["type"] == "string":
            return String(column, card=card, is_key=is_key, nullable=nullable, missing_value=missing_value, possible_values=possible_values)
        else:
            raise ValueError("Unknown column type for column {0}: {1}".format(column, c))

    def write_file(self, collection_metadata, collection_name):
        """Serialize a Metadata object to YAML at ``self.file``.

        :raises ValueError: If ``self.file`` is a stream, or if a table or
            column name appears twice.
        """
        schemas = {}
        for t in collection_metadata.tables():
            schema_name = t.schema
            table_name = t.name
            if schema_name not in schemas:
                schemas[schema_name] = {}
            schema = schemas[schema_name]
            if table_name in schema:
                raise ValueError(
                    "Attempt to insert table with same name twice: " + schema_name + table_name
                )
            schema[table_name] = {}
            table = schema[table_name]
            table["rows"] = t.rowcount
            if t.row_privacy is not None:
                table["row_privacy"] = t.row_privacy
            if t.max_ids is not None:
                table["max_ids"] = t.max_ids
            if t.sample_max_ids is not None:
                table["sample_max_ids"] = t.sample_max_ids
            if t.rows_exact is not None:
                table["rows_exact"] = t.rows_exact
            if t.use_dpsu is not None:
                table["use_dpsu"] = t.use_dpsu
            if t.clamp_counts is not None:
                table["clamp_counts"] = t.clamp_counts
            if t.clamp_columns is not None:
                table["clamp_columns"] = t.clamp_columns
            if t.censor_dims is not None:
                table["censor_dims"] = t.censor_dims
            # Persist is_public so a save/load round trip does not silently
            # drop the public flag (load_table reads it).
            if t.is_public is not None:
                table["is_public"] = t.is_public

            for c in t.columns():
                cname = c.name
                if cname in table:
                    raise ValueError(
                        "Duplicate column name {0} in table {1}".format(cname, table_name)
                    )
                table[cname] = {}
                column = table[cname]
                # Only write cardinality when set; a null cardinality would
                # crash int(None) when the file is reloaded.
                if hasattr(c, "card") and c.card is not None:
                    column["cardinality"] = c.card
                if hasattr(c, "lower") and c.lower is not None:
                    column["lower"] = c.lower
                if hasattr(c, "upper") and c.upper is not None:
                    column["upper"] = c.upper
                if hasattr(c, "nullable") and c.nullable is not None:
                    column["nullable"] = c.nullable
                if hasattr(c, "missing_value") and c.missing_value is not None:
                    column["missing_value"] = c.missing_value
                if hasattr(c, "sensitivity") and c.sensitivity is not None:
                    column["sensitivity"] = c.sensitivity
                if c.is_key is not None and c.is_key == True:
                    column["private_id"] = c.is_key
                if type(c) is String:
                    column["type"] = "string"
                elif type(c) is Int:
                    column["type"] = "int"
                elif type(c) is Float:
                    column["type"] = "float"
                elif type(c) is Boolean:
                    column["type"] = "boolean"
                elif type(c) is DateTime:
                    column["type"] = "datetime"
                elif type(c) is Unknown:
                    column["type"] = "unknown"
                else:
                    raise ValueError("Unknown column type: " + str(type(c)))
        db = {}
        db[collection_name] = schemas
        db["engine"] = collection_metadata.engine
        if isinstance(self.file, io.IOBase):
            raise ValueError("Cannot save metadata to a file stream.  Please use file path")
        with open(self.file, "w") as outfile:
            yaml.dump(db, outfile)