# Why Gemfury? Push, build, and install RubyGems, npm packages, Python packages, Maven artifacts, PHP packages, Go Modules, Debian packages, RPM packages, NuGet packages
#
# Repository URL to install this package:
#
# Details
# fluent-plugin-twitter / lib / fluent / plugin / in_twitter.rb
# Size: Mime:
require 'twitter'
require 'nkf'
require 'string/scrub' if RUBY_VERSION.to_f < 2.1

require "fluent/input"

module Fluent
  # Fluentd input plugin registered as 'twitter'. It opens a
  # Twitter::Streaming::Client stream in a background thread and emits every
  # accepted tweet as a record under the configured tag.
  #
  # The `timeline` parameter selects which streaming call is made (see #run)
  # and `output_format` selects the record layout (see #get_message).
  class TwitterInput < Fluent::Input
    # Accepted values for the `timeline` parameter.
    TIMELINE_TYPE = %w(userstream sampling location tracking).freeze
    # Accepted values for the `output_format` parameter.
    OUTPUT_FORMAT_TYPE = %w(nest flat simple).freeze
    Plugin.register_input('twitter', self)

    # To support Fluentd v0.10.57 or earlier
    unless method_defined?(:router)
      define_method("router") { Fluent::Engine }
    end

    config_param :consumer_key, :string, :secret => true
    config_param :consumer_secret, :string, :secret => true
    config_param :access_token, :string, :secret => true
    config_param :access_token_secret, :string, :secret => true
    config_param :tag, :string
    config_param :timeline, :string
    config_param :keyword, :string, :default => nil
    config_param :follow_ids, :string, :default => nil
    config_param :locations, :string, :default => nil
    config_param :lang, :string, :default => nil
    config_param :output_format, :string, :default => 'simple'
    config_param :flatten_separator, :string, :default => '_'

    def initialize
      super
    end

    # Validates the configuration and builds the streaming client.
    #
    # Every '${hashtag}' placeholder in `keyword` is replaced by '#'.
    #
    # Raises Fluent::ConfigError when `timeline` or `output_format` is not one
    # of the values listed in TIMELINE_TYPE / OUTPUT_FORMAT_TYPE.
    def configure(conf)
      super

      unless TIMELINE_TYPE.include?(@timeline)
        raise Fluent::ConfigError, "timeline value undefined #{@timeline}"
      end
      unless OUTPUT_FORMAT_TYPE.include?(@output_format)
        raise Fluent::ConfigError, "output_format value undefined #{@output_format}"
      end

      @keyword = @keyword.gsub('${hashtag}', '#') unless @keyword.nil?

      @client = Twitter::Streaming::Client.new do |config|
        config.consumer_key = @consumer_key
        config.consumer_secret = @consumer_secret
        config.access_token = @access_token
        config.access_token_secret = @access_token_secret
      end
    end

    # Starts the background thread that executes #run.
    def start
      @thread = Thread.new(&method(:run))
    end

    # Kills the streaming thread. Does nothing when #start was never called,
    # because Thread.kill(nil) would raise TypeError.
    def shutdown
      Thread.kill(@thread) if @thread
    end

    # Thread body: logs the effective settings, then opens the stream that
    # matches `timeline` and the optional filters, passing every received
    # object to #handle_object.
    #
    # Branch order matters: a keyword takes precedence over follow_ids for the
    # 'tracking' timeline, and 'sampling' with a keyword uses the filter call.
    # When no branch applies (for example 'tracking' without keyword or
    # follow_ids) a warning is logged instead of returning silently.
    def run
      notice = "twitter: starting Twitter Streaming API for #{@timeline}."
      notice << " tag:#{@tag}"
      notice << " lang:#{@lang}" unless @lang.nil?
      notice << " keyword:#{@keyword}" unless @keyword.nil?
      # Only mention follow/locations when they are actually configured.
      notice << " follow:#{@follow_ids}" unless @follow_ids.nil?
      notice << " locations:#{@locations}" unless @locations.nil?
      $log.info notice

      if ['sampling', 'tracking'].include?(@timeline) && @keyword
        @client.filter(track: @keyword, &method(:handle_object))
      elsif @timeline == 'tracking' && @follow_ids
        @client.filter(follow: @follow_ids, &method(:handle_object))
      elsif @timeline == 'sampling' && @keyword.nil? && @follow_ids.nil?
        @client.sample(&method(:handle_object))
      elsif @timeline == 'location' && @locations
        # 'location' is an accepted timeline and `locations` is a declared
        # parameter, so route it to the filter call as well.
        @client.filter(locations: @locations, &method(:handle_object))
      elsif @timeline == 'userstream'
        @client.user(&method(:handle_object))
      else
        $log.warn "twitter: no stream started for timeline #{@timeline}; " \
                  "check the keyword, follow_ids and locations settings."
      end
    end

    # Stream callback: emits the object when #is_message? accepts it.
    def handle_object(object)
      get_message(object) if is_message?(object)
    end

    # Decides whether a streamed object should be emitted.
    #
    # Returns false when the object is not a Twitter::Tweet, when a `lang`
    # filter is set and the author's language does not occur in it, or when
    # the timeline is 'userstream', a keyword is set, and the tweet text does
    # not match the keyword pattern. Returns true otherwise.
    def is_message?(tweet)
      return false unless tweet.is_a?(Twitter::Tweet)
      return false unless lang_accepted?(tweet)
      if @timeline == 'userstream' && !@keyword.nil? && @keyword != ''
        # The text is passed through NKF with the same options as the keyword
        # pattern so both sides are compared in the same normalized form.
        normalized_text = NKF::nkf('-WwZ1', tweet.text)
        return false unless keyword_regexp.match(normalized_text)
      end
      true
    end

    # Builds the record for a tweet according to `output_format` and emits it
    # through the router with the configured tag and the current engine time.
    #
    # 'nest'   - the tweet hash with string keys (see #hash_key_to_s)
    # 'flat'   - the tweet hash flattened to one level (see #hash_flatten)
    # 'simple' - a fixed subset of tweet and user attributes
    def get_message(tweet)
      case @output_format
      when 'nest'
        record = hash_key_to_s(tweet.to_h)
      when 'flat'
        record = hash_flatten(tweet.to_h)
      when 'simple'
        record = {
          # Scrub before storing; scrubbing the return value of Hash#store
          # would leave the stored text untouched.
          'message' => tweet.text.scrub(''),
          'geo' => tweet.geo,
          'place' => tweet.place,
          'created_at' => tweet.created_at,
          'user_name' => tweet.user.name,
          'user_screen_name' => tweet.user.screen_name,
          'user_profile_image_url' => tweet.user.profile_image_url,
          'user_time_zone' => tweet.user.time_zone,
          'user_lang' => tweet.user.lang
        }
      end
      router.emit(@tag, Engine.now, record)
    end

    # Flattens nested hashes into a single-level hash whose keys are the
    # nested key names joined with `flatten_separator`.
    #
    # record - the hash to flatten
    # prefix - key prefix carried through the recursion (nil at the top level)
    #
    # String values are scrubbed of invalid byte sequences; every other value,
    # including arrays, is stored unchanged. Returns a new hash.
    def hash_flatten(record, prefix = nil)
      record.each_with_object({}) do |(k, v), flat|
        key = prefix.to_s + k.to_s
        if v.instance_of?(Hash)
          flat.update(hash_flatten(v, key + @flatten_separator))
        elsif v.instance_of?(String)
          flat[key] = v.scrub('')
        else
          flat[key] = v
        end
      end
    end

    # Returns a copy of `hash` with every key converted to a string, applied
    # recursively to nested hashes and arrays; string values are scrubbed.
    def hash_key_to_s(hash)
      hash.each_with_object({}) do |(k, v), converted|
        converted[k.to_s] = stringify_value(v)
      end
    end

    # Returns a copy of `array` in which nested hashes and arrays are
    # converted as in #hash_key_to_s and string elements are scrubbed.
    def array_key_to_s(array)
      array.map { |v| stringify_value(v) }
    end

    private

    # True when no `lang` filter is configured, or when the author's language
    # occurs in the configured `lang` string (substring match). A tweet whose
    # author has no language is rejected while a filter is active, since
    # String#include? cannot be called with nil.
    def lang_accepted?(tweet)
      return true if @lang.nil? || @lang == ''
      user_lang = tweet.user.lang.to_s
      !user_lang.empty? && @lang.include?(user_lang)
    end

    # Case-insensitive pattern built from `keyword`: the keyword is passed
    # through NKF and each comma (with an optional following whitespace
    # character) becomes an alternation. Built once and reused because the
    # keyword does not change after #configure.
    def keyword_regexp
      @keyword_regexp ||= Regexp.new(
        NKF::nkf('-WwZ1', @keyword).gsub(/,\s?/, '|'),
        Regexp::IGNORECASE
      )
    end

    # Shared value conversion for #hash_key_to_s and #array_key_to_s.
    # Only exact Hash, Array and String instances are converted.
    def stringify_value(value)
      if value.instance_of?(Hash)
        hash_key_to_s(value)
      elsif value.instance_of?(Array)
        array_key_to_s(value)
      elsif value.instance_of?(String)
        value.scrub('')
      else
        value
      end
    end
  end
end

# TODO: Drop this monkey patch once a twitter gem release includes the fix.
#
# See: https://github.com/sferik/twitter/pull/815
class Twitter::NullObject
  # Serializes the null object exactly as nil is serialized, forwarding any
  # arguments to nil's own to_json.
  def to_json(*generator_args)
    null_value = nil
    null_value.to_json(*generator_args)
  end
end