Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
dshipparser / exportinfo.py
Size: Mime:
import os.path
from abc import ABC, abstractmethod
from pathlib import Path

import xmltodict

from dshipparser import helpers


class DshipExportInfo(ABC):
    """Abstract base class for accessing the metadata of a DSHIP export.

    Implementations supply the raw per-column information (column names,
    precision, field width, error/NaN markers). This class derives the
    numeric / alpha-numeric classification of columns from the precision.
    """

    @abstractmethod
    def get_info(self):
        """Returns the export metadata gathered by the implementation"""

    @abstractmethod
    def get_errorvalue_numeric(self):
        """Returns the marker used for non valid values in numeric fields"""

    @abstractmethod
    def get_errorvalue_alphanumeric(self):
        """Returns the marker used for non valid values in alpha-numeric fields"""

    @abstractmethod
    def get_nan_values(self):
        """Returns the strings that have to be treated as NaN"""

    @abstractmethod
    def get_columns(self):
        """Returns list of column names found in metainfo"""

    @abstractmethod
    def get_precision(self, column):
        """Returns the precision (no. of places after decimal separator) for a field/column"""

    @abstractmethod
    def get_field_width(self, column):
        """Returns the total width (total no. of places including decimal separator) for a field/column"""

    def is_numeric(self, column):
        """Returns True if a field/column is considered to be numeric

        A column is numeric when a precision >= 0 is known for it. A
        precision of 0 (integer valued field) counts as numeric as well.
        """
        prc = self.get_precision(column)
        # Explicit type check instead of truthiness: 0 is a valid precision,
        # whereas None or any non-number placeholder means "no precision".
        return isinstance(prc, (int, float)) and prc > -1

    def is_alpha_numeric(self, column):
        """Returns True if a field/column is considered to be alpha-numeric"""
        return not self.is_numeric(column)

    def get_numeric_columns(self):
        """Returns list of columns considered to be numeric"""
        return [col for col in self.get_columns() if self.is_numeric(col)]

    def get_alpha_numeric_columns(self):
        """Returns list of columns considered to be alpha-numeric"""
        return [col for col in self.get_columns() if self.is_alpha_numeric(col)]

