Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
PyPxTools / metadata_helper.py
Size: Mime:
import os
from collections import OrderedDict

import pandas as pd
from pandas import ExcelWriter
from ipywidgets import widgets

class BaseDataProvider(object):
    """
    Abstract base class for objects that supply metadata values of content objects.

    Data is kept in two lazily created, cached dataframes: one holding the actual values
    (see get_df_data) and one holding meta information (see get_df_metainfo). Child classes
    implement the methods raising NotImplementedError below.
    """

    def __init__(self):
        # column names of the data dataframe, filled in by get_col_names()
        self.col_ctd_id = None
        self.col_ctd_fname = None
        self.col_target_oldval = None
        self.col_target_newval = None
        self.src_field_cols = {}  # source field uuid -> column name
        self._md_values = {}  # cache for possible metadata values per content

        self._df_data = None
        self._df_metainfo = None

    def get_df_data(self):
        """
        Returns dataframe containing actual data. Cached.
        :return: result of _create_df_data(), created on first access
        """
        if self._df_data is None:
            self._df_data = self._create_df_data()
        return self._df_data

    def get_df_metainfo(self):
        """
        Returns dataframe containing meta information about data. Cached.
        :return: result of _create_df_metainfo(), created on first access
        """
        if self._df_metainfo is None:
            self._df_metainfo = self._create_df_metainfo()
        return self._df_metainfo

    def get_value(self, content_id, metadata_field_id):
        """
        Gets value for a metadata field
        :param content_id: index label of the content in the data dataframe
        :param metadata_field_id: uuid of a source field or of the target field
        :return: value found in the data dataframe for this content and field
        :raises AttributeError: if metadata_field_id is neither a source field nor the target field
        """
        # we get the value from the dataframe, but first we need to figure out which column to look in
        if metadata_field_id in self.get_source_field_uuids():
            col = self.src_field_cols[metadata_field_id]
        # compare by value: two equal uuids are not necessarily the same object
        elif metadata_field_id == self.get_target_field_uuid():
            col = self.col_target_oldval
        else:
            raise AttributeError(u'unknown metadata_field_id: {}'.format(metadata_field_id))

        return self.get_df_data()[col].loc[content_id]

    def get_values(self, content_uuid):
        """
        Compiles unique metadata values for a content. Only source fields are considered. Cached.
        :param content_uuid: index label of the content in the data dataframe
        :return: Dict, unique metadata values as key, list w/ names of the metadata fields
            holding that value as value
        """
        values = self._md_values.get(content_uuid)
        if values is None:
            values = {}
            # compile entry for each metadata description
            for md_d_id in self.get_source_field_uuids():
                metadata_value = self.get_value(content_uuid, md_d_id)
                if metadata_value is not None:  # we don't need to consider if not present at this content
                    # the same value may be found in several fields, collect all field names
                    values.setdefault(metadata_value, []).append(self.get_metadata_field_name(md_d_id))

            # cache result
            self._md_values[content_uuid] = values
        return values

    def get_content_uuids(self):
        """
        :return: list w/ uuid for contents objects
        """
        raise NotImplementedError

    def get_source_field_uuids(self):
        """
        :return: list w/ uuid for metadata_fields considered as source fields
        """
        raise NotImplementedError

    def get_target_field_uuid(self):
        """
        :return: uuid of metadata field considered as target field
        """
        raise NotImplementedError

    def _create_df_data(self):
        """
        Triggers creation of dataframe w/ data in child classes. not cached
        :return:
        """
        raise NotImplementedError

    def _create_df_metainfo(self):
        """
        Triggers creation of dataframe w/ meta info in child classes. not cached
        :return:
        """
        raise NotImplementedError

    def get_metadata_field_name(self, metadata_field_id):
        """
        Returns the name of the metadata field (source field or target field) specified by the passed id
        :param metadata_field_id:
        :return:
        """
        raise NotImplementedError

    def get_content_filename(self, content_id):
        """
        :param content_id: id of a content object
        :return: file name of the content; to be implemented by child classes
        """
        raise NotImplementedError

    def get_col_names(self):
        """
        Internally, data is handled in a dataframe that has metadata field names in its column names.
        This method defines the column names and writes them to instance variables for later
        reference, making access to columns easier.
        :return: None, results are stored in col_ctd_id, col_ctd_fname, col_target_oldval,
            col_target_newval and src_field_cols
        """
        # columns for content
        self.col_ctd_id = u'content_uuid'
        self.col_ctd_fname = u'content_file_name'

        # columns for target field
        target_field_name = self.get_metadata_field_name(self.get_target_field_uuid())
        self.col_target_oldval = u'{} old value'.format(target_field_name)
        self.col_target_newval = u'{} new value'.format(target_field_name)

        # columns for source fields
        self.src_field_cols = {}
        for sf_id in self.get_source_field_uuids():
            sf_name = self.get_metadata_field_name(sf_id)
            self.src_field_cols[sf_id] = u'src field: {}'.format(sf_name)


