Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
PyPxTools / test_pypxtools.py
Size: Mime:
# -*- coding: utf-8 -*-
import base64
import inspect
import os

import pypxtools.metadata_helper
import pypxtools.pxobjects
import pypxtools.pypxtools as pxt
import pandas as pd

# Account, project and scratch-folder identifiers used by the tests in this module.
# NOTE(review): the credentials are committed in source; base64 is an encoding,
# not encryption -- consider loading them from the environment instead.
T_USER = 'pxwebserviceuser'
bts = 'iMX2,TrVa2P;5%'.encode('utf-8')
T_PWD = base64.b64encode(bts)
T_PRJ = '067558e05e7f3116015e9e9e037436d0'
T_NEW_FOLDER_NAME = 'TEST_REMOVE_THIS_FOLDER_e21cf5d748485be9fe2583e9be874ddf97aa9600'



def get_test_output_directory():
    """Return the location of the designated directory for test outputs.

    The directory is called ``out`` and sits next to this source file. It is
    created when it does not exist yet.

    :return: absolute path of the output directory
    """
    current_file = inspect.getfile(inspect.currentframe())
    current_dir = os.path.abspath(os.path.dirname(current_file))
    out_dir = os.path.join(current_dir, 'out')
    # exist_ok avoids the race between a separate exists() check and mkdir()
    os.makedirs(out_dir, exist_ok=True)
    return out_dir

# Resolved once at import time; the helper also creates the directory if it is missing.
TEST_OUT_DIR = get_test_output_directory()

