Repository URL to install this package:
|
Version:
0.2.2 ▾
|
PyPxTools
/
pxobjects.py
|
|---|
import base64
import os
import getpass
from collections import OrderedDict
import pandas as pd
import requests
import collections
def flatten_list(list_of_lists):
    """
    Recursively flattens arbitrarily nested iterables into one flat sequence.

    Strings and bytes are treated as atomic leaf values, not as iterables
    of characters.

    :param list_of_lists: possibly nested iterable
    :return: generator yielding the leaf elements in order
    """
    # implementation adapted from 'Christian's answer here
    # http://stackoverflow.com/questions/2158395/flatten-an-irregular-list-of-lists-in-python
    # Fix: collections.Iterable was deprecated in 3.3 and removed in
    # Python 3.10 -- the ABC now lives in collections.abc
    from collections.abc import Iterable
    for el in list_of_lists:
        if isinstance(el, Iterable) and not isinstance(el, (str, bytes)):
            for sub in flatten_list(el):
                yield sub
        else:
            yield el
class PxQuery(object):
    '''
    Class for connecting to the ProxSys server.

    Holds the JSON API endpoint URLs, caches user credentials
    (base64-obfuscated) and already-fetched projects, and offers thin
    wrappers around the ProxSys JSON services.
    '''
    # Set default values for creating a client to connect to ProxSys;
    # the base URL can be overridden via the PXURL environment variable
    PXURL = os.environ.get("PXURL") or "https://media.geomar.de:443"
    PXJSONAPI = "/proxsys-json/json/7.4"
    # Endpoint URLs of the ProxSys JSON API, grouped by service area
    S_URL_PROJS = PXURL+PXJSONAPI+"/project/getProjects"
    S_URL_SINGLE_PROJ = PXURL+PXJSONAPI+"/project/getProject"
    S_URL_USR_B_NAME = PXURL+PXJSONAPI+"/user/getUserByName"
    S_URL_FOLDERS = PXURL+PXJSONAPI+"/project/getProjectFolders"
    S_URL_CONTENT_BY_PRJ = PXURL + PXJSONAPI + "/content/getContentByProject"
    S_URL_CONTENT_BY_PRJ_LTD = PXURL + PXJSONAPI + "/content/getContentByProjectLimited"
    S_URL_CONTENT_BY_PRJ_LTD_SORT = PXURL + PXJSONAPI + "/content/getContentByProjectLimitedSort"
    S_URL_CONTENT_CHECK_FILENAME = PXURL + PXJSONAPI + "/content/checkFilename"
    S_URL_CONTENT_COUNT = PXURL + PXJSONAPI + "/content/getContentCountByProject"
    S_URL_FOLDERS_FOR_CONTENT = PXURL+PXJSONAPI+'/project/getProjectFoldersForContent'
    S_URL_METADATA_ALL_DESCRIPTIONS = PXURL + PXJSONAPI + '/metadata/getAllDescriptions'
    S_URL_METADATA_DESCRIPTIONS_PROJ = PXURL + PXJSONAPI +'/metadata/getDescriptionsForProject'
    S_URL_METADATA = PXURL + PXJSONAPI + '/metadata/getMetadata'
    S_URL_SET_METADATA = PXURL+PXJSONAPI+'/metadata/setMetadata'
    S_URL_SINGLE_CONTENT = PXURL + PXJSONAPI + '/content/getContent'
    S_URL_RERENDER_CONTENT = PXURL + PXJSONAPI + '/content/reRenderContent'
    S_URL_CREATE_NEW_THUMBNAIL = PXURL + PXJSONAPI + '/content/createNewThumbnail'
    S_URL_CONTENT_PREVIEW_SIZE = PXURL + PXJSONAPI + '/content/getPreviewSize'
    S_URL_CONTENT_MEDIADATA = PXURL + PXJSONAPI + '/content/getMediaData'
    S_URL_MOVE_CONTENT_IN_PRJ = PXURL + PXJSONAPI + '/project/moveContentWithinProject'
    S_URL_COPY_CONTENT = PXURL + PXJSONAPI + '/project/copyContent'
    S_URL_DELETE_CONTENT = PXURL + PXJSONAPI + '/project/deleteContent'
    S_URL_PROJECTS_FOR_CONTENT = PXURL + PXJSONAPI + '/project/getProjectsForContent'
    S_URL_CREATE_PROJECT_FOLDER = PXURL + PXJSONAPI + '/project/createProjectFolder'
    S_URL_DELETE_PROJECT_FOLDER = PXURL + PXJSONAPI + '/project/deleteProjectFolder'
    S_URL_SEARCH_FOLDER = PXURL + PXJSONAPI + '/project/searchFolder'
    S_URL_SEARCH_CONTENT = PXURL + PXJSONAPI + '/content/searchContent'
    def __init__(self):
        # user name, filled lazily by get_auth_info()
        self.u_name = None
        # base64-encoded password (obfuscation only, NOT encryption)
        self.pwd = None
        # cached project list (pandas DataFrame), see get_project_list()
        self.prj_list = None
        # cache of PxProject objects keyed by project uuid
        self.prj_cache = {}
        # lazy caches for metadata descriptions, see the getters below
        self._metadata_descriptions = None
        self._df_metadata_descriptions = None
def ask_auth(self):
    '''
    Collects password and username from the user interactively.

    Bug fix: previously the user prompt only ran when no name was cached,
    so re-asking (get_auth_info(always_ask=True)) returned None as the
    user name and wiped the cached one. The cached name is now offered
    as the prompt default instead.
    :return: tuple (user name, base64-encoded password bytes)
    '''
    # default to the cached name, falling back to the OS login name
    user = self.u_name or getpass.getuser()
    user = input('user [{0}]:'.format(user)) or user
    # store base64 encoded pwd, don't store plain pwd in memory
    # NOTE: base64 is only obfuscation, not encryption
    pwd = base64.b64encode(bytes(getpass.getpass('password:'), 'utf-8'))
    return user, pwd
def get_auth_info(self, always_ask=False):
    '''
    Returns either the cached or a freshly queried (user, password) pair.
    *password is returned in clear text, don't store it in a variable*
    :param always_ask: prompt the user even if credentials are cached
    :return: tuple (user name, clear-text password)
    '''
    if always_ask or self.u_name is None:
        self.u_name, self.pwd = self.ask_auth()
    # decode pwd before returning so that it can be used directly
    return self.u_name, base64.b64decode(self.pwd).decode('utf-8')
def post(self, service_url, payload_dict):
    """
    Posts payload to service_url as JSON.
    NOTE(review): verify=False disables TLS certificate checking --
    confirm this is intended for the target deployment.
    :param service_url: URL to post to
    :param payload_dict: dict, will be posted as JSON
    :return: response object
    :raises requests.HTTPError: if the server answers with an error status
    """
    response = requests.post(service_url, json=payload_dict,
                             auth=self.get_auth_info(), verify=False)
    return self._check_response(response)
def query(self, service_url, parameters):
    '''
    Makes a GET query to the ProxSys Server. Raises exception if connection failed.
    NOTE(review): verify=False disables TLS certificate checking --
    confirm this is intended for the target deployment.
    :param service_url: base URL for the request
    :param parameters: parameters for the query as a dict
    :return: response object
    :raises requests.HTTPError: if the server answers with an error status
    '''
    response = requests.get(service_url, params=parameters,
                            auth=self.get_auth_info(), verify=False)
    return self._check_response(response)
