# Why Gemfury? Push, build, and install: RubyGems, npm packages, Python packages, Maven artifacts, PHP packages, Go Modules, Debian packages, RPM packages, NuGet packages
#
# Repository URL to install this package:
#
# Details
# fluent-plugin-cloudwatch-logs / lib / fluent / plugin / out_cloudwatch_logs.rb
# Size: Mime:
require 'fluent/output'

module Fluent
  require 'fluent/mixin/config_placeholders'

  require 'yajl'

  class CloudwatchLogsOutput < BufferedOutput
    Plugin.register_output('cloudwatch_logs', self)

    include Fluent::SetTimeKeyMixin
    include Fluent::Mixin::ConfigPlaceholders

    # --- AWS client settings -------------------------------------------------
    # Explicit credentials are only used when both values are present.
    config_param :aws_key_id, :string, default: nil, secret: true
    config_param :aws_sec_key, :string, default: nil, secret: true
    config_param :region, :string, default: nil

    # --- Destination selection -----------------------------------------------
    # `configure` insists on exactly one source for the group name and exactly
    # one source for the stream name: a fixed name, the event tag, or a record key.
    config_param :log_group_name, :string, default: nil
    config_param :log_stream_name, :string, default: nil
    # When true, a missing log group / log stream is created while flushing.
    config_param :auto_create_stream, :bool, default: false

    # --- Message formatting and batching -------------------------------------
    # Comma-separated record keys joined with a space; the whole record is
    # dumped as JSON when this is unset.
    config_param :message_keys, :string, default: nil
    config_param :max_message_length, :integer, default: nil
    config_param :max_events_per_batch, :integer, default: 10000

    config_param :use_tag_as_group, :bool, default: false  # TODO: Rename to use_tag_as_group_name ?
    config_param :use_tag_as_stream, :bool, default: false # TODO: Rename to use_tag_as_stream_name ?
    config_param :log_group_name_key, :string, default: nil
    config_param :log_stream_name_key, :string, default: nil
    # When true, the routing key is deleted from the record after it is read.
    config_param :remove_log_group_name_key, :bool, default: false
    config_param :remove_log_stream_name_key, :bool, default: false

    config_param :http_proxy, :string, default: nil

    # --- Throttling retries for PutLogEvents ---------------------------------
    config_param :put_log_events_retry_wait, :time, default: 1.0
    config_param :put_log_events_retry_limit, :integer, default: 17
    config_param :put_log_events_disable_retry_limit, :bool, default: false

    # Byte budget of a single PutLogEvents batch (sum of message sizes plus the
    # per-event header).
    MAX_EVENTS_SIZE = 1024 * 1024
    # Byte limit applied to one event (message plus header); larger events are
    # discarded when batching.
    MAX_EVENT_SIZE = 262_144
    # Fixed number of bytes counted for every event on top of its message.
    EVENT_HEADER_SIZE = 26

    # Provide a `log` accessor backed by the global logger when the class does
    # not already have one.
    define_method(:log) { $log } unless method_defined?(:log)

    # Runs the parent initializer and then loads the AWS SDK. The require sits
    # here, not at file level, so the SDK is only loaded once an instance of
    # the plugin is actually created.
    def initialize
      super
      require "aws-sdk-core"
    end

    # Placeholder styles requested from the ConfigPlaceholders mixin.
    #
    # @return [Array<Symbol>] always `[:percent]`
    def placeholders
      %i[percent]
    end

    # Validates the destination settings after the parent has processed `conf`.
    #
    # Exactly one of the three group-name sources and exactly one of the three
    # stream-name sources must be present (non-nil) in the raw configuration.
    #
    # @param conf [#[]] raw plugin configuration, indexed by parameter name
    # @raise [ConfigError] when zero or several sources are given for either name
    def configure(conf)
      super

      group_sources = %w[log_group_name use_tag_as_group log_group_name_key]
      if group_sources.count {|key| !conf[key].nil? } != 1
        raise ConfigError, "Set only one of log_group_name, use_tag_as_group and log_group_name_key"
      end

      stream_sources = %w[log_stream_name use_tag_as_stream log_stream_name_key]
      if stream_sources.count {|key| !conf[key].nil? } != 1
        raise ConfigError, "Set only one of log_stream_name, use_tag_as_stream and log_stream_name_key"
      end
    end

    # Starts the plugin: builds the CloudWatch Logs client (unless one is
    # already assigned) and resets the sequence-token cache.
    #
    # Only settings that are actually configured are handed to the client;
    # credentials are passed only when both key id and secret key are set.
    def start
      super

      client_options = {}
      if @aws_key_id && @aws_sec_key
        client_options[:credentials] = Aws::Credentials.new(@aws_key_id, @aws_sec_key)
      end
      { region: @region, http_proxy: @http_proxy }.each do |name, value|
        client_options[name] = value if value
      end

      @logs ||= Aws::CloudWatchLogs::Client.new(client_options)
      @sequence_tokens = {}
    end

    # Serializes one event for the buffer as a msgpack-encoded
    # `[tag, time, record]` triple; `write` reads the same triples back.
    #
    # @return [String] result of `Array#to_msgpack`
    def format(tag, time, record)
      entry = [tag, time, record]
      entry.to_msgpack
    end

    # Sends one buffered chunk to CloudWatch Logs.
    #
    # Entries are grouped by their destination (log group, log stream), which
    # is taken from the tag, from a record key, or from the fixed configured
    # name. For every destination the group/stream is verified (and created
    # when `auto_create_stream` is on), the records are turned into events and
    # handed to `put_events_by_chunk`.
    #
    # Entries are skipped with a warning when the record is nil, when the
    # destination stream or group name is nil, or when the group/stream does
    # not exist and auto-creation is off.
    #
    # @param chunk [#msgpack_each] buffer chunk yielding `[tag, time, record]`
    def write(chunk)
      # The key list is identical for every record, so split it once per chunk.
      message_keys = @message_keys && @message_keys.split(',')

      chunk.enum_for(:msgpack_each).select {|tag, _time, record|
        log.warn "record is nil (tag=#{tag})" if record.nil?
        !record.nil?
      }.group_by {|tag, _time, record|
        group = if @use_tag_as_group
                  tag
                elsif @log_group_name_key
                  if @remove_log_group_name_key
                    record.delete(@log_group_name_key)
                  else
                    record[@log_group_name_key]
                  end
                else
                  @log_group_name
                end

        stream = if @use_tag_as_stream
                   tag
                 elsif @log_stream_name_key
                   if @remove_log_stream_name_key
                     record.delete(@log_stream_name_key)
                   else
                     record[@log_stream_name_key]
                   end
                 else
                   @log_stream_name
                 end

        [group, stream]
      }.each {|(group_name, stream_name), rows|
        if stream_name.nil?
          log.warn "stream_name is nil (group_name=#{group_name})"
          next
        end

        # A record lacking the configured group key has no destination either;
        # skip it instead of asking AWS about (or creating) a nil group.
        if group_name.nil?
          log.warn "group_name is nil (stream_name=#{stream_name})"
          next
        end

        unless log_group_exists?(group_name)
          if @auto_create_stream
            create_log_group(group_name)
          else
            log.warn "Log group '#{group_name}' does not exist"
            next
          end
        end

        unless log_stream_exists?(group_name, stream_name)
          if @auto_create_stream
            create_log_stream(group_name, stream_name)
          else
            log.warn "Log stream '#{stream_name}' does not exist"
            next
          end
        end

        events = rows.map {|_tag, time, record|
          scrub_record!(record)

          message = if message_keys
                      message_keys.map {|k| record[k].to_s }.join(' ')
                    else
                      Yajl.dump(record)
                    end
          message = message.slice(0, @max_message_length) if @max_message_length

          # Event timestamps are integer milliseconds. Going through Float and
          # flooring keeps integer times unchanged and truncates fractional
          # ones instead of producing a Float timestamp.
          {timestamp: (time.to_f * 1000).floor, message: message}
        }

        # The log events in the batch must be in chronological ordered by their timestamp.
        # http://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_PutLogEvents.html
        # sort_by alone is not stable; the original index is used as a
        # tie-breaker so events sharing a timestamp keep their arrival order.
        events = events.each_with_index.sort_by {|event, index| [event[:timestamp], index] }.map(&:first)
        put_events_by_chunk(group_name, stream_name, events)
      }
    end

    private
    # Recursively replaces invalid byte sequences in every String reachable
    # through nested Hash values and Array elements, modifying them in place.
    # Hash keys and values of any other type are left untouched.
    #
    # @param record [Object] value to clean
    # @return [Object, nil] the Hash/Array/String that was passed in, nil for
    #   any other type
    def scrub_record!(record)
      if record.is_a?(Hash)
        record.each_value {|value| scrub_record!(value) }
      elsif record.is_a?(Array)
        record.each {|item| scrub_record!(item) }
      elsif record.is_a?(String)
        record.scrub!
      end
    end

    # Looks up the cached upload sequence token for a stream.
    #
    # Returns nil, instead of raising NoMethodError, when the group has no
    # cache entry yet; without a token `put_events` sends the request bare and
    # recovers the real token from the service if one is required.
    #
    # @param group_name [String]
    # @param stream_name [String]
    # @return [String, nil] cached token, or nil when none is known
    def next_sequence_token(group_name, stream_name)
      group_tokens = @sequence_tokens[group_name]
      group_tokens && group_tokens[stream_name]
    end

    # Caches the upload sequence token to use for the next request to a stream.
    #
    # The group's cache entry is created on demand so that storing a token
    # cannot fail with NoMethodError when the group was never registered.
    #
    # @param group_name [String]
    # @param stream_name [String]
    # @param token [String, nil]
    # @return [String, nil] the stored token
    def store_next_sequence_token(group_name, stream_name, token)
      (@sequence_tokens[group_name] ||= {})[stream_name] = token
    end

    # Splits `events` into batches that respect the PutLogEvents limits and
    # sends each batch with `put_events`.
    #
    # A new batch is started when adding the next event would exceed
    # MAX_EVENTS_SIZE, when the batch already holds `max_events_per_batch`
    # events, or when the event is 24 hours or more after the batch's first
    # event. An event that is itself larger than MAX_EVENT_SIZE is dropped with
    # a warning; the events after it are still sent.
    #
    # @param group_name [String]
    # @param stream_name [String]
    # @param events [Array<Hash>] `{timestamp:, message:}` hashes ordered by
    #   timestamp; the array is consumed (emptied) by this method
    def put_events_by_chunk(group_name, stream_name, events)
      max_span_ms = 1000 * 60 * 60 * 24
      chunk = []

      # The maximum batch size is 1,048,576 bytes, and this size is calculated as the sum of all event messages in UTF-8, plus 26 bytes for each log event.
      # http://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_PutLogEvents.html
      total_bytesize = 0
      while (event = events.shift)
        event_bytesize = event[:message].bytesize + EVENT_HEADER_SIZE
        if MAX_EVENT_SIZE < event_bytesize
          log.warn "Log event is discarded because it is too large: #{event_bytesize} bytes exceeds limit of #{MAX_EVENT_SIZE}"
          # Skip only this event; leaving the loop here would silently drop
          # every event that follows it.
          next
        end

        # Compare against the first queued event directly rather than building
        # a copy of the whole chunk for every event.
        chunk_span_too_big = !chunk.empty? && event[:timestamp] - chunk.first[:timestamp] >= max_span_ms
        chunk_too_big = total_bytesize + event_bytesize > MAX_EVENTS_SIZE
        chunk_too_long = @max_events_per_batch && chunk.size >= @max_events_per_batch
        if chunk_too_big || chunk_span_too_big || chunk_too_long
          # Never send an empty batch (possible when max_events_per_batch <= 0).
          put_events(group_name, stream_name, chunk, total_bytesize) unless chunk.empty?
          chunk = [event]
          total_bytesize = event_bytesize
        else
          chunk << event
          total_bytesize += event_bytesize
        end
      end

      put_events(group_name, stream_name, chunk, total_bytesize) unless chunk.empty?
    end

    # Sends one batch of events with PutLogEvents, retrying until a response is
    # obtained or the throttling retry limit is exceeded.
    #
    # On success the response's next sequence token is cached for the stream.
    # When the throttling retry limit is exceeded the method logs an error and
    # returns without sending the batch. Exceptions other than the ones
    # rescued below are not handled here and propagate to the caller.
    #
    # group_name      - name of the destination log group
    # stream_name     - name of the destination log stream
    # events          - array of {timestamp:, message:} hashes for this batch
    # events_bytesize - batch size computed by the caller; only used for logging
    def put_events(group_name, stream_name, events, events_bytesize)
      args = {
        log_events: events,
        log_group_name: group_name,
        log_stream_name: stream_name,
      }
      token = next_sequence_token(group_name, stream_name)

      response = nil
      retry_count = 0
      until response
        log.debug "Calling PutLogEvents API", {
          "group" => group_name,
          "stream" => stream_name,
          "events_count" => events.size,
          "events_bytesize" => events_bytesize,
          "sequence_token" => token,
        }

        # The token is only added when one is known; it is refreshed on every
        # pass because the rescue below may have replaced it.
        args[:sequence_token] = token if token
        begin
          response = @logs.put_log_events(args)
        rescue Aws::CloudWatchLogs::Errors::InvalidSequenceTokenException, Aws::CloudWatchLogs::Errors::DataAlreadyAcceptedException => err
          # Token problem: re-read the stream's current upload token and try
          # again. This path does not touch retry_count, so it is not bounded
          # by put_log_events_retry_limit.
          sleep 1 # to avoid too many API calls
          # NOTE(review): find_log_stream returns nil when the stream is not
          # found, in which case the next line raises NoMethodError.
          log_stream = find_log_stream(group_name, stream_name)
          token = log_stream.upload_sequence_token
          log.warn "updating upload sequence token forcefully because unrecoverable error occured", {
            "error" => err,
            "log_group" => group_name,
            "log_stream" => stream_name,
            "new_sequence_token" => token,
          }
        rescue Aws::CloudWatchLogs::Errors::ThrottlingException => err
          if !@put_log_events_disable_retry_limit && @put_log_events_retry_limit < retry_count
            log.error "failed to PutLogEvents and discard logs because retry count exceeded put_log_events_retry_limit", {
              "error_class" => err.class.to_s,
              "error" => err.message,
            }
            # Give up on this batch; no sequence token is stored.
            return
          else
            # Exponential backoff: the base wait doubles with every retry and
            # is then shifted by a random amount within about +/-12.5%.
            sleep_sec = @put_log_events_retry_wait * (2 ** retry_count)
            sleep_sec += sleep_sec * (0.25 * (rand - 0.5))
            log.warn "failed to PutLogEvents", {
              "next_retry" => Time.now + sleep_sec,
              "error_class" => err.class.to_s,
              "error" => err.message,
            }
            sleep(sleep_sec)
            retry_count += 1
          end
        end
      end

      # retry_count only counts throttling retries.
      if 0 < retry_count
        log.warn "retry succeeded"
      end

      store_next_sequence_token(group_name, stream_name, response.next_sequence_token)
    end

    # Creates the log group and registers an empty token cache for it.
    #
    # If the group turns out to exist already, the cache entry is still
    # initialised (an existing entry is kept). Otherwise the group would stay
    # unknown to the token cache even though it exists, and a later token
    # lookup for one of its streams would hit a nil entry.
    #
    # @param group_name [String]
    def create_log_group(group_name)
      @logs.create_log_group(log_group_name: group_name)
      @sequence_tokens[group_name] = {}
    rescue Aws::CloudWatchLogs::Errors::ResourceAlreadyExistsException
      log.debug "Log group '#{group_name}' already exists"
      @sequence_tokens[group_name] ||= {}
    end

    # Creates the log stream and caches a nil sequence token for it (a new
    # stream has no token yet).
    #
    # If the stream turns out to exist already, its token is deliberately left
    # uncached so that it is looked up from the service later, but the group's
    # cache entry is guaranteed to exist so that subsequent token lookups for
    # this group do not hit a nil entry.
    #
    # @param group_name [String]
    # @param stream_name [String]
    def create_log_stream(group_name, stream_name)
      @logs.create_log_stream(log_group_name: group_name, log_stream_name: stream_name)
      @sequence_tokens[group_name] ||= {}
      @sequence_tokens[group_name][stream_name] = nil
    rescue Aws::CloudWatchLogs::Errors::ResourceAlreadyExistsException
      log.debug "Log stream '#{stream_name}' already exists"
      @sequence_tokens[group_name] ||= {}
    end

    # Tells whether the log group exists, consulting the local cache first.
    #
    # On a cache miss the service is queried with the group name as a prefix
    # filter, so only matching groups are returned instead of every log group
    # in the account. The filter is omitted for a nil/empty name. A group that
    # is found gets an empty token cache entry.
    #
    # @param group_name [String]
    # @return [Boolean]
    def log_group_exists?(group_name)
      return true if @sequence_tokens[group_name]

      params = {}
      params[:log_group_name_prefix] = group_name unless group_name.to_s.empty?

      found = @logs.describe_log_groups(params).any? {|page|
        page.log_groups.any? {|log_group| log_group.log_group_name == group_name }
      }
      @sequence_tokens[group_name] = {} if found
      found
    end

    # Tells whether the log stream exists inside a group that is already
    # present in the token cache.
    #
    # A group without a cache entry yields false without any lookup. A stream
    # already cached (even with a nil token) yields true. Otherwise the stream
    # is looked up with `find_log_stream`; when found, its upload sequence
    # token is cached.
    #
    # @param group_name [String]
    # @param stream_name [String]
    # @return [Boolean]
    def log_stream_exists?(group_name, stream_name)
      stream_tokens = @sequence_tokens[group_name]
      return false unless stream_tokens
      return true if stream_tokens.key?(stream_name)

      found = find_log_stream(group_name, stream_name)
      return false unless found

      stream_tokens[stream_name] = found.upload_sequence_token
      true
    end

    # Finds the log stream whose name is exactly `stream_name`.
    #
    # The service is queried with the name as a prefix filter, page by page,
    # pausing 0.1 seconds between pages, until an exact match is found or the
    # last page has been read.
    #
    # @param group_name [String]
    # @param stream_name [String]
    # @return [Object, nil] the matching stream description, or nil
    def find_log_stream(group_name, stream_name)
      page_token = nil
      loop do
        page = @logs.describe_log_streams(log_group_name: group_name, log_stream_name_prefix: stream_name, next_token: page_token)

        match = page.log_streams.find {|candidate| candidate.log_stream_name == stream_name }
        return match if match

        page_token = page.next_token
        return nil if page_token.nil?

        sleep 0.1
      end
    end
  end
end