class TestPxTools:

    def __init__(self):
        """Create the query object used by the test methods and set its credentials."""
        # NOTE(review): pytest does not collect ``Test*`` classes that define
        # ``__init__`` -- confirm which test runner this module is meant for.
        px_query = pypxtools.pxobjects.PxQuery()
        px_query.u_name = T_USER
        px_query.pwd = T_PWD
        self.query = px_query

    def setup_module(self):
        """Replace ``self.query`` with a fresh query object carrying the test credentials."""
        # NOTE(review): pytest treats ``setup_module`` as a module-level hook; as a
        # method it is presumably never invoked automatically -- confirm.
        px_query = pypxtools.pxobjects.PxQuery()
        px_query.u_name = T_USER
        px_query.pwd = T_PWD
        self.query = px_query

    def teardown_module(self):
        """Delete the folder named ``T_NEW_FOLDER_NAME`` if a search finds one."""
        # NOTE(review): pytest looks for ``teardown_module`` at module level; as a
        # method it is presumably never called automatically -- confirm.
        self._delete_new_testfolder()

    def _delete_new_testfolder(self):
        """Delete the scratch folder named ``T_NEW_FOLDER_NAME`` if the search returns a hit."""
        matches = self.query.search_folder(T_NEW_FOLDER_NAME)
        if not matches:
            return
        # only the first hit is removed
        first_match = matches[0]
        self.query.delete_project_folder(first_match.uuid)

    def test_trouble_filename(self):
        """
        Files with an umlaut in their name can cause trouble when generating file lists.
        """
        project = self.query.get_project(T_PRJ)
        content = self.query.get_content(ctd_uuid='067558e041c6816001425be2ef3201ac', pxprj=project)
        exporter = pxt.ListExport(self.query)
        location = exporter.get_content_original_file_location(content)
        print(location)
        assert location is not None
        # single quotes are stripped before comparing against the plain file name
        unquoted = location.replace("'", "")
        assert unquoted.endswith(content.file_name)


    def test_prj_list(self):
        """
        Test generation of project list
        :return:
        """
        prjs = self.query.get_project_list()
        assert len(prjs>2)

    def test_get_prj_uuid(self):
        """
        Test lookup of a project uuid by project name.
        :return:
        """
        project = self.query.get_project(T_PRJ)
        found_uuid = self.query.get_prj_uuid(project.name)
        # the lookup by name must lead back to the same project
        assert found_uuid == project.uuid

    def get_cont_count(self, folder):
        """
        Helper: query the content count of a folder in the test project.
        :param folder: folder object; its ``uuid`` attribute is sent as ``folderId``
        :return: decoded JSON body of the response
        """
        params = dict(folderId=folder.uuid, projectId=T_PRJ)
        count_url = pypxtools.pxobjects.PxQuery.S_URL_CONTENT_COUNT
        response = self.query.query(count_url, params)
        return response.json()


    def test_pxquery_connection(self):
        assert self.query.verify_access()

    def test_pxproject(self):
        """Check that the test project is returned under its expected name."""
        project_name = self.query.get_project(T_PRJ).name
        assert project_name == 'Testprojekt'

    def test_folders(self):
        """Check that the number of folders in the test project lies between 3 and 8."""
        project = self.query.get_project(T_PRJ)
        folder_count = len(project.get_all_folders())
        assert folder_count >= 3 and folder_count <= 8


    def test_folder_level(self):
        """Print the level of every folder of the test project (no assertions)."""
        project = self.query.get_project(T_PRJ)
        for px_folder in project.get_all_folders_ordered():
            line = ''.join(('Level ', px_folder.name, ': ', str(px_folder.get_level())))
            print(line)

    def test_child_folders(self):
        """Print the child folders of every folder of the test project (no assertions)."""
        project = self.query.get_project(T_PRJ)
        for px_folder in project.get_all_folders(return_ids=False):
            line = ''.join(('Children for Folder ', px_folder.name, ': ',
                            str(px_folder.get_child_folders())))
            print(line)

    def test_content(self):
        """Fetch the content of every folder, then print the project's content list."""
        project = self.query.get_project(T_PRJ)
        all_folders = project.get_all_folders(return_ids=False)
        for px_folder in all_folders:
            # the result is discarded; only the call itself is exercised
            px_folder.get_content()
        content_list = project.get_content_list()
        print(content_list)

    def test_get_content_folder(self):
        """
        Check the folder reported by each content item.

        For every content item of every folder in the test project,
        ``get_folder()`` must return something other than ``None`` and other
        than the string ``'root'``.
        """
        test_prj = self.query.get_project(T_PRJ)
        for folder in test_prj.get_all_folders(return_ids=False):
            for ctd in folder.get_content():
                # separate name so the outer loop variable is not clobbered
                ctd_folder = ctd.get_folder()
                assert ctd_folder is not None
                # compare by value; identity against a str literal is unreliable
                assert ctd_folder != 'root'

    def test_get_content(self):
        """
        Check that fetching content in multiple chunks yields the same uuids as a single query.

        The comparison is done for the project root (folder ``None``) and for
        every folder of the test project, each with several chunk sizes.
        :return:
        """
        project = self.query.get_project(T_PRJ)

        def assert_chunks_match(folder_id, chunk_sizes):
            # reference: everything below the folder in one query, reduced to uuids
            expected = {ctd.uuid for ctd in self.query.get_prj_content(project, folder_id)}
            for size in chunk_sizes:
                chunked = self.query.get_content_chunky(T_PRJ, folder_id, size)
                assert {ctd.uuid for ctd in chunked} == expected

        # content under the project root
        assert_chunks_match(None, (2, 3, 10, 100, 123))
        # content under each folder
        for folder_id in project.get_all_folders(return_ids=True):
            assert_chunks_match(folder_id, (1, 3, 10, 100, 123))


    def test_check_content_filename(self):
        """
        TEsts query of file names in proxsys
        :return:
        """
        # existing file name
        file_name = "20130616_WaveGliderTest-044.mov"
        assert self.query.check_content_filename(file_name)
        assert not self.query.is_content_filename_unique(file_name)

        # unknown file name
        file_name = "UnKnoWnFilEnaMe-__-__-_--_-_-_---__---__ölöl"
        assert self.query.check_content_filename(file_name)
        assert self.query.is_content_filename_unique(file_name)

    def test_content_limited(self):
        """
        Test content queries whose result is limited to a slice of the folder/project content.
        :return:
        """
        project = self.query.get_project(T_PRJ)
        window = (0, 10)
        # same limited query, once addressed by project uuid and once by project object
        by_uuid = [ctd.uuid for ctd in self.query.get_prj_content(project.uuid, 'root', limit=window)]
        by_object = [ctd.uuid for ctd in self.query.get_prj_content(project, 'root', limit=window)]
        assert set(by_uuid) == set(by_object)
        assert len(by_uuid) == 10

        # the unlimited query must return more than the limited one
        unlimited = self.query.get_prj_content(project, 'root')
        assert len(by_uuid) < len(unlimited)

        for px_folder in project.get_all_folders(return_ids=False):
            limited = self.query.get_prj_content(project, px_folder.uuid, limit=(0, 5))
            assert len(limited) <= 5

    def test_content_count(self):
        """Check that each folder's content count matches the length of its content list."""
        project = self.query.get_project(T_PRJ)
        for px_folder in project.get_all_folders(return_ids=False):
            # first without, then with sub-directories
            for with_subdirs in (False, True):
                items = px_folder.get_content(include_subdirs=with_subdirs)
                reported = px_folder.get_content_count(include_subdirs=with_subdirs)
                assert len(items) == reported

    def test_content_size_sum(self):
        """Print the summed content file size of every folder (no assertions)."""
        print('Sum of of content filesize per folder')
        project = self.query.get_project(T_PRJ)
        for px_folder in project.get_all_folders(return_ids=False):
            line = ''.join((px_folder.name, ": ", str(px_folder.get_sum_filesize())))
            print(line)


    def test_post_metadata_value(self):
        """
        Test writing a metadata value to a content item and reading it back.

        The '[EXIF] Artist' field of a fixed content item is set to a marker
        value, verified against a fresh query, and then reset to the value it
        had before. The reset also runs when one of the checks fails.
        """
        field = '[EXIF] Artist'
        test_prj = self.query.get_project(T_PRJ)
        content = self.query.get_content(test_prj, u'067558e05edc6d06015ee79973350025')
        # result unused; call kept because it may populate the content's metadata
        # cache -- TODO confirm
        content.get_all_metadata()

        def query_current_value():
            # metadata values are cached on content level --> get value from query directly
            current = None
            for md in self.query.get_metadata4content(content):
                if md.description_name == field:
                    current = md.value
            return current

        # Note to self:
        # which metadata fields are returned by query is dependant on the rights configuration of the
        # person issuing the query, i.e. testuser might get different set of metadata for content than e.g. cfaber.
        # The test below failed for metadata field 'en_title' although similar query worked perfectly well in jupyter NB.
        # It took me a good while to figure out that testuser doesn't have access to 'en_title' for this content.
        md_old_val = content.get_metadata_value(field)  # get metadata value for Author/Creator using name
        md_new_val = 'TEST1234'
        try:
            # set new value and test
            response = content.set_metadata_value(metadata_description=field, metadata_value=md_new_val)
            assert response.ok
            assert query_current_value() == md_new_val
            assert content.get_metadata_value(field) == md_new_val
        finally:
            # always revert to the old value, so a failed check above does not
            # leave the marker value behind
            response = content.set_metadata_value(metadata_description=field, metadata_value=md_old_val)
        assert response.ok
        assert query_current_value() == md_old_val

    def test_all_metadata_descriptions(self):
        """
        Test generation of List w/ all metadata fields known in ProxSys
        :return:
        """
        # test generation of data frame directly from json
        self.query.get_df_metadata_description()
        # test generation of PxMetadata objects
        all_md_d = self.query.get_all_metadata_descriptions()
        assert len(all_md_d) > 100

        for md_d in self.query.get_all_metadata_descriptions():
            print(md_d)

    def test_content_media_data(self):
        """Tests the media data query (getMediaData API call) for every video in the test project."""
        project = self.query.get_project(T_PRJ)
        videos = (ctd for ctd in project.get_content_list() if ctd.is_video)
        for video in videos:
            media_data = video.get_media_data()
            assert media_data is not None
            for required_key in ('contentId', 'startTimecode'):
                assert required_key in media_data.keys()

    def test_metadata_description(self):
        """
        test if certain default metadata fields exist and if they can be queried
        :return:
        """
        md_d  = self.query.get_metadata_description('067558e04914c5cb0149a37b11900bc2')
        assert md_d.name == '[EXIF] Exif Image Height'

    def test_content_metadata(self):
        """
        Test that per-field metadata lookups agree with the full metadata mapping.
        :return:
        """
        first_content = self.query.get_project(T_PRJ).get_content_list()[0]
        all_metadata = first_content.get_all_metadata()
        for description, metadata in all_metadata.items():
            # identity checks: the lookups must hand back the very same objects
            # NOTE(review): ``is`` on the value presumably relies on cached objects -- confirm
            assert first_content.get_metadata(description) is metadata
            assert first_content.get_metadata_value(description) is metadata.value

    def test_content_folder_path(self):
        """Check that the folder path of the first content item is set and contains 'files'."""
        first_content = self.query.get_project(T_PRJ).get_content_list()[0]
        folder_path = first_content.folder_path
        assert folder_path is not None
        assert 'files' in folder_path

    def test_content_file_path(self):
        """Check the file path of the first content item, without and with an explicit version."""
        first_content = self.query.get_project(T_PRJ).get_content_list()[0]

        # real file name will contain version.
        # By default, path for current version of ctd is returned
        current_path = first_content.get_file_path()
        assert current_path is not None
        for fragment in ('files', 'v.', first_content.file_name):
            assert fragment in current_path

        # check with version that is definitely different from current version
        versioned_path = first_content.get_file_path(ctd_version='7.2')
        assert versioned_path is not None
        for fragment in ('files', '7.2', first_content.file_name):
            assert fragment in versioned_path

    def test_content_preview_size(self):
        """
        Test the preview size query for a content item, via the query object and via the content object.
        :return:
        """
        project = self.query.get_project(T_PRJ)

        video_uuid = '067558e0463e0c5e01465dc1b5a7014c'  # content from testproject, video w/ preview
        expected_size = 952895

        # direct query by uuid
        queried_size = self.query.get_content_preview_size(video_uuid)
        assert queried_size is not None
        assert queried_size == expected_size

        # test using pxcontent-methods
        content = self.query.get_content(pxprj=project, ctd_uuid=video_uuid)
        assert content.get_has_preview()
        assert content.get_preview_size() == expected_size

        # No videos w/o preview to test at the moment
        # c_uuid = '067558e0523bafcc0152a2c212662053'  # content from testproject, image w/o preview
        # expected_size = 0
        # p_size = self.query.get_content_preview_size(c_uuid)
        # assert p_size is not None
        # assert p_size == 0
        # # test using pxcontent-methods
        # ctd = self.query.get_content(pxprj=test_prj, ctd_uuid=c_uuid)
        # assert not ctd.get_has_preview()
        # assert ctd.get_preview_size() == expected_size


    def test_move_content(self):
        """
        Test of moving content around in a project

        A fixed content item is moved to the project root and then back to the
        folder it came from; each move must answer with 204 and be reflected
        by the folder lookup.
        :return:
        """
        # result unused; call kept so the project is still fetched first --
        # presumably a sanity check, TODO confirm
        self.query.get_project(T_PRJ)
        ctd_id = "067558e0463e0c5e01465d393a8e0143"
        src_folder = self.query.get_folder_id_for_content(content_id=ctd_id, project_id=T_PRJ)
        # move to project root
        response = self.query.move_content_within_project(content_id=ctd_id, project_id=T_PRJ, target_folder_id='root')
        assert response == 204
        assert self.query.get_folder_id_for_content(content_id=ctd_id, project_id=T_PRJ) == 'root'
        # move content back where it came from
        response = self.query.move_content_within_project(content_id=ctd_id, project_id=T_PRJ,
                                                          target_folder_id=src_folder)
        assert response == 204
        assert self.query.get_folder_id_for_content(content_id=ctd_id, project_id=T_PRJ) == src_folder



    def test_content_metadata_value(self):
        """
        Test that a metadata value can be fetched by field name and by field uuid alike.
        :return:
        """
        project = self.query.get_project(T_PRJ)
        content = self.query.get_content(project, u'067558e0523bafcc01525fbf9a383598')
        # result is not used by the checks below
        content.get_all_metadata()
        # Note to self:
        # which metadata fields are returned by query is dependant on the rights configuration of the
        # person issuing the query, i.e. testuser might get different set of metadata for content than e.g. cfaber.
        # The test below failed for metadata field 'en_title' although similar query worked perfectly well in jupyter NB.
        # It took me a good while to figure out that testuser doesn't have access to 'en_title' for this content.
        value_by_name = content.get_metadata_value('[EXIF] Artist')  # Author/Creator, addressed by name
        value_by_uuid = content.get_metadata_value(u'067558e050858add0151a01a58dd0829')  # Author/Creator, addressed by uuid

        assert value_by_name is not None
        assert value_by_uuid is not None
        assert value_by_name == value_by_uuid

    def test_export_simple(self):
        """Run the content summary export for the test project (no assertions)."""
        project = self.query.get_project(T_PRJ)
        pxt.ListExport(self.query).export_content_summary(project)

    def test_export_staging(self):
        """Run the staging list export for the test project (no assertions)."""
        project = self.query.get_project(T_PRJ)
        pxt.ListExport(self.query).print_content_staging(project)
        # test output to file

    def test_unicode(self):
        """
        Tests if unicode decode works: fetches one content item and prints its ``fileId``.
        :return:
        """
        import requests
        px_query_cls = pypxtools.pxobjects.PxQuery
        content_url = px_query_cls.PXURL + px_query_cls.PXJSONAPI + "/content/getContent"
        response = self.query.query(content_url,
                                    parameters={'contentId': 'F570709886F5D7C601821D3C612FD24E'})
        if response.status_code != requests.codes.ok:
            # NOTE(review): a non-OK response makes this test pass without checking
            # anything -- confirm that this is intended.
            return
        print(response.json()['fileId'])


    def test_write_staging_list(self):
        """Write the staging list of the test project to a file and check the file is not empty."""
        project = self.query.get_project(T_PRJ)
        # setup output options
        target = os.path.abspath(os.path.join(TEST_OUT_DIR, 'tst.txt'))

        # start from a clean slate: drop the output of a previous run
        if os.path.isfile(target):
            os.remove(target)

        with open(target, 'w') as out_file:
            pxt.ListExport(self.query, out_file).print_content_staging(project)

        # check that file was written
        assert os.path.isfile(target)
        assert os.path.getsize(target) > 0

    def test_metadata_consolidate(self):
        """
        Test of the metadata consolidation helper

        Builds a ``MetadataConsolidator`` for the copyright fields of the
        content obtained with ``include_subdirs=False``, writes it to an Excel
        file, reads both sheets back and compares them with the in-memory
        frames, then builds a second consolidator from that file.
        :return:
        """
        test_prj = self.query.get_project(T_PRJ)
        cl = test_prj.get_content(include_subdirs=False)
        uuid_copyright_target = u'067558e050858add0151a0146bc60828'  # [EXIF] Copyright
        uuid_copyright_srcs = [u'067558e050858add0151a0146bc60828',  # [EXIF] Copyright
                               u'FD9CCB470A0101A801AEB78F2CD60740',  # Copyright
                               u'1DAA8BE90A0101A801BC77E4DFB0E6D9',  # Copyright Notice
                               #u'1DA985930A0101A801CE812613C5EA32',  # Copyright Status
                               u'067557463ac65e70013bae3901b4017b',  # de_copyright
                               u'067557463ac65e70013bae38d8af017a']  # en_copyright
        mc = pypxtools.metadata_helper.MetadataConsolidator(self.query,
                                                            target_field_id=uuid_copyright_target,
                                                            source_field_ids=uuid_copyright_srcs,
                                                            content_list=cl)

        assert mc.df_data is not None
        assert mc.df_metainfo is not None
        for ctd in mc.data_provider.get_content_uuids():
            assert mc._get_md_options(ctd) is not None

        f_path = os.path.join(TEST_OUT_DIR, 'test_out.xlsx')
        mc.to_excel(f_path)
        # ``sheet_name`` replaces the removed ``sheetname`` keyword; ``encoding`` is
        # not a ``read_excel`` parameter and is dropped
        df_x = pd.read_excel(f_path, sheet_name='data').T  # transposed as in the original comparison
        assert mc.compare_df_data(df_x)
        df_x = pd.read_excel(f_path, sheet_name='meta_info')
        assert mc.compare_df_metainfo(df_x)
        # try creation of mc by file
        mc_x = pypxtools.metadata_helper.MetadataConsolidator(self.query, filename=f_path)
        assert mc.compare_df_data(mc_x.df_data)
        assert mc.compare_df_metainfo(mc_x.df_metainfo)
        for ctd in mc_x.data_provider.get_content_uuids():
            assert mc_x._get_md_options(ctd) is not None

    def test_projects_for_content(self):
        """Test listing projects a content is associated to"""
        ctd_id = '067558e05608147f01563104959b026b'
        prjcts = self.query.get_projects_for_content(ctd_id)
        assert prjcts is not None
        names = [prj.name for prj in prjcts]
        assert "Testprojekt" in names
        assert "Data Management" in names

    def test_copy_ctd(self):
        """Test copying content into the test project."""
        content_uuid = '067558e040872069014131ab8adc01ba'
        # delete first, then copy, so the copy is what puts the content into the project
        self.query.delete_content(content_uuid, T_PRJ)
        self.query.copy_content(content_uuid, T_PRJ)
        project_names = [project.name for project in self.query.get_projects_for_content(content_uuid)]
        assert "Testprojekt" in project_names


    def test_delete_ctd(self):
        """Test deleting content from the test project (delete, copy back, delete again)."""
        content_uuid = '067558e040872069014131ab8adc01ba'

        def in_test_project():
            # fresh query each time, so every check sees the current state
            names = [project.name for project in self.query.get_projects_for_content(content_uuid)]
            return "Testprojekt" in names

        self.query.delete_content(content_uuid, T_PRJ)
        assert not in_test_project()
        self.query.copy_content(content_uuid, T_PRJ)
        assert in_test_project()
        self.query.delete_content(content_uuid, T_PRJ)
        assert not in_test_project()

    def test_move_ctd(self):
        """Test moving content from one project to another and back again."""
        content_uuid = '067558e04087206901408747ef370002'
        home_project = T_PRJ
        home_folder = '067558e05edc6d06015ee73f8b97000f'  # testfolder
        other_project = '067558e05da8060f015dbc06bde20075'  # Data Management

        def project_uuids():
            # fresh query each time, so every check sees the current state
            return [project.uuid for project in self.query.get_projects_for_content(content_uuid)]

        self.query.move_content_between_projects(content_uuid, home_project, other_project)
        current = project_uuids()
        assert home_project not in current
        assert other_project in current

        # now move ctd back to original prj
        self.query.move_content_between_projects(content_uuid, other_project, home_project, home_folder)
        current = project_uuids()
        assert other_project not in current
        assert home_project in current
        assert home_folder == self.query.get_folder_id_for_content(content_uuid, home_project)


    def test_create_delete_project_folder(self):
        """Test creation and deletion of a single folder in the test project."""
        # just to make sure no leftovers from failed tests exist
        self._delete_new_testfolder()

        def folder_names():
            # fresh query each time, so every check sees the current state
            return [px_folder.name for px_folder in self.query.get_prj_folders(T_PRJ)]

        created_id = self.query.create_project_folder(T_PRJ, T_NEW_FOLDER_NAME)
        assert T_NEW_FOLDER_NAME in folder_names()

        self.query.delete_project_folder(created_id)
        assert T_NEW_FOLDER_NAME not in folder_names()

    def test_search_folder(self):
        """
        Test the folder search by name.

        A folder named 'testfolder' must be among the hits, and at least one
        hit must belong to the test project.
        """
        # query once and reuse the hits, so both checks see the same result
        hits = list(self.query.search_folder('testfolder'))
        found_names = [f.name for f in hits]
        in_prjs = [f.prj_id for f in hits]

        assert 'testfolder' in found_names
        assert T_PRJ in in_prjs

    #def test_trouble_prj(self):
    #    '''
    #    Test project 'KM_VIDEO_RAW' which [caused problems](https://git.geomar.de/dm/PyPxTools/issues/1).
    #    :return:
    #    '''
    #    trbl_prj = self.query.get_project('067558e03fbec922013fc32e4db80023')
    #    # setup output options
    #    export = pxt.ListExport(self.query)
    #    export.print_content_staging(trbl_prj)