Repository URL to install this package:
|
Version:
0.3.6 ▾
|
# frozen_string_literal: true
module Datasync
# Holds the state of one sync run driven by a mapper: the loaded source and
# target rows, the classified link states, the errors recorded so far and a
# counter of dropped updates.
#
# NOTE(review): +links+, +configure_classifier+ and +unlink+ are called below
# but are not defined in this class body; presumably a subclass or another
# file supplies them - confirm.
class SyncOperation # rubocop:disable Metrics/ClassLength
  attr_reader :source_ids, :target_ids, :link_states, :errors, :mapper,
              :dropped_updates

  # @param mapper [#source_key, #target_key] mapper that drives this operation;
  #   its key names are cached for the later dataset lookups
  def initialize(mapper)
    @errors = []
    @dropped_updates = 0
    @mapper = mapper
    @source_key = mapper.source_key
    @target_key = mapper.target_key
  end

  # Increments the dropped-update counter.
  #
  # @return [Integer] the counter value after the increment
  def count_dropped_update
    @dropped_updates += 1
  end

  # @param id [Object] source key value
  # @return [Object, nil] the loaded source row stored under +id+
  def get_source(id)
    @source_data[id]
  end

  # @param id [Object] target key value
  # @return [Object, nil] the loaded target row stored under +id+
  def get_target(id)
    @target_data[id]
  end

  # @return [Object] the target dataset obtained by #load_data
  def target_dataset
    @target_ds
  end

  # @param id [Object] target key value
  # @return [Object] the target dataset filtered on the target key
  def target_row(id)
    @target_ds.where(@target_key => id)
  end

  # Summarises the current state as counts.
  #
  # @return [Hash] one count per link situation plus +:total+ (only once links
  #   have been classified) and always +:error+
  def totals
    summary = @link_states ? @link_states.transform_values(&:count) : {}
    summary[:total] = summary.values.sum if @link_states
    summary[:error] = @errors.count
    summary
  end

  # Calls the mapper's source and target procs, indexes both datasets by their
  # key and remembers the key sets.
  #
  # @return [Set] the set of target ids
  def load_data
    @source_ds = @mapper.source_proc.call
    @target_ds = @mapper.target_proc.call
    @source_data = @source_ds.as_hash(@source_key)
    @target_data = @target_ds.as_hash(@target_key)
    @source_ids = Set.new(@source_data.keys)
    @target_ids = Set.new(@target_data.keys)
  end

  # Builds a LinkClassifier from the loaded ids and the known links, lets
  # +configure_classifier+ adjust it, and stores its result.
  #
  # @return [Object] the classifier result, also available as #link_states
  def classify_links
    @link_states =
      LinkClassifier.new(@source_ids, @target_ids, links)
                    .tap { |classifier| configure_classifier(classifier) }
                    .result
  end

  # Drops the indexed row data; the datasets and id sets are kept.
  #
  # @return [nil]
  def unload_data
    @target_data = nil
    @source_data = nil
  end

  # Pairs up source and target links whose match-field values are equal, but
  # only when exactly one source and one target share those values.
  #
  # @return [Hash] the match map that was evaluated
  def do_match
    match_map.each_value do |candidates|
      sources, targets = candidates.values_at(:source_id, :target_id)
      next unless sources.length == 1 && targets.length == 1

      process_match(sources.first, targets.first)
    end
  end

  # Compares the current totals with every limit configured on the mapper.
  #
  # @raise [LimitError] when a count is greater than its maximum
  # @return [Object] the mapper's limits when nothing is exceeded
  def check_limits!
    current = totals
    @mapper.limits.each do |situation, maximum|
      count = current[situation] || 0
      next unless count > maximum

      raise LimitError, "Limit reached for #{situation} (#{count} / #{maximum})"
    end
  end

  # Hands every classified link to the mapper, situation by situation.
  #
  # @return [Object] the link states that were iterated
  def handle_actions
    @link_states.each do |situation, situation_links|
      situation_links.each do |link|
        process_link(situation, link[:source_id], link[:target_id])
      end
    end
  end

  # Groups the links of all match situations by their match-field values.
  #
  # @return [Hash] match-field values => { source_id: [...], target_id: [...] },
  #   where each entry is { link:, situation: }
  def match_map
    fields = @mapper.match_fields
    candidates = Hash.new { |hash, key| hash[key] = { source_id: [], target_id: [] } }
    @mapper.match_situations.each do |situation|
      each_link_with_data(situation) do |id_type, link, data|
        entry = { link: link, situation: situation }
        candidates[data.values_at(*fields)][id_type] << entry
      end
    end
    candidates
  end

  # Moves a matched pair out of its old situations into +:matched+, calling
  # +unlink+ first for a side whose old link pointed at a missing counterpart.
  #
  # @param source [Hash] match map entry ({ link:, situation: }) for the source
  # @param target [Hash] match map entry ({ link:, situation: }) for the target
  def process_match(source, target) # rubocop:disable Metrics/AbcSize
    source_link = source[:link]
    target_link = target[:link]
    source_situation = source[:situation]
    target_situation = target[:situation]

    unlink(source_link[:source_id], source_link[:target_id]) if source_situation == :missing_target
    unlink(target_link[:source_id], target_link[:target_id]) if target_situation == :missing_source

    @link_states[source_situation].delete(source_link)
    @link_states[target_situation].delete(target_link)
    @link_states[:matched] << { source_id: source_link[:source_id], target_id: target_link[:target_id] }
  end

  # Lets the mapper handle one link. A StandardError raised by the mapper is
  # recorded rather than propagated; recording may itself raise LimitError.
  def process_link(situation, source_id, target_id)
    @mapper.handle(self, situation, source_id, target_id)
  rescue StandardError => failure
    record_error(source_id, target_id, situation, failure.class, failure.to_s)
  end

  # Appends an error entry and enforces the mapper's +:error+ limit, if any.
  #
  # @raise [LimitError] when more errors are recorded than the limit allows
  # @return [nil] when the limit is absent or not exceeded
  def record_error(source_id, target_id, situation, error, message)
    @errors << { source_id: source_id, target_id: target_id, situation: situation, error: error, message: message }
    error_limit = @mapper.limits[:error]
    return unless error_limit && @errors.count > error_limit

    raise LimitError, "Too many errors: (#{@errors.count} / #{error_limit})"
  end

  # Yields (id_type, link, data) for each link in +situation+, taking the data
  # from the source side or the target side depending on the situation.
  #
  # @raise [SyncError] for a situation that cannot be matched on
  def each_link_with_data(situation, &block)
    situation_links = @link_states[situation]
    if %i[unlinked_source missing_target].include?(situation)
      each_source_link_with_data(situation_links, &block)
    elsif %i[unlinked_target missing_source].include?(situation)
      each_target_link_with_data(situation_links, &block)
    else
      raise SyncError, "Cannot match on situation #{situation}"
    end
  end

  # Yields :source_id, the link and the source row after it has been run
  # through the mapper's parse, map and format steps. A SyncError raised for
  # one link is recorded under :matching_error and the remaining links are
  # still visited (unless recording the error hits the error limit).
  def each_source_link_with_data(links)
    links.each do |link|
      begin
        parsed = @mapper.parse(get_source(link[:source_id]))
        formatted = @mapper.format(@mapper.map(parsed))
        yield :source_id, link, formatted
      rescue SyncError => failure
        record_error(link[:source_id], link[:target_id], :matching_error, failure.class, failure.to_s)
      end
    end
  end

  # Yields :target_id, the link and the loaded target row for every link.
  def each_target_link_with_data(links)
    links.each { |link| yield :target_id, link, get_target(link[:target_id]) }
  end
end
end