Repository URL to install this package:
|
Version:
0.1.0 ▾
|
PyPxTools
/
test_pypxtools.py
|
|---|
# -*- coding: utf-8 -*-
import base64
import inspect
import os
import pypxtools.metadata_helper
import pypxtools.pxobjects
import pypxtools.pypxtools as pxt
import pandas as pd
# Account and fixture identifiers used by every test in this module.
# NOTE(review): the service account name and password are hard-coded here.
bts = 'iMX2,TrVa2P;5%'.encode('utf-8')
T_USER = 'pxwebserviceuser'
# the password is handed to the query object base64 encoded
T_PWD = base64.b64encode(bts)
# uuid of the project the tests operate on
T_PRJ = '067558e05e7f3116015e9e9e037436d0'
# name of the scratch folder that is created and removed again by the tests
T_NEW_FOLDER_NAME = 'TEST_REMOVE_THIS_FOLDER_e21cf5d748485be9fe2583e9be874ddf97aa9600'
def get_test_output_directory():
    """Return the location of the designated directory for test outputs.

    The directory is called ``out`` and lives next to this source file.  It
    is created if it does not exist yet.

    :return: absolute path of the output directory
    """
    current_file = inspect.getfile(inspect.currentframe())
    current_dir = os.path.abspath(os.path.dirname(current_file))
    out_dir = os.path.join(current_dir, 'out')
    # exist_ok replaces the former exists() + mkdir() pair, which could fail
    # when two processes tried to create the directory at the same time.
    os.makedirs(out_dir, exist_ok=True)
    return out_dir