def _check_response(self, response):
    """
    Shared error handling for post() and query(): returns the response on
    success, drops the cached credentials on 401 so the user is re-asked,
    and raises requests.HTTPError on any error status.
    """
    if response.ok:
        return response
    if response.status_code == 401:  # unauthorized: cached credentials are bad
        self.u_name = None
        self.pwd = None
    response.raise_for_status()
    return response
def verify_access(self, test_prj_uuid=r'067558e05e7f3116015e9e9e037436d0'):
    '''
    Tests if user name and pwd are alright by asking the server
    to return a known project.
    Generalized: the previously hard-coded test-project uuid is now a
    defaulted parameter, so other deployments can pass their own.
    :param test_prj_uuid: uuid of a project the user is expected to see
    :return: True if all is well
    :raises requests.HTTPError: raised by get_project if access fails
    '''
    self.get_auth_info()  # make sure username is cached
    self.get_project(test_prj_uuid)
    # if above fails, an HTTPError will be raised, test fails
    return True
def post_metadata_value(self, ctd_id, meta_field_id, meta_value, timecode: 'number'=-1):
    """
    Writes one metadata value for a single content item.
    :param ctd_id: uuid of the content item
    :param meta_field_id: uuid of the metadata description (field)
    :param meta_value: value to write
    :param timecode: -1 for non-timecode metadata, >=0 else
    :return: response object
    """
    payload = [self.get_metadata_json(ctd_id, meta_field_id, meta_value, timecode)]
    return self.post(PxQuery.S_URL_SET_METADATA, payload)
def post_bulk_metadata_edit(self, ctd_lst: ['PxContent'],
                            metadata_description_uuid: 'uuid',
                            metadata_value: 'new value',
                            timecode: 'number'=-1) -> 'response':
    """
    Bulk edit of metadata. Writes metadata_value to all content items passed in ctd_lst.
    :param ctd_lst: list of PxContent items to update
    :param metadata_description_uuid: uuid of the metadata field
    :param metadata_value: value written to every item
    :param timecode: default: -1 for non-timecode metadata, use timecode >=0 else
    :return: response object
    """
    payload = [self.get_metadata_json(ctd.uuid, metadata_description_uuid,
                                      metadata_value, timecode)
               for ctd in ctd_lst]
    return self.post(PxQuery.S_URL_SET_METADATA, payload)
def post_bulk_metadata_edit_chunky(self, ctd_lst:['PxContent'],
                                   metadata_description_uuid:'uuid',
                                   metadata_value:'new value',
                                   timecode:'number'=-1,
                                   chunk_size:int=100,
                                   progress_bar=None) -> 'response':
    """
    Bulk edit of metadata. Writes metadata_value to all content items passed in ctd_lst. Does posting of
    metadata in chunks and conveniently updates progress bar.
    :param ctd_lst: list of PxContent items to update
    :param metadata_description_uuid: uuid of the metadata field to write
    :param metadata_value: value written to every item
    :param timecode: -1 for non-timecode metadata, >=0 else
    :param chunk_size: number of items posted per server call
    :param progress_bar: optional IPywidget progress bar, updated per chunk
    :return: None -- NOTE(review): annotated '-> response', but the last
        chunk's response is assigned and never returned; confirm intended
    """
    # set up variables for chunk start index chunk end index
    content_count = len(ctd_lst)
    chunk_size = chunk_size
    chunk_start = 0
    #chunk_end = chunk_start + chunk_size # overshooting corrected in loop
    if progress_bar is not None:
        # setup progress bar
        progress_bar.min = 0
        progress_bar.max = content_count
    # loop for generating chunks and issuing server query
    # chunks in the form 0...100, 100...200, 200...300 are generated by range fct and chunk_end calculation below.
    for i in range(chunk_start, content_count, chunk_size):
        debug = False
        chunk_start = i # chunk start is generated by range fkt
        # prevent overshooting, max index = content_count
        chunk_end = min(chunk_start + chunk_size, content_count)
        if debug:
            print('from {} to {}'.format(chunk_start, chunk_end))
        # query server with just this slice of the content list
        response = self.post_bulk_metadata_edit(ctd_lst[chunk_start:chunk_end],
                                                metadata_description_uuid,
                                                metadata_value,
                                                timecode)
        if debug:
            print(response.url)
        # update progress bar
        if progress_bar is not None:
            progress_bar.value=chunk_end
def get_metadata_json(self, ctd_id, meta_field_id, meta_value, timecode=-1):
    """
    Builds the payload dict understood by the setMetadata service.
    :param ctd_id: uuid of the content item
    :param meta_field_id: uuid of the metadata description (field)
    :param meta_value: value to write
    :param timecode: -1 for non-timecode metadata
    :return: dict ready for posting as JSON
    """
    return {
        "contentId": ctd_id,
        "descriptionId": meta_field_id,
        "metaInhalt": meta_value,
        "timecode": timecode,
    }
def get_prj_metadata_fields(self, px_prj:'PxProject'):
    """
    Returns metadata descriptions for a single project as raw json values,
    keyed by field name.
    :param px_prj: PxProject to query the fields for
    :return: OrderedDict {descriptionName: descriptionId}, sorted by name
    """
    p_uuid = px_prj.uuid
    params = {'projectId': p_uuid}
    url = PxQuery.S_URL_METADATA_DESCRIPTIONS_PROJ
    md_ds = {}
    for md_d in self.query(url, parameters=params).json():
        md_ds[md_d['descriptionName']] = md_d['descriptionId']
    return OrderedDict(sorted(md_ds.items()))
def get_df_metadata_description(self):
    """
    Returns dataframe w/ all MetadataDescriptions known to ProxSys.
    Result is cached for the lifetime of this object.
    :return: pandas DataFrame indexed by descriptionId
    """
    if self._df_metadata_descriptions is None:
        # query server
        rsp = self.query(PxQuery.S_URL_METADATA_ALL_DESCRIPTIONS, parameters={})
        # create data frame from response
        df = pd.read_json(rsp.text)
        # save data frame as excel file
        # NOTE(review): side effect -- writes ./metadata_fields.xls into the
        # current working directory on every cache miss, and legacy .xls
        # writing requires the deprecated xlwt engine; confirm still wanted
        df.to_excel('./metadata_fields.xls')
        # set uuid as index
        df.index = df.descriptionId
        # drop column, information is now set as index
        df.drop('descriptionId', axis=1, inplace=True)
        self._df_metadata_descriptions = df
    return self._df_metadata_descriptions
def get_all_metadata_descriptions(self):
    """
    Returns all metadata fields known to the ProxSys server.
    Query is cached and not updated.
    :return: dict-values view of PxMetaDescription objects
    """
    # idiom fix: identity check with 'is None' instead of '== None'
    if self._metadata_descriptions is None:
        self._metadata_descriptions = {}
        for response in self.query(PxQuery.S_URL_METADATA_ALL_DESCRIPTIONS, parameters={}).json():
            md_d = PxMetaDescription(self, response)
            self._metadata_descriptions[md_d.uuid] = md_d
    return self._metadata_descriptions.values()
