Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
uoy-faculty_aws / lib / faculty_aws / background_task.rb
Size: Mime:
# frozen_string_literal: true

require 'aws-sdk-sqs'
require 'logger'

module FacultyAWS
  # Class to handle background tasks
  class BackgroundTask
    @task_definitions = {}

    LOGGER = Logger.new $stdout

    # Top-level-includable handler
    module LambdaHandler
      module_function

      def background_task_handler(event:, context:) # rubocop:disable Lint/UnusedMethodArgument
        event['Records'].each { |message| BackgroundTask.handle(message['body']) }
      rescue StandardError => e
        LOGGER.error "#{e.class} - #{e.message}"
        LOGGER.error e.backtrace.join("\n\t")
        raise
      end
    end

    class << self
      def sqs_client
        @sqs_client ||= Aws::SQS::Client.new
      end

      def inline?
        inline = ENV.fetch('BACKGROUND_INLINE', nil)
        inline && 'ty1'.include?(inline[0].downcase)
      end

      def enqueue(name, message_group_id: name, message_deduplication_id: nil, timestamp_deduplication: false,
                  **parameters)
        message = {
          queue_url: ENV.fetch('BACKGROUND_QUEUE', nil),
          message_body: { task: name.to_s, params: parameters }.to_json,
          message_group_id: message_group_id
        }
        message_deduplication_id = "#{message_group_id}_#{Time.now.strftime('%s.%N')}" if timestamp_deduplication
        message[:message_deduplication_id] = message_deduplication_id if message_deduplication_id
        dispatch(message)
      end

      def dispatch(message)
        if inline?
          handle(message[:message_body])
        else
          sqs_client.send_message(message)
        end
      end

      def register(name, task)
        taskname = name.to_s
        raise ArgumentError, "Task already defined: #{taskname}" if @task_definitions.key? taskname

        @task_definitions[taskname] = task
      end

      def parse_task(message)
        taskinfo = JSON.parse(message, symbolize_names: true)
        taskname = taskinfo.fetch(:task)
        [taskname, taskinfo[:params] || {}]
      rescue StandardError => e
        raise e, "Failed to parse background task: #{e}"
      end

      def handle(message_body)
        name, params = parse_task(message_body)
        run(name, **params)
      end

      def run(name, **parameters)
        begin
          task = @task_definitions.fetch(name.to_s)
        rescue KeyError => e
          raise e, "Background task #{name}: #{e}"
        end
        task.run(**parameters)
      rescue StandardError => e
        raise unless task&.handle_error(e)
      end
    end

    def initialize(name, notify_on_failure: true, raise_on_failure: true, &block)
      @name = name
      @block = block
      @notify_on_failure = notify_on_failure
      @raise_on_failure = raise_on_failure
      self.class.register(@name, self)
    end

    def run(...)
      @block.call(...).tap do
        LOGGER.info "Completed background task #{@name}"
      end
    rescue StandardError => e
      raise e, "Background task #{@name}: #{e}"
    end

    def handle_error(error)
      FacultyAWS::NotifyDevs.send_lambda_failed if @notify_on_failure && !self.class.inline?
      return false if @raise_on_failure

      LOGGER.error "#{error.class} - #{error.message}"
      LOGGER.error error.backtrace.join("\n\t")
      true
    end
  end
end