# resolved once at import time; tests that write files put them here
TEST_OUT_DIR = get_test_output_directory()
class TestPxTools:
    """Integration tests for pypxtools, run against the ProxSys test project.

    All tests talk to the server through ``self.query``, a ``PxQuery`` that
    carries the test account credentials.  The class deliberately has no
    ``__init__``: pytest does not collect test classes that define one, so
    the query is created in :meth:`setup_method` instead.
    """

    # ------------------------------------------------------------------
    # fixtures and helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _new_query():
        """Return a fresh ``PxQuery`` configured with the test credentials."""
        query = pypxtools.pxobjects.PxQuery()
        query.u_name = T_USER
        query.pwd = T_PWD
        return query

    def setup_method(self):
        """Create the query object before every test (pytest per-test hook)."""
        self.query = self._new_query()

    @classmethod
    def teardown_class(cls):
        """Remove a leftover scratch folder after the last test of the class."""
        cleaner = cls()
        cleaner.setup_module()
        cleaner.teardown_module()

    def setup_module(self):
        """Create the query object.

        Kept under its historic name; pytest only honours ``setup_module`` as
        a module-level function, so it is not invoked automatically.
        """
        self.query = self._new_query()

    def teardown_module(self):
        """Delete the scratch folder.

        Kept under its historic name; see :meth:`teardown_class` for the hook
        that pytest actually calls.
        """
        self._delete_new_testfolder()

    def _delete_new_testfolder(self):
        """Delete the folder named ``T_NEW_FOLDER_NAME`` if the search finds it."""
        testfolder = self.query.search_folder(T_NEW_FOLDER_NAME)
        if testfolder:
            self.query.delete_project_folder(testfolder[0].uuid)

    def _query_metadata_value(self, content, description_name):
        """Return the value of a metadata field as reported by a fresh query.

        Metadata values are cached on content level, therefore the value is
        read from ``get_metadata4content`` directly.

        :param content: content object whose metadata is queried
        :param description_name: name of the metadata description to look up
        :return: value of the last matching field, ``None`` if there is none
        """
        current_val = None
        for md in self.query.get_metadata4content(content):
            if md.description_name == description_name:
                current_val = md.value
        return current_val

    def _assert_chunked_equals_single(self, test_prj, folder_id, chunk_sizes):
        """Assert that chunked content queries return the same ids as one query.

        :param test_prj: project object passed to the single query
        :param folder_id: folder to query, ``None`` for the project root
        :param chunk_sizes: chunk sizes to try for the chunked query
        """
        # compare uuids instead of pxcontent objects
        single_ids = {ctd.uuid for ctd in self.query.get_prj_content(test_prj, folder_id)}
        for chunk_size in chunk_sizes:
            chunked = self.query.get_content_chunky(T_PRJ, folder_id, chunk_size)
            assert {ctd.uuid for ctd in chunked} == single_ids

    def get_cont_count(self, folder):
        """Helper: get the content count for a folder from the server.

        :param folder: folder object, its ``uuid`` is sent as ``folderId``
        :return: decoded json body of the content count query
        """
        params = {'folderId': folder.uuid, 'projectId': T_PRJ}
        url = pypxtools.pxobjects.PxQuery.S_URL_CONTENT_COUNT
        return self.query.query(url, params).json()

    # ------------------------------------------------------------------
    # connection, projects and folders
    # ------------------------------------------------------------------
    def test_pxquery_connection(self):
        """The test account must be able to access the server."""
        assert self.query.verify_access()

    def test_prj_list(self):
        """Test generation of the project list."""
        prjs = self.query.get_project_list()
        assert len(prjs) > 2

    def test_get_prj_uuid(self):
        """Test search for a uuid by project name."""
        test_prj = self.query.get_project(T_PRJ)
        p_id = self.query.get_prj_uuid(test_prj.name)
        assert p_id == test_prj.uuid

    def test_pxproject(self):
        """The test project must be retrievable by its uuid."""
        test_prj = self.query.get_project(T_PRJ)
        assert test_prj.name == 'Testprojekt'

    def test_folders(self):
        """The test project is expected to hold between 3 and 8 folders."""
        test_prj = self.query.get_project(T_PRJ)
        assert 3 <= len(test_prj.get_all_folders()) <= 8

    def test_folder_level(self):
        """Print the level of every folder (smoke test, no assertions)."""
        test_prj = self.query.get_project(T_PRJ)
        for folder in test_prj.get_all_folders_ordered():
            print('Level ' + folder.name + ': ' + str(folder.get_level()))

    def test_child_folders(self):
        """Print the child folders of every folder (smoke test, no assertions)."""
        test_prj = self.query.get_project(T_PRJ)
        for folder in test_prj.get_all_folders(return_ids=False):
            print('Children for Folder ' + folder.name + ': ' + str(folder.get_child_folders()))

    def test_create_delete_project_folder(self):
        """Test creation and deletion of a single folder."""
        # just to make sure no leftovers from failed tests exist
        self._delete_new_testfolder()
        prj_id = T_PRJ
        new_folder_id = self.query.create_project_folder(prj_id, T_NEW_FOLDER_NAME)
        assert T_NEW_FOLDER_NAME in [f.name for f in self.query.get_prj_folders(prj_id)]
        self.query.delete_project_folder(new_folder_id)
        assert T_NEW_FOLDER_NAME not in [f.name for f in self.query.get_prj_folders(prj_id)]

    def test_search_folder(self):
        """Searching for 'testfolder' must find it inside the test project."""
        found = self.query.search_folder('testfolder')
        assert 'testfolder' in [f.name for f in found]
        assert T_PRJ in [f.prj_id for f in found]

    # ------------------------------------------------------------------
    # content queries
    # ------------------------------------------------------------------
    def test_trouble_filename(self):
        """Files with an umlaut in their name can cause trouble in file lists."""
        ctd_uuid = '067558e041c6816001425be2ef3201ac'
        test_prj = self.query.get_project(T_PRJ)
        ctd = self.query.get_content(ctd_uuid=ctd_uuid, pxprj=test_prj)
        exp = pxt.ListExport(self.query)
        loc = exp.get_content_original_file_location(ctd)
        print(loc)
        assert loc is not None
        # single quotes are stripped from the location before comparing
        assert loc.replace("'", "").endswith(ctd.file_name)

    def test_content(self):
        """Load the content of every folder and print the project content list."""
        test_prj = self.query.get_project(T_PRJ)
        for folder in test_prj.get_all_folders(return_ids=False):
            folder.get_content()
        print(test_prj.get_content_list())

    def test_get_content_folder(self):
        """Every content item listed in a folder must report a real folder."""
        test_prj = self.query.get_project(T_PRJ)
        for folder in test_prj.get_all_folders(return_ids=False):
            for ctd in folder.get_content():
                ctd_folder = ctd.get_folder()
                assert ctd_folder is not None
                # value comparison; identity against a literal is never reliable
                assert ctd_folder != 'root'

    def test_get_content(self):
        """Getting content in multiple chunks must equal a single content query."""
        test_prj = self.query.get_project(T_PRJ)
        # content directly under the project root
        self._assert_chunked_equals_single(test_prj, None, [2, 3, 10, 100, 123])
        # content of every folder of the project
        for folder_id in test_prj.get_all_folders(return_ids=True):
            self._assert_chunked_equals_single(test_prj, folder_id, [1, 3, 10, 100, 123])

    def test_check_content_filename(self):
        """Test the query of file names in ProxSys."""
        # existing file name
        file_name = "20130616_WaveGliderTest-044.mov"
        assert self.query.check_content_filename(file_name)
        assert not self.query.is_content_filename_unique(file_name)
        # unknown file name
        file_name = "UnKnoWnFilEnaMe-__-__-_--_-_-_---__---__ölöl"
        assert self.query.check_content_filename(file_name)
        assert self.query.is_content_filename_unique(file_name)

    def test_content_limited(self):
        """Query content while limiting the result to a slice of the folder/project."""
        test_prj = self.query.get_project(T_PRJ)
        # call with the project uuid ...
        by_uuid = self.query.get_prj_content(test_prj.uuid, 'root', limit=(0, 10))
        prj_ctd_ltd = [ctd.uuid for ctd in by_uuid]
        # ... and check against the call with the project object
        by_object = self.query.get_prj_content(test_prj, 'root', limit=(0, 10))
        prj_ctd_ltd_2 = [ctd.uuid for ctd in by_object]
        assert set(prj_ctd_ltd) == set(prj_ctd_ltd_2)
        assert len(prj_ctd_ltd) == 10
        # same query without limit
        prj_ctd = self.query.get_prj_content(test_prj, 'root')
        assert len(prj_ctd_ltd) < len(prj_ctd)
        for folder in test_prj.get_all_folders(return_ids=False):
            limited = self.query.get_prj_content(test_prj, folder.uuid, limit=(0, 5))
            assert len(limited) <= 5

    def test_content_count(self):
        """The reported content count must match the length of the content list."""
        test_prj = self.query.get_project(T_PRJ)
        for folder in test_prj.get_all_folders(return_ids=False):
            for include_subdirs in (False, True):
                folder_ctd = folder.get_content(include_subdirs=include_subdirs)
                folder_ctd_cnt = folder.get_content_count(include_subdirs=include_subdirs)
                assert len(folder_ctd) == folder_ctd_cnt

    def test_content_size_sum(self):
        """Print the summed file size per folder (smoke test, no assertions)."""
        print('Sum of of content filesize per folder')
        test_prj = self.query.get_project(T_PRJ)
        for folder in test_prj.get_all_folders(return_ids=False):
            print(folder.name + ": " + str(folder.get_sum_filesize()))

    def test_content_media_data(self):
        """Test the query of media data (getMediaData API call)."""
        test_prj = self.query.get_project(T_PRJ)
        for ctd in test_prj.get_content_list():
            if not ctd.is_video:
                continue
            media_data = ctd.get_media_data()
            assert media_data is not None
            assert 'contentId' in media_data.keys()
            assert 'startTimecode' in media_data.keys()

    def test_content_folder_path(self):
        """The folder path of a content item must point below 'files'."""
        test_prj = self.query.get_project(T_PRJ)
        content = test_prj.get_content_list()[0]
        path = content.folder_path
        assert path is not None
        assert 'files' in path

    def test_content_file_path(self):
        """The file path must contain the version and the file name."""
        test_prj = self.query.get_project(T_PRJ)
        content = test_prj.get_content_list()[0]
        # The real file name contains the version.
        # By default the path for the current version is returned.
        path = content.get_file_path()
        assert path is not None
        assert 'files' in path
        assert 'v.' in path
        assert content.file_name in path
        # check with a version that is definitely different from the current one
        path = content.get_file_path(ctd_version='7.2')
        assert path is not None
        assert 'files' in path
        assert '7.2' in path
        assert content.file_name in path

    def test_content_preview_size(self):
        """Test the query for the preview size of a content item."""
        test_prj = self.query.get_project(T_PRJ)
        c_uuid = '067558e0463e0c5e01465dc1b5a7014c'  # content from testproject, video w/ preview
        expected_size = 952895
        p_size = self.query.get_content_preview_size(c_uuid)
        assert p_size is not None
        assert p_size == expected_size
        # test using pxcontent-methods
        ctd = self.query.get_content(pxprj=test_prj, ctd_uuid=c_uuid)
        assert ctd.get_has_preview()
        assert ctd.get_preview_size() == expected_size
        # No videos w/o preview to test at the moment
        # c_uuid = '067558e0523bafcc0152a2c212662053' # content from testproject, image w/o preview
        # expected_size = 0
        # p_size = self.query.get_content_preview_size(c_uuid)
        # assert p_size is not None
        # assert p_size == 0
        # # test using pxcontent-methods
        # ctd = self.query.get_content(pxprj=test_prj, ctd_uuid=c_uuid)
        # assert not ctd.get_has_preview()
        # assert ctd.get_preview_size() == expected_size

    def test_unicode(self):
        """Test that decoding a content record with unicode data works."""
        import requests
        px_query = pypxtools.pxobjects.PxQuery
        url = px_query.PXURL + px_query.PXJSONAPI + "/content/getContent"
        params = {'contentId': 'F570709886F5D7C601821D3C612FD24E'}
        response = self.query.query(url, parameters=params)
        if response.status_code == requests.codes.ok:
            c_json = response.json()
            print(c_json['fileId'])

    def test_projects_for_content(self):
        """Test listing the projects a content item is associated to."""
        ctd_id = '067558e05608147f01563104959b026b'
        prjcts = self.query.get_projects_for_content(ctd_id)
        assert prjcts is not None
        names = [prj.name for prj in prjcts]
        assert "Testprojekt" in names
        assert "Data Management" in names

    # ------------------------------------------------------------------
    # metadata
    # ------------------------------------------------------------------
    def test_post_metadata_value(self):
        """Set a metadata value, verify it on the server and restore the old value."""
        test_prj = self.query.get_project(T_PRJ)
        content = self.query.get_content(test_prj, u'067558e05edc6d06015ee79973350025')
        # NOTE(review): result unused; call kept in case it fills the
        # content's metadata cache before the lookups below -- confirm.
        content.get_all_metadata()
        # Note to self:
        # Which metadata fields are returned by a query depends on the rights configuration of the
        # person issuing the query, i.e. testuser might get a different set of metadata than e.g. cfaber.
        # The test failed for metadata field 'en_title' although a similar query worked perfectly well
        # in a jupyter notebook: testuser doesn't have access to 'en_title' for this content.
        field = '[EXIF] Artist'
        md_old_val = content.get_metadata_value(field)  # Author/Creator, looked up by name
        md_new_val = 'TEST1234'
        # set new value and test
        response = content.set_metadata_value(metadata_description=field, metadata_value=md_new_val)
        try:
            assert response.ok
            assert self._query_metadata_value(content, field) == md_new_val
            assert content.get_metadata_value(field) == md_new_val
        finally:
            # revert to the old value even when a check above failed, so the
            # test value is never left behind on the server
            response = content.set_metadata_value(metadata_description=field, metadata_value=md_old_val)
        assert response.ok
        assert self._query_metadata_value(content, field) == md_old_val

    def test_all_metadata_descriptions(self):
        """Test generation of the list of all metadata fields known in ProxSys."""
        # test generation of data frame directly from json
        self.query.get_df_metadata_description()
        # test generation of PxMetadata objects
        all_md_d = self.query.get_all_metadata_descriptions()
        assert len(all_md_d) > 100
        for md_d in all_md_d:
            print(md_d)

    def test_metadata_description(self):
        """Test that a default metadata field exists and can be queried."""
        md_d = self.query.get_metadata_description('067558e04914c5cb0149a37b11900bc2')
        assert md_d.name == '[EXIF] Exif Image Height'

    def test_content_metadata(self):
        """Test the query of content metadata."""
        test_prj = self.query.get_project(T_PRJ)
        content = test_prj.get_content_list()[0]
        for md_d, md in content.get_all_metadata().items():
            # the very same metadata object is expected back
            assert content.get_metadata(md_d) is md
            assert content.get_metadata_value(md_d) == md.value

    def test_content_metadata_value(self):
        """A metadata value must be the same when looked up by name or by uuid."""
        test_prj = self.query.get_project(T_PRJ)
        content = self.query.get_content(test_prj, u'067558e0523bafcc01525fbf9a383598')
        # NOTE(review): result unused; call kept in case it fills the
        # content's metadata cache before the lookups below -- confirm.
        content.get_all_metadata()
        # Which fields are visible depends on the rights of the querying user,
        # see the note in test_post_metadata_value.
        md_val_name = content.get_metadata_value('[EXIF] Artist')  # Author/Creator by name
        md_val_uuid = content.get_metadata_value(u'067558e050858add0151a01a58dd0829')  # same field by uuid
        assert md_val_name is not None
        assert md_val_uuid is not None
        assert md_val_name == md_val_uuid

    def test_metadata_consolidate(self):
        """Test the metadata consolidation helper including its excel round trip."""
        test_prj = self.query.get_project(T_PRJ)
        cl = test_prj.get_content(include_subdirs=False)
        uuid_copyright_target = u'067558e050858add0151a0146bc60828'  # [EXIF] Copyright
        uuid_copyright_srcs = [
            u'067558e050858add0151a0146bc60828',  # [EXIF] Copyright
            u'FD9CCB470A0101A801AEB78F2CD60740',  # Copyright
            u'1DAA8BE90A0101A801BC77E4DFB0E6D9',  # Copyright Notice
            # u'1DA985930A0101A801CE812613C5EA32',  # Copyright Status
            u'067557463ac65e70013bae3901b4017b',  # de_copyright
            u'067557463ac65e70013bae38d8af017a',  # en_copyright
        ]
        mc = pypxtools.metadata_helper.MetadataConsolidator(
            self.query,
            target_field_id=uuid_copyright_target,
            source_field_ids=uuid_copyright_srcs,
            content_list=cl)
        assert mc.df_data is not None
        assert mc.df_metainfo is not None
        for ctd in mc.data_provider.get_content_uuids():
            assert mc._get_md_options(ctd) is not None
        f_path = os.path.join(TEST_OUT_DIR, 'test_out.xlsx')
        mc.to_excel(f_path)
        # 'sheet_name' is the current pandas keyword (formerly 'sheetname')
        df_x = pd.read_excel(f_path, sheet_name='data').T
        assert mc.compare_df_data(df_x)
        df_x = pd.read_excel(f_path, sheet_name='meta_info')
        assert mc.compare_df_metainfo(df_x)
        # try creation of mc by file
        mc_x = pypxtools.metadata_helper.MetadataConsolidator(self.query, filename=f_path)
        assert mc.compare_df_data(mc_x.df_data)
        assert mc.compare_df_metainfo(mc_x.df_metainfo)
        for ctd in mc_x.data_provider.get_content_uuids():
            assert mc_x._get_md_options(ctd) is not None

    # ------------------------------------------------------------------
    # list export
    # ------------------------------------------------------------------
    def test_export_simple(self):
        """Export the content summary of the test project (smoke test)."""
        test_prj = self.query.get_project(T_PRJ)
        exp = pxt.ListExport(self.query)
        exp.export_content_summary(test_prj)

    def test_export_staging(self):
        """Print the staging list of the test project (smoke test)."""
        test_prj = self.query.get_project(T_PRJ)
        exp = pxt.ListExport(self.query)
        exp.print_content_staging(test_prj)

    def test_write_staging_list(self):
        """Write the staging list to a file and check that the file has content."""
        test_prj = self.query.get_project(T_PRJ)
        # setup output options
        file_name = 'tst.txt'
        path = os.path.abspath(os.path.join(TEST_OUT_DIR, file_name))
        # delete file if it exists
        if os.path.isfile(path):
            os.remove(path)
        with open(path, 'w') as f:
            export = pxt.ListExport(self.query, f)
            export.print_content_staging(test_prj)
        # check that file was written
        assert os.path.isfile(path)
        assert os.path.getsize(path) > 0

    # ------------------------------------------------------------------
    # moving, copying and deleting content
    # ------------------------------------------------------------------
    def test_move_content(self):
        """Test moving content around within a project."""
        ctd_id = "067558e0463e0c5e01465d393a8e0143"
        src_folder = self.query.get_folder_id_for_content(content_id=ctd_id, project_id=T_PRJ)
        # move to project root
        response = self.query.move_content_within_project(
            content_id=ctd_id, project_id=T_PRJ, target_folder_id='root')
        assert response == 204
        assert self.query.get_folder_id_for_content(content_id=ctd_id, project_id=T_PRJ) == 'root'
        # move content back where it came from
        response = self.query.move_content_within_project(
            content_id=ctd_id, project_id=T_PRJ, target_folder_id=src_folder)
        assert response == 204
        assert self.query.get_folder_id_for_content(content_id=ctd_id, project_id=T_PRJ) == src_folder

    def test_copy_ctd(self):
        """Test copying content from one project to another."""
        ctd_id = '067558e040872069014131ab8adc01ba'
        self.query.delete_content(ctd_id, T_PRJ)
        self.query.copy_content(ctd_id, T_PRJ)
        prjs = [prj.name for prj in self.query.get_projects_for_content(ctd_id)]
        assert "Testprojekt" in prjs

    def test_delete_ctd(self):
        """Test deleting content from one project."""
        ctd_id = '067558e040872069014131ab8adc01ba'
        self.query.delete_content(ctd_id, T_PRJ)
        prjs = [prj.name for prj in self.query.get_projects_for_content(ctd_id)]
        assert "Testprojekt" not in prjs
        self.query.copy_content(ctd_id, T_PRJ)
        prjs = [prj.name for prj in self.query.get_projects_for_content(ctd_id)]
        assert "Testprojekt" in prjs
        self.query.delete_content(ctd_id, T_PRJ)
        prjs = [prj.name for prj in self.query.get_projects_for_content(ctd_id)]
        assert "Testprojekt" not in prjs

    def test_move_ctd(self):
        """Test moving content from one project to another and back."""
        ctd_id = '067558e04087206901408747ef370002'
        src_prj_id = T_PRJ
        src_fldr_id = '067558e05edc6d06015ee73f8b97000f'  # testfolder
        trgt_prj_id = '067558e05da8060f015dbc06bde20075'  # Data Management
        self.query.move_content_between_projects(ctd_id, src_prj_id, trgt_prj_id)
        pjrs4ctd = [prj.uuid for prj in self.query.get_projects_for_content(ctd_id)]
        assert src_prj_id not in pjrs4ctd
        assert trgt_prj_id in pjrs4ctd
        # now move ctd back to original prj
        self.query.move_content_between_projects(ctd_id, trgt_prj_id, src_prj_id, src_fldr_id)
        pjrs4ctd = [prj.uuid for prj in self.query.get_projects_for_content(ctd_id)]
        assert trgt_prj_id not in pjrs4ctd
        assert src_prj_id in pjrs4ctd
        assert src_fldr_id == self.query.get_folder_id_for_content(ctd_id, src_prj_id)
#def test_trouble_prj(self):
# '''
# Test project 'KM_VIDEO_RAW' which [caused problems](https://git.geomar.de/dm/PyPxTools/issues/1).
# :return:
# '''
# trbl_prj = self.query.get_project('067558e03fbec922013fc32e4db80023')
# # setup output options
# export = pxt.ListExport(self.query)
# export.print_content_staging(trbl_prj)