def get_metadata_description(self, uuid):
    """
    Returns the cached PxMetaDescription with the given UUID.
    :param uuid: uuid of the metadata description
    :return: PxMetaDescription, or None if the uuid is unknown
    """
    if self._metadata_descriptions is None:
        # cache is cold: populate it with one server round trip
        self.get_all_metadata_descriptions()
    # cache is warm now; unknown uuids yield None
    return self._metadata_descriptions.get(uuid)
def get_metadata4content(self, content):
    """
    Queries metadata for content. Result is *not cached*,
    caching is implemented at PxContent level.
    :param content: PxContent object
    :return: list w/ PxMetadata
    """
    params = {'contentId': content.uuid, 'titlesOnly': 'false'}
    rows = self.query(PxQuery.S_URL_METADATA, parameters=params).json()
    return [PxMetadata(self, content, row) for row in rows]
def get_project_list(self, force_refresh=False):
    """
    Queries server for list of projects. Returns tidy list.
    :param force_refresh: if True, gets list fresh from server, if False, cached list is used
    :return: pandas DataFrame with columns pname, projectId, preferredStorage
    """
    if self.prj_list is None or force_refresh:
        # get list w/ json objects from server
        self.prj_list = self.query(PxQuery.S_URL_PROJS, parameters={}).json()
        # make dataframe from json dicts
        self.prj_list = pd.DataFrame(self.prj_list)
        # drop unwanted columns: include all columns in list...
        to_drop = list(self.prj_list.columns.values)
        # ... and remove the keepers from drop list
        to_drop.remove('pname')
        to_drop.remove('projectId')
        to_drop.remove('preferredStorage')
        # drop unwanted columns
        self.prj_list.drop(to_drop, axis=1, inplace=True)
    return self.prj_list
def get_prj_uuid(self, prj_name):
    '''
    Searches for name in project list, returns uuid for search hit.
    :param prj_name: exact project name to look up
    :return: uuid string of the first match, or None if not found
    '''
    df = self.get_project_list()
    # Fix: boolean-mask comparison instead of df.query() with string
    # interpolation -- the old form broke on quotes in project names and
    # allowed query-expression injection
    result = df[df['pname'] == prj_name]
    if len(result) > 0:
        return result.iloc[0].projectId
    return None
def get_project(self, p_uuid, try_cache=True):
    '''
    Gets single project from server.
    :param p_uuid: uuid of the project
    :param try_cache: if True, look into cache first before issuing query
    :return: PxProject
    '''
    ret = self.prj_cache.get(p_uuid)
    if ret is None or not try_cache:
        params = {'projectId': p_uuid}
        response = self.query(PxQuery.S_URL_SINGLE_PROJ, parameters=params)
        ret = PxProject(self, response.json())
        # store/refresh the cache entry with the freshly built object
        self.prj_cache[p_uuid] = ret
    return ret
def get_content(self, pxprj, ctd_uuid):
    '''
    Gets single content item from server.
    :param pxprj: PxProject this content is associated with
    :param ctd_uuid: uuid of content object
    :return: PxContent
    '''
    params = {'contentId': ctd_uuid}
    response = self.query(PxQuery.S_URL_SINGLE_CONTENT, parameters=params)
    ret = PxContent(pxprj, response.json())
    return ret
def get_prj_folders(self, prj_id, folder_id='root'):
    """
    Wrapper for the getProjectFolders ProxSys function.
    :param prj_id: prj uuid
    :param folder_id: uuid of the parent folder, 'root' for the top level
    :return: list w/ PxFolder objects
    """
    params = {'projectId': prj_id, 'folderId': folder_id}
    response = self.query(PxQuery.S_URL_FOLDERS, parameters=params)
    return [PxFolder(self, fldr_json) for fldr_json in response.json()]
def get_content_chunky(self, prj_id, folder_id, chunk_size=100, order='CNAME', order_dir='ASC', progress_bar=None):
    """
    Queries server for content in multiple queries. Content directly under passed folder is returned, subfolders are ignored.
    For folders with many content items, single query of content runs into server side timeout. To prevent this,
    this method uses JSON call 'getContentByProjectLimitedSort' multiple times.
    :param prj_id: UUID of project containing content
    :param folder_id: UUID of folder containing wanted content. Pass 'root' or None for content under project root
    :param chunk_size: Number of content items to get with a single server call. Experiment.
    :param order: Server query returns content items ordered. Possible values:
        CNAME, FILE_ID, LENGTH, FILE_SIZE, LASTCHANGE, CREATE_DATE, FILE_TYPE, CVERSION
    :param order_dir: Direction of ordering. ASC or DESC
    :param progress_bar: an IPywidget FloatProgress can be passed. If not None, progress bar is updated with
        each chunk
    :return: list w/ PxContent
    """
    prj = self.get_project(prj_id, try_cache=True)
    # no folder id passed: get ctd under prj root
    if folder_id is None:
        folder_id = 'root'
    # set up variables for chunk start index chunk end index
    content_count = self.get_content_count(prj_id, folder_id)
    chunk_size = chunk_size
    chunk_start = 0
    chunk_end = chunk_start + chunk_size # overshooting corrected in loop
    progress_bar_old_text = '' # remember text set for progress bar, set back later
    if progress_bar is not None:
        # setup progress bar
        progress_bar.min = 0
        progress_bar.max = content_count
        progress_bar_old_text = progress_bar.description
        # set new text
        progress_bar.description = 'Getting content'
    # these params don't change between server queries
    params = {'projectId' : prj_id,
              'folderId' : folder_id,
              'order' : order,
              'orderDir' : order_dir}
    ret = []
    # loop for generating chunks and issuing server query
    # chunks in the form 0...100, 100...200, 200...300 are generated by range fct and chunk_end calculation below.
    # Endpoints have to be overlapping since Proxsys JSON function does not include item with index end. Instead,
    # calling with start=100, end=200 returns items with index 100:199.
    for i in range(chunk_start, content_count, chunk_size):
        debug = False
        chunk_start = i # chunk start is generated by range fkt
        # prevent overshooting, max index = content_count
        chunk_end = min(chunk_start + chunk_size, content_count )
        # start and end change for each server query
        params['start'] = chunk_start
        params['end'] = chunk_end
        if debug:
            print('from {} to {}'.format(chunk_start, chunk_end))
        # query server
        response = self.query(PxQuery.S_URL_CONTENT_BY_PRJ_LTD_SORT, params)
        if debug:
            print(response.url)
        # create PxContent objects for this chunk and add to returned list
        for c_json in response.json():
            px_ctd = PxContent(prj, c_json, folder_id=folder_id)
            ret.append(px_ctd)
            if debug:
                print('added {} [{}]'.format(px_ctd.file_name, px_ctd.uuid))
        # update progress bar
        if progress_bar is not None:
            progress_bar.value=chunk_end
    if progress_bar is not None:
        # revert description to old text
        progress_bar.description = progress_bar_old_text
    return ret
def get_content_count(self, prj_id, folder_id):
    """
    Gets content count (number of content items directly beneath) for project root or folder.
    :param prj_id: uuid of project
    :param folder_id: uuid of folder or None if count for project root is asked
    :return: number of content items
    :raises ValueError: if the server response is not an integer
    """
    if folder_id is None:
        folder_id = 'root'
    params = {'projectId': prj_id, 'folderId': folder_id}
    response = self.query(PxQuery.S_URL_CONTENT_COUNT, parameters=params)
    try:
        return int(response.text)
    except ValueError as err:
        # Fix: catch only the parse failure (a bare 'except:' also swallowed
        # KeyboardInterrupt/SystemExit) and report the actual response body
        # instead of the unhelpful raw-stream object
        raise ValueError("Expected int value for content count, got: {}".format(response.text)) from err