class PxDataProvider(BaseDataProvider):
    """
    Data provider that reads metadata values from content objects and field descriptions
    obtained through a query object.
    """

    def __init__(self, query, target_field_uuid, source_field_uuids, content_list, *args, **kwargs):
        """
        :param query: object offering get_metadata_description(uuid)
        :param target_field_uuid: uuid of the target metadata field
        :param source_field_uuids: uuids of the source metadata fields
        :param content_list: content objects (read here: uuid, name, file_name, get_metadata_value)
        """
        super(PxDataProvider, self).__init__(*args, **kwargs)
        self.query = query

        self.target_field_uuid = target_field_uuid
        self.target_field = query.get_metadata_description(self.target_field_uuid)

        self.source_field_uuids = source_field_uuids
        self.content_list = content_list

        self.last_query = pd.to_datetime('now')

        # lookup tables: uuid -> content object / uuid -> field description
        self.content_by_uuid = {content.uuid: content for content in self.content_list}
        self.source_fields_by_uuid = {
            field_id: query.get_metadata_description(field_id)
            for field_id in self.source_field_uuids
        }

        # dataframes are created lazily on first access
        self._df_data = None
        self._df_metainfo = None

        self.get_col_names()

    @property
    def content_uuids(self):
        """uuids of all known content objects (keys of content_by_uuid)"""
        return self.content_by_uuid.keys()

    @property
    def source_fields(self):
        """field descriptions of all source fields (values of source_fields_by_uuid)"""
        return self.source_fields_by_uuid.values()

    def get_content_filename(self, content_id):
        """
        :param content_id: uuid of a content object
        :return: file_name attribute of that content object
        """
        content = self.content_by_uuid[content_id]
        return content.file_name

    def get_metadata_field_name(self, field_id):
        """
        Looks up the name of a metadata field, source fields first, then the target field.
        :param field_id: uuid of the metadata field
        :return: name of the field or None if the field is unknown
        """
        description = self.source_fields_by_uuid.get(field_id)
        if description is None and field_id == self.target_field_uuid:
            description = self.target_field
        return None if description is None else description.name

    def get_target_field_uuid(self):
        """:return: uuid of the target field"""
        return self.target_field_uuid

    def get_source_field_uuids(self):
        """:return: uuids of the source fields as passed to the constructor"""
        return self.source_field_uuids

    def get_content_uuids(self):
        """:return: uuids of all known content objects"""
        return self.content_uuids

    def _create_df_data(self):
        """
        Builds the data dataframe: one row per content (indexed by content uuid), columns for
        content uuid, content name, old and (still empty) new target value and one column per
        source field.
        :return: the new dataframe
        """
        # make sure column names are set
        self.get_col_names()

        uuids = []
        names = []
        old_target_values = []
        # one (initially empty) value list per source field column
        per_source_column = {}
        for field_id in self.source_field_uuids:
            per_source_column[self.src_field_cols[field_id]] = []

        for content in self.content_list:
            uuids.append(content.uuid)
            # NOTE(review): get_content_filename() reads .file_name while .name is used here - confirm
            names.append(content.name)
            old_target_values.append(content.get_metadata_value(self.target_field_uuid))
            for field_id in self.source_field_uuids:
                column = self.src_field_cols[field_id]
                per_source_column[column].append(content.get_metadata_value(field_id))

        # no new values have been chosen yet
        new_target_values = [None] * len(self.content_list)

        columns = {
            self.col_ctd_id: uuids,
            self.col_ctd_fname: names,
            self.col_target_oldval: old_target_values,
            self.col_target_newval: new_target_values,
        }
        columns.update(per_source_column)

        frame = pd.DataFrame(data=columns, index=uuids)

        # record timestamp of df creation/ last server query
        self.last_query = pd.to_datetime('now')

        return frame

    def _create_df_metainfo(self):
        """
        Builds the meta info dataframe: one row for the last access time, one for the target field
        and one per source field (name and uuid).
        :return: the new dataframe
        """
        info = OrderedDict()
        info['last server access'] = self.last_query
        info['target field'] = [self.target_field.name, self.target_field_uuid]

        for number, field in enumerate(self.source_fields, 1):
            info['source field {!s}'.format(number)] = [field.name, field.uuid]

        # entries become rows after transposing
        return pd.DataFrame(info).T