class Dship2ExportInfo(DshipExportInfo):
    """Export metadata parsed from the .txt info file of a DSHIP 2 export."""

    def __init__(self, base_dir):
        """Find the info file via ``helpers.find_txt_file(base_dir)`` and parse it."""
        self.base_dir = base_dir
        self.txt_file = helpers.find_txt_file(base_dir)

        self.export_info = self.get_info()

    @staticmethod
    def _value_of(line):
        """Returns the text right of the first '=' in line, without whitespace and double quotes"""
        # maxsplit=1 keeps values intact that contain a '=' themselves
        return line.split('=', 1)[1].strip().replace('"', '')

    def get_info(self):
        """Extracts dict with useful info (column names!) from the export .txt file

        :returns dict with the keys 'file_format', 'columns', 'nan_values',
                 'precisions', 'field_widths', 'dat_file' and 'platform_id',
                 plus 'errorvalue_numeric' / 'errorvalue_alphanumeric' if the
                 corresponding pattern lines are present in the file
        :raises ValueError if the file is malformed (error value patterns
                missing before the channel definitions, no destination file)
                or if no platform id can be determined
        """
        ret = {'file_format': {}}
        doc = helpers.read_lines_from_file(self.txt_file)

        columns = []
        nan_values = {}
        precisions = {}
        field_widths = {}

        sensor, precision, length = None, None, None
        errorval_alpha, errorval_numeric = None, None

        for line in doc:
            stripped = line.strip()

            if line.startswith('Channel '):
                # first line of channel info found, clear channel specific info!
                sensor, precision, length = None, None, None

            elif stripped.startswith('Sensor'):
                sensor = self._value_of(line)
                columns.append(sensor)
                # empty dicts act as "not parsed (yet)" placeholders
                precisions[sensor] = {}
                field_widths[sensor] = {}

            elif stripped.startswith('Length'):
                length = int(self._value_of(line))
                field_widths[sensor] = length

            elif stripped.startswith('Precision'):
                try:
                    precision = int(self._value_of(line))
                except ValueError:  # couldn't parse to int
                    precision = None
                precisions[sensor] = precision

            elif line.startswith('Pattern for non valid numerical values'):
                errorval_numeric = self._value_of(line)
                ret['errorvalue_numeric'] = errorval_numeric

            elif line.startswith('Pattern for non valid alphanumerical values'):
                errorval_alpha = self._value_of(line)
                ret['errorvalue_alphanumeric'] = errorval_alpha

            elif line.startswith('Destination file'):
                base_path = os.path.abspath(os.path.dirname(self.txt_file))
                # only the first whitespace separated token is the file name
                dat_file = line.split('=', 1)[1].strip().split(' ')[0]
                ret['dat_file'] = f"{base_path}{os.path.sep}{dat_file}.dat"

            elif line.startswith('Separator'):
                ret['file_format']['@separator'] = self._value_of(line)
            elif line.startswith('End of record'):
                ret['file_format']['@endofrecordmarker'] = self._value_of(line)
            elif line.startswith('Decimal symbol'):
                ret['file_format']['@decimalsymbol'] = self._value_of(line)
            elif line.startswith('Date/Time'):
                ret['file_format']['@datetimeformat'] = self._value_of(line)

            # once sensor, precision and field length are parsed for this
            # channel the expected error value string can be calculated.
            # NOTE(review): the truthiness test skips channels with precision
            # 0 or without precision, so they never get a nan_values entry and
            # the alphanumeric error value is only used for negative precisions
            # - confirm whether that is intended.
            if sensor and precision and length:

                # explicit raise instead of assert: asserts vanish under -O
                if not errorval_numeric:
                    raise ValueError('numeric error value nt parsed yet! Malformated export info .txt?')
                if not errorval_alpha:
                    raise ValueError('alphanumeric error value nt parsed yet! Malformated export info .txt?')

                errorval = errorval_numeric if precision > 0 else errorval_alpha

                nan_values[sensor] = ['#', '"*"', f"{errorval * (length - precision - 1)}.{errorval * precision}"]

        ret['columns'] = columns
        ret['nan_values'] = nan_values
        ret['precisions'] = precisions
        ret['field_widths'] = field_widths

        if 'dat_file' not in ret:
            raise ValueError(f'No destination file found in {self.txt_file}')

        # dship2 doesn't include platform in Metadata. Try to determine platformID by file name
        pf_id = helpers.guess_platform_id(ret['dat_file'])
        if not pf_id:
            raise ValueError(f'Could not determine platform for file {self.txt_file}')
        ret['platform_id'] = pf_id

        return ret

    def get_precision(self, column):
        """Returns the parsed precision of column (None for unknown columns)"""
        return self.export_info['precisions'].get(column)

    def get_field_width(self, column):
        """Returns the parsed field width of column (None for unknown columns)"""
        return self.export_info['field_widths'].get(column)

    def get_errorvalue_numeric(self):
        """Returns the pattern for non valid numerical values"""
        return self.export_info['errorvalue_numeric']

    def get_errorvalue_alphanumeric(self):
        """Returns the pattern for non valid alphanumerical values"""
        return self.export_info['errorvalue_alphanumeric']

    def get_nan_values(self):
        """Returns dict with sensor name as key, list of expected NaN strings as value"""
        return self.export_info['nan_values']

    def get_columns(self):
        """Returns list of sensor names in the order they appear in the info file"""
        return self.export_info['columns']