def get_prj_content(self, prj, folder_id, limit=None):
    """
    Gets content for project/ folder.
    Gets all content in one go. Better use get_content_chunky to avoid server timeouts.
    :param prj: PxProject object or project uuid string
    :param folder_id: folder uuid or 'root'
    :param limit: Tuple w/ (startIndex, endIndex) to limit query to a slice of content
    :return: list w/ PxContent
    """
    try:
        # assume PxProject was passed, try to get uuid
        p_id = prj.uuid
    except AttributeError: # OK, this didn't work, no PxPrj passed
        p_id = prj # assume id was passed directly
        prj = self.get_project(p_id) # get/ generate prj object
    if limit is None:
        params = {'projectId': p_id, 'folderId': folder_id}
        response = self.query(PxQuery.S_URL_CONTENT_BY_PRJ, parameters=params)
    else:
        start_idx = limit[0]
        end_idx = limit[1]
        params = {'projectId': p_id, 'folderId': folder_id, 'start':start_idx, 'end':end_idx}
        response = self.query(PxQuery.S_URL_CONTENT_BY_PRJ_LTD, parameters=params)
    ret = []
    for c_json in response.json():
        ret.append(PxContent(prj, c_json, folder_id=folder_id))
    return ret
def get_folder_id_for_content(self, content_id, project_id):
    """
    Queries folder id for given content id.
    :param content_id: uuid of content
    :param project_id: restrict the lookup to this project; pass a falsy
        value to take the first folder of any project
    :return: uuid of the folder, or None if no folder matches
    """
    params = {'contentId': content_id}
    response = self.query(PxQuery.S_URL_FOLDERS_FOR_CONTENT, parameters=params)
    json_data = response.json()
    if project_id:
        for fldr in json_data:
            if fldr['projectId'] == project_id:
                return fldr['folderId']
        # explicit: content is not in the requested project
        return None
    # no project filter: first hit; guard against an empty server result
    # (the old code raised IndexError here)
    if json_data:
        return json_data[0]['folderId']
    return None
def check_content_filename(self, file_name):
    """
    /content/checkFilename function of ProxSys JSON API.
    Returns string to use as filename. The returned string is unique to the proxsys system.
    If file_name already exists in system, proxsys will alter file name to make it unique, e.g. by adding [1]
    before file extension.
    If file name is not in system yet, name will be returned as provided.
    :param file_name: file name to check
    :return: unique file name as plain text
    """
    params = {'filename': file_name}
    response = self.query(PxQuery.S_URL_CONTENT_CHECK_FILENAME, parameters=params)
    return response.text
def is_content_filename_unique(self, file_name):
    """
    Checks if a certain file name is already in system or not.
    :param file_name: file name to check
    :return: True if the name is unique (i.e. not yet in system), False if name already exists
    """
    # the server echoes the name unchanged exactly when it is not taken
    return file_name == self.check_content_filename(file_name)
def rerender_content(self, content_id):
    """
    Query to start the rerendering of a content e.g. when it previously failed.
    :param content_id: uuid of content for which rerendering should be started
    :return: HTTP status code of the server response
    """
    params = {'contentId': content_id}
    response = self.query(PxQuery.S_URL_RERENDER_CONTENT, parameters=params)
    return response.status_code
def create_new_thumbnail_for_content(self, content_id, timecode='-1'):
    """
    Query to create new thumbnail after initial creation failed.
    :param content_id: uuid of content for which thumbnail is to be created
    :param timecode: the timecode from which to grab the thumbnail - default is -1
    :return: HTTP status code of the server response
    """
    params = {'contentId': content_id, 'timecode': timecode}
    response = self.query(PxQuery.S_URL_CREATE_NEW_THUMBNAIL, parameters=params)
    return response.status_code
def get_content_preview_size(self, content_id):
    """
    Gets preview size for content item via ProxSys JSON API.
    :param content_id: uuid of content item
    :return: preview file size in kb, or 0 if the response cannot be parsed
    """
    params = {'contentId': content_id}
    response = self.query(PxQuery.S_URL_CONTENT_PREVIEW_SIZE, parameters=params)
    try:
        return int(response.text)
    except ValueError:
        # return 0 if error in parsing occurs
        return 0
def move_content_within_project(self, content_id, project_id, target_folder_id):
    """
    Moves a content item from one folder to another within a project.
    :param content_id: uuid of item to be moved
    :param project_id: project id of project for content
    :param target_folder_id: uuid of folder the content will be moved to. To move to project root folder, pass 'root'
    :return: status code of JSON response
    """
    params = {'contentId' : content_id, 'projectId' : project_id, 'folderId' : target_folder_id}
    response = self.query(PxQuery.S_URL_MOVE_CONTENT_IN_PRJ, parameters=params)
    return response.status_code
def move_content_between_projects(self, content_id, source_prj_id, target_prj_id, target_folder_id='root'):
    """Moves content from one project to another.
    'move' is accomplished by separate steps:
    - copy ctd to target prj
    - move ctd to target folder in target prj
    - delete ctd from source prj
    :param content_id: id of content to be moved
    :param source_prj_id: id of the project the content is moved away from
    :param target_prj_id: id of the project the target is moved to
    :param target_folder_id: id of the folder in target project the content will land in
    :return: status code of the copy operation
    """
    ret = self.copy_content(content_id, target_prj_id, target_folder_id)
    prjs4ctd = [prj.uuid for prj in self.get_projects_for_content(content_id)]
    # to prevent lost ctd, delete only if
    # - content is in src prj
    # - content is in target prj
    if source_prj_id in prjs4ctd and target_prj_id in prjs4ctd:
        self.delete_content(content_id, source_prj_id)
    return ret # return status code of copy operation
def copy_content(self, content_id, project_id, target_folder_id='root'):
    """Copies (adds) content to a project.
    :param content_id: uuid of the content to copy
    :param project_id: uuid of the destination project
    :param target_folder_id: destination folder inside the project
    :return: HTTP status code of the move (or of the failed copy)
    """
    params = {'contentId': content_id, 'projectId': project_id}
    response = self.query(PxQuery.S_URL_COPY_CONTENT, parameters=params)
    #COPY_CONTENT adds ctd to root of project, now content needs to be moved to destination folder
    if response.ok:
        return self.move_content_within_project(content_id, project_id, target_folder_id)
    return response.status_code
def delete_content(self, content_id, project_id, protected_delete=True):
    """
    Removes content from project **handle with care**:
    content is **permanently deleted** if not associated with other project.
    :param content_id: uuid of the content to remove
    :param project_id: uuid of the project it is removed from
    :param protected_delete: prevent permanent deletion, delete is not performed if content is only in one project
    :return: HTTP status code, or 423 (Locked) if a protected delete was refused
    """
    # refuse the delete when the content lives in fewer than two projects
    if len(self.get_projects_for_content(content_id)) < 2:
        if protected_delete:
            return 423 # Locked
    params = {'contentId' : content_id, 'projectId' : project_id}
    response = self.query(PxQuery.S_URL_DELETE_CONTENT, parameters=params)
    return response.status_code