class DataFrameDataProvider(BaseDataProvider):
    """
    Data provider that reads data and meta information from the sheets 'data' and 'meta_info'
    of an excel file.
    """

    def __init__(self, filename, *args, **kwargs):
        """
        :param filename: excel file to read from
        """
        super(DataFrameDataProvider, self).__init__(*args, **kwargs)

        self.filename = filename

        # caches, filled on first access
        self.source_field_uuids = None
        self.source_field_name_by_uuid = None

        # row 'target field' of the meta info holds the name (label 0) and the uuid (label 1)
        target_row = self.get_df_metainfo().loc['target field']
        self.target_field_uuid = target_row.loc[1]
        self.target_field_name = target_row.loc[0]

        self.get_col_names()

    def _source_field_rows(self):
        """
        Yields the meta info rows whose index label starts with 'source field'.
        """
        meta = self.get_df_metainfo()
        for label in meta.index:
            if label.startswith('source field'):
                yield meta.loc[label]

    def get_source_field_name(self, sf_uuid):
        """
        Gets name for a source field. Names are cached on first call.
        :param sf_uuid: id of the source field
        :return: name of the source field or None if field is unknown
        """
        if self.source_field_name_by_uuid is None:
            names = self.source_field_name_by_uuid = {}
            for row in self._source_field_rows():
                names[row.loc[1]] = row.loc[0]

        return self.source_field_name_by_uuid.get(sf_uuid)

    def get_content_uuids(self):
        """:return: index of the data dataframe"""
        return self.get_df_data().index

    def get_content_filename(self, content_id):
        """
        :param content_id: index label of the content in the data dataframe
        :return: value of the content file name column for this content
        """
        fname_column = self.get_df_data()[self.col_ctd_fname]
        return fname_column.loc[content_id]

    def get_source_field_uuids(self):
        """
        :return: list w/ uuids of all source fields found in the meta info. Cached.
        """
        if self.source_field_uuids is None:
            uuids = self.source_field_uuids = []
            for row in self._source_field_rows():
                uuids.append(row.loc[1])

        return self.source_field_uuids

    def get_target_field_uuid(self):
        """:return: uuid of the target field as read from the meta info"""
        return self.target_field_uuid

    def _create_df_data(self):
        """
        Reads sheet 'data' and transposes it.
        :return: the transposed dataframe
        """
        # NOTE(review): 'sheetname'/'encoding' look like the keyword spelling of older pandas
        # releases - confirm against the pandas version this package targets
        sheet = pd.read_excel(self.filename, sheetname='data', encoding='utf-8')
        return sheet.T

    def _create_df_metainfo(self):
        """
        Reads sheet 'meta_info'.
        :return: dataframe as read from the file
        """
        # NOTE(review): same keyword caveat as in _create_df_data
        sheet = pd.read_excel(self.filename, sheetname='meta_info', encoding='utf-8')
        return sheet

    def get_metadata_field_name(self, metadata_field_id):
        """
        :param metadata_field_id: uuid of the target field or of a source field
        :return: name of the target field or of the matching source field (None if unknown)
        """
        if metadata_field_id == self.target_field_uuid:
            return self.target_field_name
        return self.get_source_field_name(metadata_field_id)


