Why Gemfury? Push, build, and install RubyGems, npm packages, Python packages, Maven artifacts, PHP packages, Go Modules, Debian packages, RPM packages, and NuGet packages.

Repository URL to install this package:

Details    
namara / lib / catalog.rb
Size: Mime:
require_relative 'rpc/catalog/catalog_twirp'
require_relative 'helpers/object_to_hash'

class Namara
  # Property attributes that #update_properties lets callers override, in the
  # order they are applied to each matching property hash.
  UPDATABLE_PROPERTY_FIELDS = %i[title key description type meta position].freeze
  private_constant :UPDATABLE_PROPERTY_FIELDS

  # Lists datasets across the catalog via the +list_datasets+ RPC.
  #
  # @param filter [Hash] optional criteria. Array-valued keys (+:states+, +:dataset_ids+,
  #   +:organization_ids+, +:uploader_ids+, +:reference_states+, +:warehouses+) accept a
  #   single value, an array, or nil (wrapped with +Array()+). Also reads +:query+,
  #   +:exclude_versions+, +:timestamp_range+, +:sort_field+ / +:sort_order+,
  #   +:offset+ and +:limit+.
  # @return [OpenStruct] with +datasets+ (converted by ObjectToHash.to_hash_array)
  #   and +total_count+ from the response.
  def list_all_datasets(filter = {})
    sanitized_filters = {
      states:           Array(filter.fetch(:states, nil)),
      # The query is always sent wrapped, even when no query was given.
      query:            { value: filter.fetch(:query, nil) },
      dataset_ids:      Array(filter.fetch(:dataset_ids, nil)),
      sort:             sort(filter),
      organization_ids: Array(filter.fetch(:organization_ids, nil)),
      uploader_ids:     Array(filter.fetch(:uploader_ids, nil)),
      exclude_versions: filter.fetch(:exclude_versions, nil),
      reference_states: Array(filter.fetch(:reference_states, nil)),
      warehouses:       Array(filter.fetch(:warehouses, nil)),
      timestamp_range:  filter.fetch(:timestamp_range, nil)
    }
    payload = { filter: apply_pagination(sanitized_filters, filter) }

    response = rpc_request(catalog_client) do |client|
      client.list_datasets(payload, rpc_headers)
    end

    dataset_list(response)
  end

  # Lists the datasets of one organization via the +list_organization_datasets+ RPC.
  #
  # @param organization_id [Object] sent as +organization_id+ in the payload.
  # @param filter [Hash] optional criteria. Array-valued keys: +:topic_ids+,
  #   +:dataset_ids+, +:states+, +:reference_ids+, +:source_ids+, +:uploader_ids+,
  #   +:reference_states+, +:warehouses+. Also reads +:query+, +:exclude_versions+,
  #   +:timestamp_range+, +:sort_field+ / +:sort_order+, +:offset+ and +:limit+.
  # @return [OpenStruct] with +datasets+ and +total_count+.
  def list_organization_datasets(organization_id, filter = {})
    sanitized_filters = {
      query:            { value: filter.fetch(:query, nil) },
      topic_ids:        Array(filter.fetch(:topic_ids, nil)),
      dataset_ids:      Array(filter.fetch(:dataset_ids, nil)),
      sort:             sort(filter),
      states:           Array(filter.fetch(:states, nil)),
      reference_ids:    Array(filter.fetch(:reference_ids, nil)),
      source_ids:       Array(filter.fetch(:source_ids, nil)),
      uploader_ids:     Array(filter.fetch(:uploader_ids, nil)),
      exclude_versions: filter.fetch(:exclude_versions, nil),
      reference_states: Array(filter.fetch(:reference_states, nil)),
      warehouses:       Array(filter.fetch(:warehouses, nil)),
      timestamp_range:  filter.fetch(:timestamp_range, nil)
    }
    payload = {
      filter:          apply_pagination(sanitized_filters, filter),
      organization_id: organization_id
    }

    response = rpc_request(catalog_client) do |client|
      client.list_organization_datasets(payload, rpc_headers)
    end

    dataset_list(response)
  end

  # Lists the datasets of one group via the +list_group_datasets+ RPC.
  #
  # @param group_id [Object] sent as +group_id+ in the payload.
  # @param filter [Hash] optional criteria. Array-valued keys: +:states+, +:dataset_ids+,
  #   +:reference_ids+, +:source_ids+, +:topic_ids+, +:uploader_ids+,
  #   +:reference_states+, +:warehouses+. Also reads +:query+, +:timestamp_range+,
  #   +:sort_field+ / +:sort_order+, +:offset+ and +:limit+. (Unlike the other
  #   list methods, +:exclude_versions+ is not forwarded here.)
  # @return [OpenStruct] with +datasets+ and +total_count+.
  def list_group_datasets(group_id, filter = {})
    sanitized_filters = {
      states:           Array(filter.fetch(:states, nil)),
      query:            { value: filter.fetch(:query, nil) },
      dataset_ids:      Array(filter.fetch(:dataset_ids, nil)),
      reference_ids:    Array(filter.fetch(:reference_ids, nil)),
      source_ids:       Array(filter.fetch(:source_ids, nil)),
      topic_ids:        Array(filter.fetch(:topic_ids, nil)),
      sort:             sort(filter),
      uploader_ids:     Array(filter.fetch(:uploader_ids, nil)),
      reference_states: Array(filter.fetch(:reference_states, nil)),
      warehouses:       Array(filter.fetch(:warehouses, nil)),
      timestamp_range:  filter.fetch(:timestamp_range, nil)
    }
    payload = {
      filter:   apply_pagination(sanitized_filters, filter),
      group_id: group_id
    }

    response = rpc_request(catalog_client) do |client|
      client.list_group_datasets(payload, rpc_headers)
    end

    dataset_list(response)
  end

  # Fetches a single dataset by id via the +get_dataset+ RPC.
  #
  # @return [OpenStruct] built from the response dataset's +to_h+.
  def get_dataset(dataset_id)
    payload = { id: dataset_id }

    response = rpc_request(catalog_client) do |client|
      client.get_dataset(payload, rpc_headers)
    end

    OpenStruct.new(response.data.dataset.to_h)
  end

  # Sends +dataset+ (wrapped as { dataset: dataset.to_h }) to the +update_dataset+ RPC.
  #
  # @return [OpenStruct] built from the updated dataset in the response.
  def update_dataset(dataset)
    response = rpc_request(catalog_client) do |client|
      client.update_dataset(serialize_dataset(dataset), rpc_headers)
    end

    OpenStruct.new(response.data.dataset.to_h)
  end

  # Updates properties of one version of +dataset+ via the +update_properties+ RPC.
  #
  # Each entry in +properties+ is matched against the version's existing properties
  # by +:id+. Only the attributes it supplies (title, key, description, type, meta,
  # position) are overridden. NOTE: the matching property hashes inside +dataset+ are
  # modified in place, so the caller's dataset reflects the requested changes.
  #
  # @param dataset [#[], #versions] must respond to +[:id]+ and +versions+.
  # @param version_id [Object] id of the version whose properties are updated.
  # @param properties [Array<Hash>] partial property hashes keyed by +:id+.
  # @return [Array] the response properties, converted by ObjectToHash.to_hash_array.
  # @raise [RequestError] if +properties+ is not an Array, or if +version_id+ is
  #   not among +dataset.versions+.
  def update_properties(dataset, version_id, properties = [])
    raise RequestError, "properties should be of array type" unless properties.is_a?(Array)

    properties_to_update = serialize_properties(dataset, version_id, properties)
    payload = {
      dataset_id: dataset[:id],
      version_id: version_id,
      properties: properties_to_update
    }

    response = rpc_request(catalog_client) do |client|
      client.update_properties(payload, rpc_headers)
    end

    ObjectToHash.to_hash_array(response.data.properties.to_a)
  end

  # Attaches +tag+ to a revision via the +tag_revision+ RPC.
  #
  # @return [OpenStruct] built from the tagged revision in the response.
  def tag_revision(revision_id, tag)
    payload = {
      revision_id: revision_id,
      tag:         tag
    }

    response = rpc_request(catalog_client) do |client|
      client.tag_revision(payload, rpc_headers)
    end

    OpenStruct.new(response.data.revision.to_h)
  end

  # Removes a tag via the +untag_revision+ RPC.
  #
  # @return [OpenStruct] built from the revision in the response.
  def untag_revision(tag_id)
    payload = { tag_id: tag_id }

    response = rpc_request(catalog_client) do |client|
      client.untag_revision(payload, rpc_headers)
    end

    OpenStruct.new(response.data.revision.to_h)
  end

  # Lists the tags of a dataset via the +list_tags+ RPC.
  #
  # @return [Array] response tags, converted by ObjectToHash.to_hash_array.
  def list_tags(dataset_id)
    payload = { dataset_id: dataset_id }

    response = rpc_request(catalog_client) do |client|
      client.list_tags(payload, rpc_headers)
    end

    ObjectToHash.to_hash_array(response.data.tags.to_a)
  end

  # Calls the +prune_version+ RPC for one version.
  #
  # @return [OpenStruct] built from the version in the response.
  def prune_version(version_id)
    payload = { version_id: version_id }

    response = rpc_request(catalog_client) do |client|
      client.prune_version(payload, rpc_headers)
    end

    OpenStruct.new(response.data.version.to_h)
  end

  # Calls the +prune_revision+ RPC for one revision.
  #
  # @return [OpenStruct] built from the revision in the response.
  def prune_revision(revision_id)
    payload = { revision_id: revision_id }

    response = rpc_request(catalog_client) do |client|
      client.prune_revision(payload, rpc_headers)
    end

    OpenStruct.new(response.data.revision.to_h)
  end

  private

  # Wraps a dataset for the update_dataset RPC payload.
  def serialize_dataset(dataset)
    { dataset: dataset.to_h }
  end

  # Builds the sort clause from +:sort_field+ / +:sort_order+ (default "ASC").
  # Returns nil when no sort field was given.
  def sort(filter)
    return unless filter[:sort_field]

    { value: filter[:sort_field], order: filter.fetch(:sort_order, "ASC") }
  end

  # Adds wrapped +offset+ / +limit+ entries to +sanitized_filters+ when the caller
  # supplied truthy values, and returns the (mutated) hash. 0 is truthy in Ruby,
  # so an explicit offset of 0 is still sent.
  def apply_pagination(sanitized_filters, filter)
    offset = filter.fetch(:offset, nil)
    limit  = filter.fetch(:limit, nil)
    sanitized_filters[:offset] = { value: offset } if offset
    sanitized_filters[:limit]  = { value: limit } if limit
    sanitized_filters
  end

  # Shapes a list-datasets style response into the OpenStruct the public list
  # methods return.
  def dataset_list(response)
    OpenStruct.new(
      datasets:    ObjectToHash.to_hash_array(response.data.datasets.to_a),
      total_count: response.data.total_count
    )
  end

  # Applies the partial property hashes in +props+ onto the properties of the
  # version identified by +version_id+. Matching hashes are mutated in place.
  # Returns a new array holding every property of that version; a version without
  # properties yields an empty array.
  #
  # @raise [RequestError] if no version in +dataset.versions+ has that id.
  def serialize_properties(dataset, version_id, props)
    version = dataset.versions.find { |candidate| candidate[:id] == version_id }
    raise RequestError, "#{version_id}: doesn't exist" unless version

    Array(version[:properties]).map do |property|
      # Every matching entry is applied in turn, so later duplicates override
      # only the attributes they actually supply.
      props.each do |prop|
        next unless property[:id] == prop[:id]

        UPDATABLE_PROPERTY_FIELDS.each do |field|
          property[field] = prop.fetch(field, property[field])
        end
      end
      property
    end
  end

  # Memoized catalog service client pointed at +api_url+.
  def catalog_client
    @catalog_client ||= Catalog::CatalogServiceClient.new(api_url)
  end
end