def get_projects_for_content(self, content_id):
    """Gets projects the content is associated to.
    :param content_id: uuid of the content
    :return: list of PxProject
    """
    params = {'contentId' : content_id}
    response = self.query(PxQuery.S_URL_PROJECTS_FOR_CONTENT, parameters=params)
    return [PxProject(self, json_data) for json_data in response.json()]
def delete_project_folder(self, folder_id):
    """Deletes subfolder inside a project.
    :param folder_id: uuid of the folder to delete
    :return: response object
    """
    params = {'folderId': folder_id}
    response = self.query(PxQuery.S_URL_DELETE_PROJECT_FOLDER, params)
    return response
def create_project_folder(self, project_id, folder_name, parent_folder_id='root', comment='created via PyPxTools', state=0):
    """Creates a folder in a project.
    :param project_id: uuid of the project
    :param folder_name: display name of the new folder
    :param parent_folder_id: uuid of the parent folder, 'root' for top level
    :param comment: comment stored with the folder
    :param state: folder state flag passed through to the server
    :return: uuid of the newly created folder (response body)
    """
    params = {'projectId': project_id,
              'parentFolderId': parent_folder_id,
              'folderName': folder_name,
              'comment': comment,
              'state': state}
    response = self.query(PxQuery.S_URL_CREATE_PROJECT_FOLDER, params)
    # if all went well, response.text holds id for newly created folder
    return response.text
def search_folder(self, search_string):
    """Searches for project folders matching search_string.
    :param search_string: text to search for
    :return: list of PxFolder
    """
    params = {'searchString': search_string}
    response = self.query(PxQuery.S_URL_SEARCH_FOLDER, params)
    ret = []
    for json_data in response.json():
        # each hit wraps the folder json under the 'folder' key
        folder_json = json_data.get('folder')
        if folder_json:
            folder = PxFolder(self, folder_json)
            ret.append(folder)
    return ret
def search_content(self, search_string):
    """Searches for content items matching search_string.
    :param search_string: text to search for
    :return: list of PxContent
    """
    params = {'searchString': search_string}
    response = self.query(PxQuery.S_URL_SEARCH_CONTENT, params)
    ret = []
    for json_data in response.json():
        # each hit wraps the content json under the 'content' key
        ctd_json = json_data.get('content')
        if ctd_json:
            # NOTE(review): PxContent receives this PxQuery as its first
            # argument here, while other call sites pass a PxProject --
            # confirm intended
            ctd = PxContent(self, ctd_json)
            ret.append(ctd)
    return ret
class PxProject(object):
    '''
    Class represents Project in ProxSys.
    '''
    # json keys of interest in the server response
    KEY_NAME = 'pname'
    KEY_UUID = 'projectId'
    def __init__(self, query, json_data):
        # PxQuery used for all server communication
        self.query = query
        # raw json dict describing this project
        self.json = json_data
        # cache: folder uuid -> PxFolder for the whole folder tree,
        # eagerly filled on construction by _get_folders()
        self._folders = {}
        self._get_folders()
        # self._folder_for_content = None
        # lazy caches for content list and count
        self._content = None
        self._content_count = None
    def __str__(self):
        return self.__repr__()
    def __repr__(self):
        return 'Project '+self.name+' : '+self.uuid
    @property
    def name(self):
        # project name as delivered by the server
        return self.json[PxProject.KEY_NAME]
    @property
    def uuid(self):
        # project uuid as delivered by the server
        return self.json[PxProject.KEY_UUID]
def get_folder(self, f_id):
    """
    Returns the cached PxFolder for a folder uuid.
    :param f_id: uuid of folder
    :return: PxFolder, or None if the uuid is unknown in this project
    """
    folder = self._folders.get(f_id)
    if folder is None:
        # Fix: removed the leftover debug print ('sth happened, check me
        # out pls'); an unknown uuid simply yields None
        # TODO: decide on proper error handling or a server-side search
        return None
    return folder
def _get_child_folders(self, parent_folder):
    '''
    Non-recursive call gets all child folders for a folder in prj.
    :param parent_folder: uuid of the parent folder ('root' for top level)
    :return: list of PxFolder
    '''
    # run query
    return self.query.get_prj_folders(self.uuid, parent_folder)
def _get_folders(self, parent_folder='root'):
    '''
    Recursively compiles list w/ all folders in this project.
    Subfolders are included.
    Results are stored in dictionary, server is only queried if needed. This cache is not updated
    during the object's lifetime.
    :param parent_folder: uuid of the folder to start the recursion from
    :return: dictionary w/ folders by folderId
    '''
    try:
        for folder in self._get_child_folders(parent_folder):
            self._folders[folder.uuid] = folder
            # recurse into this folder's subtree
            self._get_folders(folder.uuid)
    except requests.HTTPError:
        # the server answers with an error status when a folder has no children
        print('no sub folders for '+parent_folder)
    return self._folders
def get_folder_dict(self):
    """
    Returns dict w/ all folders in this project, uuid as key, PxFolder as value.
    :return: dict {folder uuid: PxFolder}
    """
    return self._folders
def get_all_folders(self, return_ids=False):
    """
    Returns list w/ all folders in this prj, either as PxFolder objects or as folder uuid.
    Operates on cached dict, no communication w/ server.
    :param return_ids: if True, return list w/ uuid's, if False return PxFolder objects
    :return: dict view of uuids or of PxFolder objects
    """
    if return_ids:
        return self._folders.keys()
    else:
        return self._folders.values()
def get_root_folders(self, return_ids=False):
    """
    Returns all folders directly under the project root.
    :param return_ids: if True, returns uuids instead of objects
    :return: List of PxFolder objects or list of folder uuids
    """
    ret = []
    for folder in self.get_all_folders(return_ids=False):
        if folder.get_parent_folder() is not None:
            continue # finish this loop iteration, continue w/ next
        # we have root folder
        if return_ids:
            ret.append(folder.uuid)
        else:
            ret.append(folder)
    ret.sort(key=lambda f: str(f).upper()) # sort alphabetically
    return ret
def get_level(self):
    """
    Implemented to complement folder.get_level. Returns always 0,
    because the project is the root of the hierarchy.
    :return: 0
    """
    return 0
def get_child_folders(self, return_ids=False):
    """
    Alias for get_root_folders(). Made so both PxFolder and PxProject have same access to child folders.
    Returns folders directly under the project root.
    :param return_ids: if True, returns uuids instead of objects
    :return: list of PxFolder objects or folder uuids
    """
    return self.get_root_folders(return_ids)
def get_all_folders_ordered(self, flat_list=True):
    """
    Compiles list w/ all folders, ordered so that child folders are directly under parents.
    :param flat_list: if True, returns flat list, if False returns lists in list
    :return: list w/ PxFolders (flat or nested)
    """
    ret = []
    for rf in self.get_root_folders():
        ret.append(rf)
        child_folders = rf.get_all_child_folders()
        if len(child_folders) > 0:
            ret.append(child_folders)
    if flat_list:
        # Fix: flatten_list is a generator; materialize it so the method
        # returns an actual list as documented (len() etc. work)
        ret = list(flatten_list(ret))
    return ret
def _get_direct_content(self):
    """
    Returns list w/ content directly underneath the project root.
    Result is cached for the lifetime of this object.
    :return: list of PxContent
    """
    # idiom fix: identity check with 'is None' instead of '== None'
    if self._content is None:
        self._content = self.query.get_content_chunky(prj_id=self.uuid, folder_id='root')
    return self._content