class MetadataConsolidator(object):
    """
    Class for merging metadata values from several source-fields to one target field
    """

    def __init__(self, query, **kwargs):
        """
        Tool for consolidating multiple metadata fields (source fields) into one field (target field).
        Data either comes from a PxDataProvider (if target_field_id, source_field_ids and
        content_list are all given) or from a DataFrameDataProvider reading the passed filename.
        :param query: query object, stored and handed to PxDataProvider
        :param target_field_id: (keyword) uuid of the target field
        :param source_field_ids: (keyword) list w/ uuids of the source fields.
            Target field uuid should be contained here explicitly if old value of target field is considered
            a possible source field as well.
        :param content_list: (keyword) list with content objects for which metadata will be set
        :param filename: (keyword) excel file to read data from, used if one of the three
            arguments above is missing
        :raises ValueError: if neither the three field/content arguments nor a filename are given
        """
        # TODO implement reading of Excel
        # TODO implement setting new values in ProxSys
        # TODO implement passing content as uuid list

        self.query = query
        self.data_provider = None

        target_field = kwargs.get('target_field_id')
        source_fields = kwargs.get('source_field_ids')
        ctd_list = kwargs.get('content_list')

        filename = kwargs.get('filename')

        if target_field is not None and source_fields is not None and ctd_list is not None:
            self.data_provider = PxDataProvider(query, target_field, source_fields, ctd_list)
        elif filename is not None:
            self.data_provider = DataFrameDataProvider(filename)
        else:
            # fail early w/ a clear message instead of trying to read the file 'None'
            raise ValueError(u'either target_field_id, source_field_ids and content_list '
                             u'or filename must be passed')

        self._dropdown_wdgts = {}  # cache for value selection dropdown widgets

        # content can be sorted by the unique metadata values it has:
        self.no_value_content = []  # content where no value (only empty values) is set in any of the fields
        self.single_value_content = []  # content where only one unique, non empty value exists
        self.multi_value_content = []  # content where multiple unique, non-empty values exist
        self.obvious_values = {}  # obvious value (the only unique non-empty one) per content
        # method below fills the bins defined above
        self._sort_content()

    @property
    def df_data(self):
        """dataframe w/ the actual data, as supplied by the data provider"""
        return self.data_provider.get_df_data()

    @property
    def df_metainfo(self):
        """dataframe w/ meta information, as supplied by the data provider"""
        return self.data_provider.get_df_metainfo()

    def to_excel(self, filename):
        """
        Exports data (sheet 'data') and meta information (sheet 'meta_info') to excel
        :param filename: file to write to
        :return:
        """
        # we want to export two sheets, use ExcelWriter to do so. Using it as context manager
        # saves and closes the file, also if writing one of the sheets fails
        with ExcelWriter(filename, engine='xlsxwriter') as writer:
            # transpose data, makes it wide and short (content in columns), easier to work with in excel
            self.df_data.T.to_excel(writer, sheet_name='data')  # write first sheet with the main data
            self.df_metainfo.to_excel(writer, sheet_name='meta_info')  # write snd sheet w/ metainfo

    def _sort_content(self):
        """
        Sorts the content according to the unique values found in the metadata fields:
            - no values: content where no value (i.e. only empty values) is set in any of the fields
            - single unique value: content where only one unique, non empty value was found
            - multiple unique values: content where multiple unique, non-empty values were found

        member variables are stored for each type.
        :return:
        """
        for ctd_id in self.data_provider.get_content_uuids():
            values = self.data_provider.get_values(ctd_id)
            # None does not count as a value
            non_mt_vals = [val for val in values if val is not None]

            if not non_mt_vals:
                self.no_value_content.append(ctd_id)
            elif len(non_mt_vals) == 1:
                self.single_value_content.append(ctd_id)
                self.obvious_values[ctd_id] = non_mt_vals[0]  # remember obvious choice
            else:  # more than one non-empty value!
                self.multi_value_content.append(ctd_id)

    def render_df_html(self):
        """
        Returns a 'nice' html output for the data
        :return: html string for the data dataframe
        """
        # from http://stackoverflow.com/questions/35779631/improve-html-styling-of-pandas-dataframes-within-ipython-widgets
        styler = self.df_data.style.set_table_attributes('class="table"')
        # prefer to_html() where the styler offers it, fall back to render() otherwise
        to_html = getattr(styler, 'to_html', None)
        if to_html is not None:
            return to_html()
        return styler.render()

    def get_value_select_wdgt(self, content_id):
        """
        Creates Box containing string label and dropdown for selecting a value
        :param content_id: id of the content the widget is created for
        :return: VBox w/ label (file name + id) and value dropdown
        """
        ctd_fname = self.data_provider.get_content_filename(content_id)
        ctd_label = widgets.HTML('<b>{}</b><br>[{}]'.format(ctd_fname, content_id), padding=5)
        val_wdgt = self.get_value_select_dropdown(content_id)

        box = widgets.VBox([ctd_label, val_wdgt], padding=10)

        return box

    def get_value_select_dropdown(self, content_id):
        """
        Returns IPywidget Dropdown for selecting one of the metadata values of a content.
        Creates new widget only if necessary, caches result
        :param content_id: id of the content the dropdown is created for
        :return: the (cached) dropdown widget
        """
        # try cache
        ret = self._dropdown_wdgts.get(content_id)
        if ret is None:  # nothing in cache
            # create widget, update cache
            ret = widgets.Dropdown(options=self._get_md_options(content_id))
            self._dropdown_wdgts[content_id] = ret

        return ret

    def _get_md_options(self, content_id):
        """
        Creates and returns dictionary w/ options for metadata values for this content to put into dropdown widget.
        Outsourced from self.get_value_select_dropdown to make it testable via nose
        :param content_id: id of the content
        :return: dict, key is the text to display (value + names of the fields holding it),
            value is the metadata value itself
        """
        values = self.data_provider.get_values(content_id)
        # display value as well as field names in dropdown; for selection, only value is of interest
        return {u'{!s} {!s}'.format(md_val, md_fields): md_val
                for md_val, md_fields in values.items()}

    @staticmethod
    def compare_df(df_x, df_y):
        """
        Compares structure and values of two df. Makes assumptions about equalness that are correct for
        data found in df_data and df_metainfo: empty strings and missing values are considered equal.
        Differences are printed.
        :param df_x:
        :param df_y:
        :return: True if index, columns and all values match, False otherwise
        """
        all_ok = True

        if not df_x.index.equals(df_y.index):
            all_ok = False
            print('Comparisnon failed, index does not match!')

        if not df_x.columns.equals(df_y.columns):
            all_ok = False
            print('Comparison failed, columns do not match!')

        if all_ok:  # comparison of fields doesn't make sense if cols/ idx doesn't match
            import numpy as np
            # treat empty strings as missing; replace() returns a copy, originals stay untouched.
            # None/NaN need no further normalisation, pd.isnull() below covers both
            df_a = df_x.replace('', np.nan)
            df_b = df_y.replace('', np.nan)
            for col in df_a.columns:
                for idx in df_a.index:
                    val_a = df_a.loc[idx, col]
                    val_b = df_b.loc[idx, col]
                    if pd.isnull(val_a) and pd.isnull(val_b):
                        continue
                    if val_a != val_b:
                        all_ok = False
                        # format() instead of '+': index/ column labels are not necessarily strings
                        print(u'{!s},{!s} p: {!r}  x: {!r}'.format(idx, col, val_a, val_b))

        return all_ok

    def compare_df_data(self, other_df):
        """
        Compares the data df of this object with another dataframe
        :param other_df:
        :return: True if all matches, False if match is not complete
        """
        return MetadataConsolidator.compare_df(self.df_data, other_df)

    def compare_df_metainfo(self, other_df):
        """
        Compares the meta info df of this object with another dataframe
        :param other_df:
        :return: True if all matches, False if match is not complete
        """
        return MetadataConsolidator.compare_df(self.df_metainfo, other_df)