Repository URL to install this package:
|
Version:
2.2.0 ▾
|
require_relative 'rpc/catalog/catalog_twirp'
require_relative 'helpers/object_to_hash'
# Catalog (dataset) operations of the Namara client.
#
# Each public method issues one RPC on +catalog_client+ through +rpc_request+,
# passing +rpc_headers+. +rpc_request+, +rpc_headers+, +api_url+ and
# +RequestError+ are not defined in this section and are expected to be
# provided elsewhere in the project.
class Namara
  # Lists datasets through the catalog +list_datasets+ RPC.
  #
  # @param filter [Hash, nil] symbol-keyed filter options. Array-valued keys
  #   (scalars are wrapped with +Array()+): :states, :dataset_ids,
  #   :organization_ids, :uploader_ids, :reference_states, :warehouses.
  #   Pass-through keys: :query, :exclude_versions, :timestamp_range,
  #   :sort_field, :sort_order, :offset, :limit.
  # @return [OpenStruct] responds to +datasets+ (the output of
  #   ObjectToHash.to_hash_array) and +total_count+
  def list_all_datasets(filter = {})
    payload = {
      filter: build_dataset_filter(
        filter,
        array_keys: %i[states dataset_ids organization_ids uploader_ids reference_states warehouses],
        scalar_keys: %i[exclude_versions timestamp_range]
      )
    }
    response = rpc_request(catalog_client) do |client|
      client.list_datasets(payload, rpc_headers)
    end
    wrap_dataset_listing(response)
  end

  # Lists the datasets of one organization through the catalog
  # +list_organization_datasets+ RPC.
  #
  # @param organization_id [Object] sent as +organization_id+ in the payload
  # @param filter [Hash, nil] symbol-keyed filter options. Array-valued keys:
  #   :topic_ids, :dataset_ids, :states, :reference_ids, :source_ids,
  #   :uploader_ids, :reference_states, :warehouses. Pass-through keys:
  #   :query, :exclude_versions, :timestamp_range, :sort_field, :sort_order,
  #   :offset, :limit.
  # @return [OpenStruct] responds to +datasets+ and +total_count+
  def list_organization_datasets(organization_id, filter = {})
    payload = {
      filter: build_dataset_filter(
        filter,
        array_keys: %i[topic_ids dataset_ids states reference_ids source_ids uploader_ids
                       reference_states warehouses],
        scalar_keys: %i[exclude_versions timestamp_range]
      ),
      organization_id: organization_id
    }
    response = rpc_request(catalog_client) do |client|
      client.list_organization_datasets(payload, rpc_headers)
    end
    wrap_dataset_listing(response)
  end

  # Lists the datasets of one group through the catalog
  # +list_group_datasets+ RPC.
  #
  # @param group_id [Object] sent as +group_id+ in the payload
  # @param filter [Hash, nil] symbol-keyed filter options. Array-valued keys:
  #   :states, :dataset_ids, :reference_ids, :source_ids, :topic_ids,
  #   :uploader_ids, :reference_states, :warehouses. Pass-through keys:
  #   :query, :timestamp_range, :sort_field, :sort_order, :offset, :limit.
  #   Unlike the other listings, :exclude_versions is not forwarded.
  # @return [OpenStruct] responds to +datasets+ and +total_count+
  def list_group_datasets(group_id, filter = {})
    payload = {
      filter: build_dataset_filter(
        filter,
        array_keys: %i[states dataset_ids reference_ids source_ids topic_ids uploader_ids
                       reference_states warehouses],
        scalar_keys: %i[timestamp_range]
      ),
      group_id: group_id
    }
    response = rpc_request(catalog_client) do |client|
      client.list_group_datasets(payload, rpc_headers)
    end
    wrap_dataset_listing(response)
  end

  # Fetches a single dataset through the +get_dataset+ RPC.
  #
  # @param dataset_id [Object] sent as +id+ in the payload
  # @return [OpenStruct] built from +response.data.dataset.to_h+
  def get_dataset(dataset_id)
    payload = { id: dataset_id }
    response = rpc_request(catalog_client) do |client|
      client.get_dataset(payload, rpc_headers)
    end
    OpenStruct.new(response.data.dataset.to_h)
  end

  # Sends a dataset to the +update_dataset+ RPC.
  #
  # @param dataset [#to_h] dataset to send; wrapped as +{ dataset: dataset.to_h }+
  # @return [OpenStruct] built from +response.data.dataset.to_h+
  def update_dataset(dataset)
    response = rpc_request(catalog_client) do |client|
      client.update_dataset(serialize_dataset(dataset), rpc_headers)
    end
    OpenStruct.new(response.data.dataset.to_h)
  end

  # Updates properties of one dataset version through the +update_properties+
  # RPC. The payload carries every property of the version, with the supplied
  # overrides merged in.
  #
  # Side effect: the overrides are written into the property hashes held by
  # +dataset+ before the request is sent.
  #
  # @param dataset [Object] must respond to +[:id]+ and +versions+
  # @param version_id [Object] id of the version whose properties are updated
  # @param properties [Array<Hash>] overrides, matched to existing properties
  #   by +:id+
  # @return [Object] result of ObjectToHash.to_hash_array on
  #   +response.data.properties+
  # @raise [RequestError] when +properties+ is not an Array, or when
  #   +version_id+ matches none of the dataset's versions
  def update_properties(dataset, version_id, properties = [])
    raise RequestError, "properties should be of array type" unless properties.is_a?(Array)

    properties_to_update = serialize_properties(dataset, version_id, properties)
    payload = {
      dataset_id: dataset[:id],
      version_id: version_id,
      properties: properties_to_update
    }
    response = rpc_request(catalog_client) do |client|
      client.update_properties(payload, rpc_headers)
    end
    ObjectToHash.to_hash_array(response.data.properties.to_a)
  end

  # Tags a revision through the +tag_revision+ RPC.
  #
  # @param revision_id [Object] sent as +revision_id+
  # @param tag [Object] sent as +tag+
  # @return [OpenStruct] built from +response.data.revision.to_h+
  def tag_revision(revision_id, tag)
    payload = { revision_id: revision_id, tag: tag }
    response = rpc_request(catalog_client) do |client|
      client.tag_revision(payload, rpc_headers)
    end
    OpenStruct.new(response.data.revision.to_h)
  end

  # Removes a tag through the +untag_revision+ RPC.
  #
  # @param tag_id [Object] sent as +tag_id+
  # @return [OpenStruct] built from +response.data.revision.to_h+
  def untag_revision(tag_id)
    payload = { tag_id: tag_id }
    response = rpc_request(catalog_client) do |client|
      client.untag_revision(payload, rpc_headers)
    end
    OpenStruct.new(response.data.revision.to_h)
  end

  # Lists the tags of a dataset through the +list_tags+ RPC.
  #
  # @param dataset_id [Object] sent as +dataset_id+
  # @return [Object] result of ObjectToHash.to_hash_array on +response.data.tags+
  def list_tags(dataset_id)
    payload = { dataset_id: dataset_id }
    response = rpc_request(catalog_client) do |client|
      client.list_tags(payload, rpc_headers)
    end
    ObjectToHash.to_hash_array(response.data.tags.to_a)
  end

  # Calls the +prune_version+ RPC for a version.
  #
  # @param version_id [Object] sent as +version_id+
  # @return [OpenStruct] built from +response.data.version.to_h+
  def prune_version(version_id)
    payload = { version_id: version_id }
    response = rpc_request(catalog_client) do |client|
      client.prune_version(payload, rpc_headers)
    end
    OpenStruct.new(response.data.version.to_h)
  end

  # Calls the +prune_revision+ RPC for a revision.
  #
  # @param revision_id [Object] sent as +revision_id+
  # @return [OpenStruct] built from +response.data.revision.to_h+
  def prune_revision(revision_id)
    payload = { revision_id: revision_id }
    response = rpc_request(catalog_client) do |client|
      client.prune_revision(payload, rpc_headers)
    end
    OpenStruct.new(response.data.revision.to_h)
  end

  private

  # Wraps a dataset in the payload shape used by +update_dataset+.
  def serialize_dataset(dataset)
    { dataset: dataset.to_h }
  end

  # Builds the sort clause of a listing filter.
  #
  # @param filter [Hash, nil] reads :sort_field and :sort_order
  # @return [Hash, nil] +{ value:, order: }+, or nil when no :sort_field is
  #   given. A missing or nil :sort_order falls back to "ASC".
  def sort(filter)
    filter ||= {}
    return nil unless filter[:sort_field]

    { value: filter[:sort_field], order: filter[:sort_order] || "ASC" }
  end

  # Builds the sanitized filter hash shared by the dataset listing calls.
  #
  # @param filter [Hash, nil] caller-supplied options; nil is treated as {}
  # @param array_keys [Array<Symbol>] keys always sent as arrays
  #   (nil becomes [], a scalar becomes a one-element array)
  # @param scalar_keys [Array<Symbol>] keys forwarded unchanged (nil when absent)
  # @return [Hash] filter ready to be placed under +:filter+ in a payload
  def build_dataset_filter(filter, array_keys:, scalar_keys:)
    filter ||= {}
    # :query is always sent in its { value: ... } wrapper, even when nil.
    sanitized = { query: { value: filter.fetch(:query, nil) }, sort: sort(filter) }
    array_keys.each { |key| sanitized[key] = Array(filter.fetch(key, nil)) }
    scalar_keys.each { |key| sanitized[key] = filter.fetch(key, nil) }
    # Paging keys are only included when supplied; 0 is truthy in Ruby, so an
    # explicit offset of 0 is still sent.
    %i[offset limit].each do |key|
      value = filter.fetch(key, nil)
      sanitized[key] = { value: value } if value
    end
    sanitized
  end

  # Converts a listing RPC response into the struct returned to callers.
  def wrap_dataset_listing(response)
    OpenStruct.new(
      datasets: ObjectToHash.to_hash_array(response.data.datasets.to_a),
      total_count: response.data.total_count
    )
  end

  # Merges property overrides into the properties of one dataset version.
  #
  # Each existing property whose +:id+ equals an override's +:id+ takes the
  # override's :title, :key, :description, :type, :meta and :position when the
  # override has that key. The property hashes are modified in place.
  #
  # @param dataset [Object] must respond to +versions+
  # @param version_id [Object] id of the version to look up
  # @param props [Array<Hash>] overrides
  # @return [Array<Hash>] every property of the version, in original order
  # @raise [RequestError] when no version has the given id
  def serialize_properties(dataset, version_id, props)
    version = dataset.versions.find { |candidate| candidate[:id] == version_id }
    raise RequestError, "#{version_id}: doesn't exist" unless version

    version[:properties].map do |property|
      props.each do |prop|
        next unless property[:id] == prop[:id]

        %i[title key description type meta position].each do |field|
          property[field] = prop.fetch(field, property[field])
        end
      end
      property
    end
  end

  # Memoized catalog service client, created with +api_url+.
  def catalog_client
    @catalog_client ||= Catalog::CatalogServiceClient.new(api_url)
  end
end