# def _get_all_content_with_folder(self):
# """
# Gets list with all content in all subfolders. Queries folder for content.
# Caches Results. Cache is *not refreshed* during object lifetime.
# :return: dict w/ content as key, folder as value
# """
# if self._folder_for_content is None:
# # The ProxSys API returns only content directly underneath folders/ project by default
# # using self.query.get_prj_content. Here, for convenience sake and to have all content stored
# # in one go, collect all of the content, including under the project itself and under
# # all folders/ subfolders.
# # To be able to reference later which parent the contents is under, store content
# # in dict w/ content obj as key, folder/ prj as value
#
# self._folder_for_content = {} # cache
#
# # get content under project itself
# for content in self._get_direct_content():
# self._folder_for_content[content] = self
#
# # get content under folders and subfolders
# threads = []
# for folder in self.get_all_folders():
# t = threading.Thread(name=folder, target=self._add_folder_content, args=(folder, ))
# threads.append(t)
# t.start()
# #self._add_folder_content(folder)
#
# for t in threads:
# t.join()
#
# return self._folder_for_content
#
# def _add_folder_content(self, folder):
# for content in self.query.get_prj_content(prj=self, folder_id=folder.uuid):
# self._folder_for_content[content] = folder
def get_content_count(self, include_subdirs=True):
    """
    Gets number of content items directly under the project root or in whole project including subdirectories.
    :param include_subdirs: if True, counts recursively over all folders
    :return: number of content items
    """
    if self._content_count is None:
        # cache the count of items directly under the project root
        self._content_count = self.query.get_content_count(self.uuid, None)
    if not include_subdirs:
        return self._content_count
    else: # include subdirs
        ret = self._content_count
        for folder in self.get_child_folders():
            ret = ret + folder.get_content_count(include_subdirs=True)
        return ret
def get_content_list(self, include_subdirs=True):
    """
    Gets list w/ all PxContent for this project.
    :param include_subdirs: set if only content directly under prj is returned or if
        content in subfolders is included
    :return: list of PxContent
    """
    if include_subdirs:
        # get direct content 1st
        ret = self.get_content_list(include_subdirs=False)
        # then add content of folders
        for folder in self.get_child_folders():
            ret.extend(folder.get_content(include_subdirs=True))
        return ret
    else:
        return self._get_direct_content()
def get_content(self, include_subdirs=False):
    """
    Alias for get_content_list().
    NOTE(review): the old docstring claimed include_subdirs=True, but the
    actual default here is False -- confirm which default callers expect.
    :param include_subdirs: passed through to get_content_list
    :return: list of PxContent
    """
    return self.get_content_list(include_subdirs)
def get_folder_for_content(self, content):
    """
    Gets the folder (or the project itself) that is the direct parent of content.
    Fix: the old body called _get_all_content_with_folder(), which has been
    removed (commented out), so every call raised AttributeError. The parent
    is now found by searching the project root and all cached folders.
    :param content: PxContent to locate
    :return: PxFolder containing content, or this PxProject for root content
    :raises KeyError: if the content is not found in this project
    """
    # content directly under the project root -> the project is the parent
    if content in self._get_direct_content():
        return self
    for folder in self.get_all_folders(return_ids=False):
        if content in folder.get_content(include_subdirs=False):
            return folder
    raise KeyError('content not found in project: {}'.format(content))
class PxFolder(object):
    """
    Class represents ProxSys project folder.
    """
    # json keys of interest in the server response
    KEY_UUID = 'folderId'
    KEY_NAME = 'folderName'
    KEY_PRJ_ID = 'projectId'
    KEY_PARENT_ID = 'parentId'
    def __init__(self, query, json_data):
        # PxQuery used for all server communication
        self.query = query
        # raw json dict describing this folder
        self.json = json_data
        # lazy caches, filled by the getters below
        self._content = None
        self._content_count = None
        self._px_prj = None
        self._child_folders = None
    def __str__(self):
        return self.__repr__()
    def __repr__(self):
        return 'Folder '+self.name+' : '+self.uuid
    def get_project(self):
        """
        Returns PxProject for this Folder (cached after the first call).
        :return: PxProject
        """
        if self._px_prj is None:
            self._px_prj = self.query.get_project(self.prj_id)
        return self._px_prj
    @property
    def uuid(self):
        # folder uuid
        return self.json[PxFolder.KEY_UUID]
    @property
    def name(self):
        # folder display name
        return self.json[PxFolder.KEY_NAME]
    @property
    def prj_id(self):
        # uuid of the project this folder belongs to
        return self.json[PxFolder.KEY_PRJ_ID]
    @property
    def parent_id(self):
        # uuid of the parent folder, or 'root' for top-level folders
        return self.json[PxFolder.KEY_PARENT_ID]
def get_content(self, include_subdirs=False):
    """
    Returns content for this folder.
    :param include_subdirs: if True, gets content from sub directories too (recursive);
        if False, gets only content directly underneath this folder
    :return: list of PxContent
    """
    if include_subdirs:
        ret = self.get_content(include_subdirs=False)
        for child in self.get_child_folders():
            ret.extend(child.get_content(include_subdirs=True))
        return ret
    else: # include_subdirs == False
        if self._content is None:
            # first call: fetch from the server and cache by content uuid
            self._content = {}
            project = self.get_project()
            for ctd in self.query.get_content_chunky(prj_id=project.uuid, folder_id=self.uuid):
                self._content[ctd.uuid] = ctd
        return list(self._content.values())
def get_level(self):
    """
    Returns level of this folder in project structure:
    If folder is directly underneath project, level = 1. For each
    subdirectory, level is increased.
    :return: Level of this folder
    """
    parent = self.get_parent_folder()
    if parent is None:
        return 1 # directly under project
    else:
        return 1+parent.get_level()
def get_parent_folder(self):
    """
    Gets PxFolder directly above this one.
    :return: Folder above, or None if this folder is directly under project
    """
    if self.parent_id == 'root':
        return None
    return self.get_project().get_folder_dict()[self.parent_id]
def get_parent_items(self):
    """
    Returns the chain of ancestors from the project down to (excluding) this folder.
    :return: list [PxProject, folder, subfolder, ...]
    """
    parent = self.get_parent_folder()
    if parent is None:
        return [self.get_project()]
    else:
        lst = parent.get_parent_items()
        lst.append(parent)
        return lst
