# Repository URL to install this package:
# Version: 0.6.1
require 'twitter'
require 'nkf'
require 'string/scrub' if RUBY_VERSION.to_f < 2.1
require "fluent/input"
module Fluent
  # Fluentd input plugin that reads objects from the Twitter Streaming API on a
  # background thread and emits every accepted tweet as a record under the
  # configured +tag+.
  class TwitterInput < Fluent::Input
    # Values accepted for the +timeline+ parameter.
    TIMELINE_TYPE = %w(userstream sampling location tracking).freeze
    # Values accepted for the +output_format+ parameter.
    OUTPUT_FORMAT_TYPE = %w(nest flat simple).freeze

    Plugin.register_input('twitter', self)

    # To support Fluentd v0.10.57 or earlier
    unless method_defined?(:router)
      define_method("router") { Fluent::Engine }
    end

    config_param :consumer_key, :string, :secret => true
    config_param :consumer_secret, :string, :secret => true
    config_param :access_token, :string, :secret => true
    config_param :access_token_secret, :string, :secret => true
    config_param :tag, :string
    config_param :timeline, :string
    config_param :keyword, :string, :default => nil
    config_param :follow_ids, :string, :default => nil
    config_param :locations, :string, :default => nil
    config_param :lang, :string, :default => nil
    config_param :output_format, :string, :default => 'simple'
    config_param :flatten_separator, :string, :default => '_'

    # Validates the configuration and builds the streaming client.
    #
    # Raises Fluent::ConfigError when +timeline+ or +output_format+ is not one
    # of the values listed in TIMELINE_TYPE / OUTPUT_FORMAT_TYPE.
    def configure(conf)
      super

      unless TIMELINE_TYPE.include?(@timeline)
        raise Fluent::ConfigError, "timeline value undefined #{@timeline}"
      end
      unless OUTPUT_FORMAT_TYPE.include?(@output_format)
        raise Fluent::ConfigError, "output_format value undefined #{@output_format}"
      end

      # The literal placeholder "${hashtag}" in the keyword stands for "#".
      @keyword = @keyword.gsub('${hashtag}', '#') unless @keyword.nil?

      @client = Twitter::Streaming::Client.new do |config|
        config.consumer_key = @consumer_key
        config.consumer_secret = @consumer_secret
        config.access_token = @access_token
        config.access_token_secret = @access_token_secret
      end
    end

    # Starts the background thread that consumes the stream (see #run).
    def start
      @thread = Thread.new(&method(:run))
    end

    # Stops the background thread. Does nothing when #start was never called.
    def shutdown
      Thread.kill(@thread) if @thread
    end

    # Thread body: logs the effective settings, then opens the stream that
    # matches the configured timeline and hands every received object to
    # #handle_object.
    def run
      notice = "twitter: starting Twitter Streaming API for #{@timeline}."
      notice << " tag:#{@tag}"
      notice << " lang:#{@lang}" unless @lang.nil?
      notice << " keyword:#{@keyword}" unless @keyword.nil?
      # Only mention follow/locations when they are actually configured.
      notice << " follow:#{@follow_ids}" unless @follow_ids.nil?
      notice << " locations:#{@locations}" unless @locations.nil?
      $log.info notice

      if %w(sampling tracking).include?(@timeline) && @keyword
        # A keyword takes precedence over follow_ids.
        @client.filter(track: @keyword, &method(:handle_object))
      elsif @timeline == 'tracking' && @follow_ids
        @client.filter(follow: @follow_ids, &method(:handle_object))
      elsif @timeline == 'sampling' && @keyword.nil? && @follow_ids.nil?
        @client.sample(&method(:handle_object))
      elsif @timeline == 'location' && @locations
        @client.filter(locations: @locations, &method(:handle_object))
      elsif @timeline == 'userstream'
        @client.user(&method(:handle_object))
      else
        # Without this the thread would end silently and nothing would ever
        # be emitted, with no hint as to why.
        $log.warn "twitter: no stream started; timeline '#{@timeline}' does not match the given keyword/follow_ids/locations settings."
      end
    end

    # Stream callback: emits +object+ when it passes #is_message?.
    def handle_object(object)
      get_message(object) if is_message?(object)
    end

    # Decides whether a streamed object should be emitted.
    #
    # Returns false when the object is not a Twitter::Tweet, when a non-empty
    # +lang+ setting does not contain the tweet author's language, or when the
    # timeline is 'userstream', a non-empty keyword is set and the tweet text
    # does not match it. Returns true otherwise.
    def is_message?(tweet)
      return false unless tweet.is_a?(Twitter::Tweet)
      return false if (!@lang.nil? && @lang != '') && !@lang.include?(tweet.user.lang)

      if @timeline == 'userstream' && (!@keyword.nil? && @keyword != '')
        # Comma-separated keywords become a regexp alternation. The keywords
        # are not escaped, so regexp metacharacters in them are interpreted.
        # Both sides go through NKF with the same '-WwZ1' flags before matching.
        pattern = NKF::nkf('-WwZ1', @keyword).gsub(/,\s?/, '|')
        text = NKF::nkf('-WwZ1', tweet.text)
        return false unless Regexp.new(pattern, Regexp::IGNORECASE).match(text)
      end

      true
    end

    # Converts +tweet+ into a record according to +output_format+ and emits it
    # with the configured tag and the current engine time.
    def get_message(tweet)
      case @output_format
      when 'nest'
        record = hash_key_to_s(tweet.to_h)
      when 'flat'
        record = hash_flatten(tweet.to_h)
      when 'simple'
        record = {
          # Scrub the text itself so invalid byte sequences are removed from
          # the stored value, as the 'nest' and 'flat' formats already do.
          'message' => tweet.text.scrub(''),
          'geo' => tweet.geo,
          'place' => tweet.place,
          'created_at' => tweet.created_at,
          'user_name' => tweet.user.name,
          'user_screen_name' => tweet.user.screen_name,
          'user_profile_image_url' => tweet.user.profile_image_url,
          'user_time_zone' => tweet.user.time_zone,
          'user_lang' => tweet.user.lang
        }
      end
      router.emit(@tag, Engine.now, record)
    end

    # Flattens nested hashes into a single-level hash with string keys.
    #
    # Keys of nested hashes are joined to their parent key with
    # +flatten_separator+. String values are scrubbed; all other values
    # (arrays included) are copied unchanged.
    #
    # record - the Hash to flatten.
    # prefix - key prefix used while recursing (nil at the top level).
    #
    # Returns a new Hash.
    def hash_flatten(record, prefix = nil)
      record.each_with_object({}) do |(k, v), flat|
        key = prefix.to_s + k.to_s
        if v.instance_of?(Hash)
          flat.merge!(hash_flatten(v, key + @flatten_separator))
        elsif v.instance_of?(String)
          flat[key] = v.scrub("")
        else
          flat[key] = v
        end
      end
    end

    # Returns a deep copy of +hash+ whose keys are strings and whose string
    # values are scrubbed. Nested hashes and arrays are converted recursively.
    def hash_key_to_s(hash)
      hash.each_with_object({}) do |(k, v), newhash|
        newhash[k.to_s] =
          if v.instance_of?(Hash)
            hash_key_to_s(v)
          elsif v.instance_of?(Array)
            array_key_to_s(v)
          elsif v.instance_of?(String)
            v.scrub('')
          else
            v
          end
      end
    end

    # Array counterpart of #hash_key_to_s: returns a new array in which nested
    # hashes and arrays are converted recursively and strings are scrubbed.
    def array_key_to_s(array)
      array.map do |v|
        if v.instance_of?(Hash)
          hash_key_to_s(v)
        elsif v.instance_of?(Array)
          array_key_to_s(v)
        elsif v.instance_of?(String)
          v.scrub('')
        else
          v
        end
      end
    end
  end
end
# TODO: Remove this monkey patch once a new version of the twitter gem is released
#
# See: https://github.com/sferik/twitter/pull/815
# Reopens the twitter gem's null object to give it a JSON representation.
class Twitter::NullObject
  # Serializes exactly as +nil+ does, forwarding any generator arguments
  # unchanged to NilClass#to_json.
  def to_json(*generator_args)
    nil.to_json(*generator_args)
  end
end