Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
fluent-plugin-cloudwatch-logs / lib / fluent / plugin / in_cloudwatch_logs.rb
Size: Mime:
require 'fluent/input'
require 'fluent/parser'

module Fluent
  require 'fluent/input'
  require 'fluent/mixin/config_placeholders'

  require 'yajl'

  class CloudwatchLogsInput < Input
    Plugin.register_input('cloudwatch_logs', self)

    include Fluent::Mixin::ConfigPlaceholders

    # Define `router` method of v0.12 to support v0.10.57 or earlier
    unless method_defined?(:router)
      define_method("router") { Engine }
    end

    config_param :aws_key_id, :string, :default => nil, :secret => true
    config_param :aws_sec_key, :string, :default => nil, :secret => true
    config_param :aws_use_sts, :bool, default: false
    config_param :aws_sts_role_arn, :string, default: nil
    config_param :aws_sts_session_name, :string, default: 'fluentd'
    config_param :region, :string, :default => nil
    config_param :tag, :string
    config_param :log_group_name, :string
    config_param :log_stream_name, :string
    config_param :use_log_stream_name_prefix, :bool, default: false
    config_param :state_file, :string
    config_param :fetch_interval, :time, default: 60
    config_param :http_proxy, :string, default: nil

    def initialize
      super

      require 'aws-sdk-core'
    end

    def placeholders
      [:percent]
    end

    def configure(conf)
      super
      configure_parser(conf)
    end

    def start
      options = {}
      options[:region] = @region if @region
      options[:http_proxy] = @http_proxy if @http_proxy

      if @aws_use_sts
        Aws.config[:region] = options[:region]
        options[:credentials] = Aws::AssumeRoleCredentials.new(
          role_arn: @aws_sts_role_arn,
          role_session_name: @aws_sts_session_name
        )
      else
        options[:credentials] = Aws::Credentials.new(@aws_key_id, @aws_sec_key) if @aws_key_id && @aws_sec_key
      end

      @logs = Aws::CloudWatchLogs::Client.new(options)

      @finished = false
      @thread = Thread.new(&method(:run))
    end

    def shutdown
      @finished = true
      @thread.join
    end

    private
    def configure_parser(conf)
      if conf['format']
        @parser = Fluent::TextParser.new
        @parser.configure(conf)
      end
    end

    def state_file_for(log_stream_name)
      return "#{@state_file}_#{log_stream_name.gsub(File::SEPARATOR, '-')}" if log_stream_name
      return @state_file
    end

    def next_token(log_stream_name)
      return nil unless File.exist?(state_file_for(log_stream_name))
      File.read(state_file_for(log_stream_name)).chomp
    end

    def store_next_token(token, log_stream_name = nil)
      open(state_file_for(log_stream_name), 'w') do |f|
        f.write token
      end
    end

    def run
      @next_fetch_time = Time.now

      until @finished
        if Time.now > @next_fetch_time
          @next_fetch_time += @fetch_interval

          if @use_log_stream_name_prefix
            log_streams = describe_log_streams
            log_streams.each do |log_stram|
              log_stream_name = log_stram.log_stream_name
              events = get_events(log_stream_name)
              events.each do |event|
                emit(log_stream_name, event)
              end
            end
          else
            events = get_events(@log_stream_name)
            events.each do |event|
              emit(log_stream_name, event)
            end
          end
        end
        sleep 1
      end
    end

    def emit(stream, event)
      if @parser
        record = @parser.parse(event.message)
        router.emit(@tag, record[0], record[1])
      else
        time = (event.timestamp / 1000).floor
        record = Yajl.load(event.message)
        router.emit(@tag, time, record)
      end
    end

    def get_events(log_stream_name)
      request = {
        log_group_name: @log_group_name,
        log_stream_name: log_stream_name
      }
      request[:next_token] = next_token(log_stream_name) if next_token(log_stream_name)
      response = @logs.get_log_events(request)
      store_next_token(response.next_forward_token, log_stream_name)

      response.events
    end

    def describe_log_streams(log_streams = nil, next_token = nil)
      request = {
        log_group_name: @log_group_name
      }
      request[:next_token] = next_token if next_token
      request[:log_stream_name_prefix] = @log_stream_name
      response = @logs.describe_log_streams(request)
      if log_streams
        log_streams << response.log_streams
      else
        log_streams = response.log_streams
      end
      if response.next_token
        log_streams = describe_log_streams(log_streams, response.next_token)
      end
      log_streams
    end
  end
end