def get_child_folders(self):
"""
Gets list w/ all folders directly underneath this one
:return:
"""
if self._child_folders is None: # first time call, init cache
self._child_folders = []
# for simplicity's sake, just look into all folders in projekt
# folders are cached, no request to server involved here
for folder in self.get_project().get_all_folders(return_ids=False):
if folder.parent_id == self.uuid:
self._child_folders.append(folder)
return self._child_folders
def get_all_child_folders(self):
"""
gets list w/ all child folders, including subfolders
:return:
"""
ret = []
for child in sorted(self.get_child_folders(), key=lambda f: str(f).upper()): # sort alphabetically:
ret.append(child)
child_children = child.get_all_child_folders()
if len(child_children)>0 or True:
ret.append(child_children)
return ret
def get_content_count(self, include_subdirs=True):
"""
Returns count of content items in this folder
:param include_subdirs: if True, also count items in subfolders
:return: number of content items for this folder
"""
if self._content_count is None:
self._content_count = self.query.get_content_count(self.prj_id, self.uuid)
if not include_subdirs:
return self._content_count
else: # include subdirs
ret = self._content_count
for folder in self.get_child_folders():
ret = ret + folder.get_content_count(include_subdirs=True)
return ret
def get_sum_filesize(self, recursive=True):
"""
Returns cummulative file size of content items in this folder
:param recursive: if True, also count items in subfolders
:return: number of content items for this folder
"""
i = 0
for ctd in self.get_content():
i = ctd.file_size
if not recursive:
return i
else:
for child in self.get_child_folders():
i = i + child.get_sum_filesize()
return i/(1024*1024) # return MB
class PxContent(object):
    """
    Class representing ProxSys content data.

    Wraps the raw content JSON returned by the server; metadata lookups
    are cached after the first server round-trip.
    """
    # JSON keys used by the ProxSys API for content records
    KEY_NAME = 'cname'
    KEY_UUID = 'contentId'
    KEY_VERSION = 'cversion'
    KEY_FILE = 'fileId'
    KEY_F_SIZE = 'fileSize'
    KEY_PREVIEW_READY = 'previewReady'
    KEY_FILETYPE = 'fileType'
    KEY_FILE_PATH = 'path'
    def __init__(self, prj, json_data, query=None, folder_id=None):
        """
        Creates new PxContent object
        :param prj: The project the content is associated with, as PxProject object.
            In ProxSys, content can be associated w/ a number of projects. This is passed to make association clear.
            Can be None, but then query object has to be passed. If not None, the projects query obj will be used
        :param json_data: JSON data as returned by query
        :param query: PxQuery object. Only evaluated if no PxProject was passed.
        :param folder_id: Id for folder containing content
        :raises AttributeError: if neither a project nor a query is passed
        """
        self.prj = prj
        self.json = json_data
        self.folder_id = folder_id
        if self.prj is not None:
            self.query = self.prj.query
        else:
            self.query = query
        if self.query is None:
            raise AttributeError(' PxQuery object not passed! Pass PxQuery directly if no PxProject was passed.')
        self._metadata = None  # lazily-filled metadata cache
    def __str__(self):
        return u'Content: {0} [{1}]'.format(self.name, self.uuid)
    @property
    def name(self):
        # display name of the content item
        return self.json[PxContent.KEY_NAME]
    @property
    def preview_ready(self):
        # server flag: preview rendering finished
        return self.json[PxContent.KEY_PREVIEW_READY]
    @property
    def uuid(self):
        # unique content id as assigned by ProxSys
        return self.json[PxContent.KEY_UUID]
    @property
    def version(self):
        # current version number of the content
        return self.json[PxContent.KEY_VERSION]
    @property
    def file_name(self):
        # file id / name of the underlying file
        return self.json[PxContent.KEY_FILE]
    @property
    def file_size(self):
        # file size as reported by the server
        return self.json[PxContent.KEY_F_SIZE]
    @property
    def file_type(self):
        # file type / MIME-like string reported by the server
        return self.json[PxContent.KEY_FILETYPE]
    @property
    def folder_path(self):
        # file system path of the containing folder
        return self.json[PxContent.KEY_FILE_PATH]
    @property
    def is_video(self):
        # case-insensitive prefix check on the reported file type
        return self.file_type.lower().startswith('video')
    def get_file_path(self, ctd_version=None):
        """Returns path of file in file system. Specify version or get path for current version"""
        # BUGFIX: 'if not ctd_version' would also discard an explicit version 0
        if ctd_version is None:
            ctd_version = self.version
        return "{}v.{}.{}".format(self.folder_path, ctd_version, self.file_name)
    def get_parent_items(self):
        """
        Returns ordered list of all parent items up to the project root. Project is always first item of list,
        the parent folder is last
        :raises ValueError: if no folder was set on construction
        :return:
        """
        fldr = self.get_folder()
        if fldr:
            if fldr == 'root':
                return [self.prj]
            else:
                lst = fldr.get_parent_items()
                lst.append(fldr)
                return lst
        else:  # self.get_folder() is None:
            # folder was not passed on __init__; if this is really an issue,
            # fix it in get_folder() by querying the server for the folder
            raise ValueError('No folder set! Pass folder_id on construction or use query.get_folder_for_content().')
    def get_folder(self):
        """
        Returns folder containing content.
        If content is under root folder, contains 'root'. If folder could not be determined (not passed in __init__),
        returns none. In this case, query project's folders manually or use query.get_folder_for_content
        :return:
        """
        if self.folder_id is None:
            return None
        if self.folder_id == 'root':
            return 'root'
        # else: folder id was set
        if self.prj:
            return self.prj.get_folder(self.folder_id)
        # else: project is not set :/ See if this is an issue in real life, fix if needed
        return None
    def get_all_metadata(self):
        """
        returns map w/ all metadata for this content. Query is cached, cache is not updated
        :return: map w/ uuid of PxMetaDescription as key, PxMetadata as value
        """
        if self._metadata is None:
            self._metadata = {}
            for md in self.query.get_metadata4content(self):
                self._metadata[md.description.uuid] = md
        return self._metadata
    def get_metadata(self, metadata_description):
        """
        gets single PxMetadata object
        :param metadata_description: uuid of PxMetaDescription, PxMetaDescription object or name of PxMetaDescription
        :return: PxMetadata or None if not present for this content
        """
        md_d_id = self._assure_metadata_description(metadata_description)  # make sure to get uuid or none
        if md_d_id is not None:
            return self.get_all_metadata().get(md_d_id)  # obj, or None if this PxMetaDescription not present for ctd
        else:  # no PxMetaDescription found for a name
            return None
    def get_metadata_value(self, metadata_description):
        """
        gets value for single PxMetadata object
        :param metadata_description: uuid of PxMetaDescription, PxMetaDescription object or name of PxMetaDescription
        :return: value or None if metadata is unknown for this content
        """
        # make sure that this also works if a text was passed instead of a PxMetaDescription
        md_d = self._assure_metadata_description(metadata_description)
        ret = self.get_metadata(md_d)
        if ret is not None:  # check: if metadata_description is unknown for this content, None is returned
            return ret.value
        else:
            return None
    def set_metadata_value(self, metadata_description, metadata_value, timecode:'number'=-1):
        """Sets single metadata value
        :param metadata_description: uuid of PxMetaDescription, PxMetaDescription object or name of PxMetaDescription
        :param metadata_value: new value of metadata field
        :param timecode: default: -1 for non-timecode metadata, use timecode >=0 else
        """
        # make sure that this also works if a text was passed instead of a PxMetaDescription
        md_d = self._assure_metadata_description(metadata_description)
        return self.query.post_metadata_value(ctd_id=self.uuid, meta_field_id=md_d, meta_value=metadata_value, timecode=timecode)
    def _assure_metadata_description(self, md_d):
        """
        Tests if passed argument is a PxMetaDescription. If not, assumes that name of PxMetaDescription was
        passed and attempts to find suitable object for name
        :param md_d: PxMetaDescription object or uuid of PxMetaDescription or name of PxMetaDescription as text
        :return: uuid of PxMetaDescription or None
        """
        # trivial case. can happen because of nested call of this method
        if md_d is None:
            return None
        # best case, a uuid was passed. we don't need to do anything
        if md_d in self.get_all_metadata().keys():
            return md_d
        # still here? Assume that PxMetaDescription object was passed
        try:
            md_d.input_style  # if this works ...
            return md_d.uuid  # ... a PxMetaDescription object was passed
        except AttributeError:  # ok, no PxMetaDescription passed
            # search for PxMetaDescription by name
            ret = PxMetaDescription.search_by_name(self.query, md_d)
            if ret is not None:  # found PxMetaDescription matching name
                return ret.uuid
            else:  # no PxMetaDescription found for name
                return None
    def get_media_data(self):
        """Queries the server for media data of this content; None on failure."""
        params = {'contentId': self.uuid}
        response = self.query.query(PxQuery.S_URL_CONTENT_MEDIADATA, params)
        if response.ok:
            return response.json()
        else:
            return None
    def rerender(self):
        # Trigger rerendering of content e.g. when previews are missing
        return self.query.rerender_content(self.uuid)
    def create_new_thumbnail(self, timecode='0'):
        """
        Triggers creation of a new thumbnail for this content.
        :param timecode: timecode to grab the thumbnail from (videos only)
        """
        # BUGFIX: use is_video (case-insensitive prefix check) instead of a
        # case-sensitive substring test, consistent with the is_video property
        if not self.is_video:
            timecode = '-1'  # non-video content has no timecode
        return self.query.create_new_thumbnail_for_content(self.uuid, timecode)
    def get_preview_size(self):
        """
        Queries server for size of preview. If no preview is found, returns 0
        Query is *not cached*
        :return: size of preview file in kb as int
        """
        return self.query.get_content_preview_size(self.uuid)
    def get_has_preview(self):
        """
        Queries server to decide if content has a preview or not using preview size.
        Query is *not cached*
        :return: True if preview was found, False otherwise
        """
        return self.get_preview_size() > 0