class Dship3ExportInfo(DshipExportInfo):
    """Export metadata parsed from the .sys file and the order xml of a DSHIP 3 export."""

    def __init__(self, base_dir):
        """Find the sys file via ``helpers.find_sys_file(base_dir)`` and parse it."""
        self.base_dir = base_dir
        self.sys_file = helpers.find_sys_file(base_dir)

        self.xml_file = None
        self.xml_dict = None
        self.export_info = self.get_info()

    def get_info(self):
        """Extracts dict with useful info (column names!) from export sys file

        The sys file provides the platform id and the name of the order xml.
        The xml is parsed as soon as its name is found and its info is merged
        into the result.

        :raises FileNotFoundError if the order xml can not be located
        """
        ret = {}
        doc = helpers.read_lines_from_file(self.sys_file)

        for line in doc:
            if 'platformId:' in line:
                ret['platform_id'] = line.split('platformId:')[1].strip()
            if 'Order file:' in line:
                xml_name = line.split('Order file:')[1].strip()
                # the order file is expected next to the sys file
                xml_path = os.path.join(os.path.dirname(self.sys_file), xml_name)
                self.xml_file = Dship3ExportInfo._fix_xml_file_path(xml_path)
                if self.xml_file is None:
                    # fail with a clear message instead of passing None to open()
                    raise FileNotFoundError(f'Order file {xml_path} referenced in {self.sys_file} not found')
                ret.update(self.parse_dship3_xml())

        return ret

    @staticmethod
    def _fix_xml_file_path(xml_file):
        """Temporary (?) fix for bug introduced during dship update:

           the xml file may be stored under a different name than the
           'order_XXXX.xml' stated in the sys-file.

           :param xml_file: xml-file path (str) as parsed from sys file
           :returns xml_file as passed if it exists, else the first file
                    matching 'order*.xml' in the same directory, or None
                    if there is no such file
        """
        xml_path = Path(xml_file)
        if xml_path.exists():  # passed xml-file exists
            return xml_file

        candidates = list(xml_path.parent.glob('order*.xml'))
        if candidates:
            return str(candidates[0])

        import logging
        # xml_file is a plain str, so the directory has to come from the Path object
        logging.error(f'No oder*.xml file found in {xml_path.parent}!')
        return None

    def _get_channels(self):
        """Returns the channel entries of the order xml as a list"""
        channels = self._get_xml_dict()['export']['description']['channellist']['channel']
        # xmltodict only builds a list for repeated elements; a single
        # <channel> is returned as one dict and has to be wrapped
        if isinstance(channels, list):
            return channels
        return [channels]

    def parse_dship3_xml(self):
        """Extracts dict with useful info from export xml file

        :returns dict with the keys 'dat_file', 'file_format', 'columns',
                 'precisions' (only channels having a precision attribute)
                 and 'field_widths'
        """
        ret = {}

        doc = self._get_xml_dict()
        channels = self._get_channels()

        base_path = os.path.abspath(os.path.dirname(self.xml_file))
        ret['dat_file'] = f"{base_path}{os.path.sep}{doc['export']['order']['output']['#text']}.dat"
        ret['file_format'] = doc['export']['description']['fileformat']
        ret['columns'] = [ch['#text'] for ch in channels]

        ret['precisions'] = {ch['#text']: int(ch['@precision']) for ch in channels if '@precision' in ch}

        # new dship insightics format may not include fieldwidth attribute, replace w/ 0 as default
        ret['field_widths'] = {ch['#text']: int(ch.get('@fieldwidth') or 0) for ch in channels}

        return ret

    def _get_precisions(self, xml_dict):
        """Returns dict with columns:precision for each columns

        NOTE: not implemented, currently returns None. The precisions are
        collected in parse_dship3_xml.
        """

    def _get_xml_dict(self):
        """Returns dictionary parsed from xml using xmltodict (parsed once, then cached)"""
        if not self.xml_dict:

            with open(self.xml_file) as fd:
                self.xml_dict = xmltodict.parse(fd.read())

        return self.xml_dict

    def get_errorvalue_numeric(self):
        """Parses the 'errorvaluenumeric' from dict generated from order xml"""
        return self._get_xml_dict()['export']['description']['channellist']['@errorvaluenumeric']

    def get_errorvalue_alphanumeric(self):
        """Parses the 'errorvaluealphanumeric' from dict generated from order xml"""
        return self._get_xml_dict()['export']['description']['channellist']['@errorvaluealphanumeric']

    def get_nan_values(self):
        """Parses values set as NaN for each column (channel) from order xml.
        Evaluates field width and precision for each channel separately. Returns dict with col/ channel as key,
        list of expected NaN strings as value"""

        ret = {}

        errorval = self.get_errorvalue_numeric()
        errorval_alphanumeric = self.get_errorvalue_alphanumeric()

        for channel in self._get_channels():
            channel_name = channel['#text']

            # add '#' and '"*"' as defaults see issue #3
            ret[channel_name] = ['#', '"*"']

            # new dship insightics format may not include fieldwidth attribute,
            # only the defaults apply to such channels
            if '@fieldwidth' not in channel:
                continue
            fw = int(channel['@fieldwidth'])

            if '@precision' in channel:   # numeric channel
                pr = int(channel['@precision'])

                if pr == 0:
                    # NOTE(review): fixed repetition of 10 independent of the
                    # field width - confirm this is intended
                    ret[channel_name].append(errorval*10)
                else:
                    ret[channel_name].append(f"{errorval*(fw-pr-1)}.{errorval*pr}")

            else:  # no precision, alphanumeric channel
                ret[channel_name].append(errorval_alphanumeric*fw)

        return ret

    def get_precision(self, column):
        """Returns the precision of column (None if the channel has no precision attribute)"""
        return self.export_info['precisions'].get(column)

    def get_field_width(self, column):
        """Returns the field width of column (0 if the channel has no fieldwidth attribute)"""
        return self.export_info['field_widths'].get(column)

    def get_columns(self):
        """Returns list of channel names in the order of the order xml"""
        return self.export_info['columns']