class PxMetaDescription(object):
    """
    Wrapper around the ProxsysMetadescription data model.

    A MetaDescription describes a metadata *field* (name, input style,
    predefined values, ...); the actual per-content values live in
    ProxsysMetadata / PxMetadata objects.

    Input style codes (from the ProxSys API docs):
        0 = normal mode, 1 = predefined values, 2 = mixed mode,
        3 = boolean ("true"/"false"), 4 = timestamp ("yyyy-MM-dd HH:mm:ss.S"),
        5 = date ("yyyy-MM-dd"), 6 = time ("HH:mm:ss.S"), 7 = integer ("123"),
        8 = double ("123.4" or "123,4"), 9 = reserved, 10 = formatted text,
        11 = link, 12 = hierarchical, 13 = GPS

    Further JSON fields: descriptionId (unique id of the field),
    descriptionName (display name), pos (global sort position),
    predefineValues (selectbox values separated by '#', input style 1 or 2),
    sysField (1 = system defined field, cannot be deleted),
    timecodeField (0 = does not depend on timecode).
    """
    INPUT_STYLE_NORMAL = 0
    INPUT_STYLE_PREDEFINED = 1
    INPUT_STYLE_MIXED = 2
    INPUT_STYLE_BOOLEAN = 3
    INPUT_STYLE_TIMESTAMP = 4
    INPUT_STYLE_DATE = 5
    INPUT_STYLE_TIME = 6
    INPUT_STYLE_INT = 7
    INPUT_STYLE_DOUBLE = 8
    INPUT_STYLE_RESERVED = 9
    INPUT_STYLE_FORMATTED_TEXT = 10
    INPUT_STYLE_LINK = 11
    INPUT_STYLE_HIERARCHICAL = 12
    INPUT_STYLE_GPS = 13
    def __init__(self, query, json_data):
        """
        :param query: PxQuery used for server requests
        :param json_data: raw metadata-description JSON as returned by the server
        """
        self.query = query
        self.json_data = json_data
        # Map every numeric input-style code onto the matching class constant.
        self.input_style_names4codes = dict(enumerate((
            PxMetaDescription.INPUT_STYLE_NORMAL,
            PxMetaDescription.INPUT_STYLE_PREDEFINED,
            PxMetaDescription.INPUT_STYLE_MIXED,
            PxMetaDescription.INPUT_STYLE_BOOLEAN,
            PxMetaDescription.INPUT_STYLE_TIMESTAMP,
            PxMetaDescription.INPUT_STYLE_DATE,
            PxMetaDescription.INPUT_STYLE_TIME,
            PxMetaDescription.INPUT_STYLE_INT,
            PxMetaDescription.INPUT_STYLE_DOUBLE,
            PxMetaDescription.INPUT_STYLE_RESERVED,
            PxMetaDescription.INPUT_STYLE_FORMATTED_TEXT,
            PxMetaDescription.INPUT_STYLE_LINK,
            PxMetaDescription.INPUT_STYLE_HIERARCHICAL,
            PxMetaDescription.INPUT_STYLE_GPS,
        )))
    def __repr__(self):
        return 'MetadataDescription: %s [%s]' % (self.name, self.uuid)
    @property
    def uuid(self):
        # unique id of this description field
        return self.json_data['descriptionId']
    @property
    def name(self):
        # display name of the metadata field
        return self.json_data['descriptionName']
    @property
    def input_style(self):
        # numeric input-style code, see class docstring / INPUT_STYLE_* constants
        return self.json_data['inputStyle']
    @property
    def predefineValues(self):
        # '#'-separated predefined values (input styles 1 and 2)
        return self.json_data['predefineValues']
    @staticmethod
    def search_by_name(query, name):
        """
        Searches a metadata description matching* the given name among all known
        metadata descriptions.
        *matching is a little fuzzy: case insensitive, and only the beginning of
        the description name has to match the passed name
        :param query: pxquery to get metadata descriptions from Px
        :param name: name (or name prefix) of the metadata description
        :return: first matching PxMetaDescription object, or None
        """
        needle = name.strip().upper()
        hits = (md for md in query.get_all_metadata_descriptions()
                if md.name.strip().upper().startswith(needle))
        return next(hits, None)
class PxMetadata(object):
    """
    Wrapper around the ProxsysMetadata data model: one metadata value
    attached to one content item.

    JSON fields (from the ProxSys API):
        contentId       unique id of the content this entry belongs to
        descriptionId   unique id of the description field
        metaInhalt      the metadata value
        timecode        timecode of this entry, -1 for default metadata
        descriptionName display name of the metadata field
    """
    def __init__(self, query, content, json_data):
        """
        :param query: PxQuery used to resolve the PxMetaDescription
        :param content: PxContent this metadata entry belongs to
        :param json_data: raw ProxsysMetadata JSON
        :raises ValueError: if json_data belongs to a different content
        """
        self.query = query
        self.content = content
        self.json_data = json_data
        # guard against wiring metadata to the wrong content object
        if self.json_data['contentId'] != self.content.uuid:
            raise ValueError('UUID Mismatch for metadata content!')
        self.description = self.query.get_metadata_description(self.json_data['descriptionId'])
    def __repr__(self):
        return 'Metadata for %s: %s=%s' % (self.content, self.description_name, self.value)
    @property
    def value(self):
        # raw metadata value; None if the server sent no value
        return self.json_data['metaInhalt']
    @property
    def description_name(self):
        # display name of the metadata field
        return self.json_data['descriptionName']
    @property
    def description_id(self):
        # unique id of the metadata description field
        return self.json_data['descriptionId']