# Repository URL to install this package:
# Version: 0.3.6.1
# coding: utf-8
require 'helper'
require 'mysql2-cs-bind'
require 'fluent/test/driver/output'
require 'fluent/plugin/buffer'
require 'fluent/config'
require 'time'
require 'timecop'
# Tests for Fluent::Plugin::MysqlBulkOutput: configuration validation, the
# values derived from a configuration, and expansion of placeholders in the
# configured database / table names.
#
# The %[...] configuration literals in this class are written flush-left on
# purpose: any indentation placed inside the brackets would become part of
# the string handed to the plugin.
class MysqlBulkOutputTest < Test::Unit::TestCase
  # Runs Fluentd's test setup hook before every test in this class.
  def setup
    Fluent::Test.setup
  end

  # Thin wrapper around Fluent::Config::Element.new so tests can build a
  # configuration tree programmatically instead of from a config string.
  #
  # @param name [String] element name
  # @param argument [String] element argument
  # @param params [Hash] parameters of the element
  # @param elements [Array] child elements
  # @return [Fluent::Config::Element]
  def config_element(name = 'test', argument = '', params = {}, elements = [])
    Fluent::Config::Element.new(name, argument, params, elements)
  end

  # Default configuration used by create_driver when none is supplied.
  CONFIG = %[
database test_app_development
username root
password hogehoge
column_names id,user_name,created_at
key_names id,users,created_at
table users
]

  # Builds an output test driver for the plugin, configures it with +conf+,
  # and then replaces the plugin instance's +client+ method with a stub.
  #
  # The stub is installed only after +configure+ has returned. It hands back
  # an object whose +xquery+ returns [1] for any arguments and whose +close+
  # returns true.
  #
  # @param conf [String, Fluent::Config::Element] plugin configuration
  # @return [Fluent::Test::Driver::Output] the configured driver
  def create_driver(conf = CONFIG)
    d = Fluent::Test::Driver::Output.new(Fluent::Plugin::MysqlBulkOutput).configure(conf)
    d.instance.instance_eval {
      # Singleton method on this one plugin instance; other instances are untouched.
      def client
        obj = Object.new
        obj.instance_eval {
          def xquery(*args); [1]; end
          def close; true; end
        }
        obj
      end
    }
    d
  end

  # Builds buffer chunk metadata to pass to the plugin's expand_placeholders.
  #
  # @param timekey [Integer, nil]
  # @param tag [String, nil]
  # @param variables [Object, nil]
  # @return [Fluent::Plugin::Buffer::Metadata]
  def create_metadata(timekey: nil, tag: nil, variables: nil)
    Fluent::Plugin::Buffer::Metadata.new(timekey, tag, variables)
  end

  # Placeholder expansion in the configured database and table names.
  class TestExpandPlaceholders < self
    # Applies to every test in this class. The parent setup is invoked
    # explicitly: overriding +setup+ without +super+ would skip
    # Fluent::Test.setup for these tests.
    def setup
      super
      # Pin the clock so the strftime-style expectations below are stable.
      Timecop.freeze(Time.parse("2016-09-26"))
    end

    # Releases the clock frozen in setup.
    def teardown
      Timecop.return
    end

    # Each data set puts a ${tag} placeholder in either the table or the
    # database name and lists the values expected after expansion.
    data(
      "table" => {
        "database" => "test_app_development",
        "table" => "users_${tag}",
        "extracted_database" => "test_app_development",
        "extracted_table" => "users_input_mysql"
      },
      "database" => {
        "database" => "test_app_development_${tag}",
        "table" => "users",
        "extracted_database" => "test_app_development_input_mysql",
        "extracted_table" => "users"
      }
    )
    # With a tag-keyed buffer and the tag 'input.mysql', ${tag} is expected
    # to expand to 'input_mysql'.
    def test_expand_tag_placeholder(data)
      config = config_element('ROOT', '', {
                                "@type" => "mysql_bulk",
                                "host" => "localhost",
                                "database" => data["database"],
                                "username" => "root",
                                "password" => "hogehoge",
                                "column_names" => "id,user_name,created_at",
                                "table" => data["table"],
                              }, [config_element('buffer', 'tag', {
                                                   "@type" => "memory",
                                                   "flush_interval" => "60s",
                                                 }, [])])
      d = create_driver(config)
      time = Time.now
      metadata = create_metadata(timekey: time.to_i, tag: 'input.mysql')
      database, table = d.instance.expand_placeholders(metadata)
      assert_equal(data["extracted_database"], database)
      assert_equal(data["extracted_table"], table)
    end

    # Each data set puts a %Y%m%d placeholder in either the table or the
    # database name and lists the values expected after expansion.
    data(
      "table" => {
        "database" => "test_app_development",
        "table" => "users_%Y%m%d",
        "extracted_database" => "test_app_development",
        "extracted_table" => "users_20160926"
      },
      "database" => {
        "database" => "test_app_development_%Y%m%d",
        "table" => "users",
        "extracted_database" => "test_app_development_20160926",
        "extracted_table" => "users"
      }
    )
    # With a time-keyed buffer and a timekey taken from the frozen clock,
    # %Y%m%d is expected to expand to 20160926.
    def test_expand_time_placeholder(data)
      config = config_element('ROOT', '', {
                                "@type" => "mysql_bulk",
                                "host" => "localhost",
                                "database" => data["database"],
                                "username" => "root",
                                "password" => "hogehoge",
                                "column_names" => "id,user_name,created_at",
                                "table" => data["table"],
                              }, [config_element('buffer', 'time', {
                                                   "@type" => "memory",
                                                   "timekey" => "60s",
                                                   "timekey_wait" => "60s"
                                                 }, [])])
      d = create_driver(config)
      time = Time.now
      metadata = create_metadata(timekey: time.to_i, tag: 'input.mysql')
      database, table = d.instance.expand_placeholders(metadata)
      assert_equal(data["extracted_database"], database)
      assert_equal(data["extracted_table"], table)
    end
  end

  # Configurations that are expected to be rejected with Fluent::ConfigError.
  def test_configure_error
    # column_names is absent.
    assert_raise(Fluent::ConfigError) do
      create_driver %[
host localhost
database test_app_development
username root
password hogehoge
table users
on_duplicate_key_update true
on_duplicate_update_keys user_name,updated_at
flush_interval 10s
]
    end

    # on_duplicate_key_update is true but on_duplicate_update_keys is absent.
    assert_raise(Fluent::ConfigError) do
      create_driver %[
host localhost
database test_app_development
username root
password hogehoge
column_names id,user_name,created_at,updated_at
table users
on_duplicate_key_update true
flush_interval 10s
]
    end

    # database is absent.
    assert_raise(Fluent::ConfigError) do
      create_driver %[
host localhost
username root
password hogehoge
column_names id,user_name,created_at,updated_at
table users
on_duplicate_key_update true
on_duplicate_update_keys user_name,updated_at
flush_interval 10s
]
    end

    # Two update keys but only one custom value.
    # NOTE(review): database is also absent here, so this case may raise for
    # that reason rather than for the length mismatch -- confirm against the
    # plugin before relying on it.
    assert_raise(Fluent::ConfigError) do
      create_driver %[
host localhost
username root
password hogehoge
column_names id,user_name,login_count,created_at,updated_at
table users
on_duplicate_key_update true
on_duplicate_update_keys login_count,updated_at
on_duplicate_update_custom_values login_count
flush_interval 10s
]
    end
  end

  # Configurations that are expected to be accepted without Fluent::ConfigError.
  def test_configure
    # No format parameter is given (original note: "default csv" -- TODO confirm).
    assert_nothing_raised(Fluent::ConfigError) do
      create_driver %[
host localhost
database test_app_development
username root
password hogehoge
column_names id,user_name,created_at,updated_at
table users
on_duplicate_key_update true
on_duplicate_update_keys user_name,updated_at
flush_interval 10s
]
    end

    # Smallest configuration in this test: no host, no duplicate-key options.
    assert_nothing_raised(Fluent::ConfigError) do
      create_driver %[
database test_app_development
username root
password hogehoge
column_names id,user_name,created_at,updated_at
table users
]
    end

    # Duplicate-key update without an explicit host.
    assert_nothing_raised(Fluent::ConfigError) do
      create_driver %[
database test_app_development
username root
password hogehoge
column_names id,user_name,created_at,updated_at
table users
on_duplicate_key_update true
on_duplicate_update_keys user_name,updated_at
]
    end

    # key_names differ from column_names.
    assert_nothing_raised(Fluent::ConfigError) do
      create_driver %[
database test_app_development
username root
password hogehoge
column_names id,user_name,created_at,updated_at
key_names id,user,created_date,updated_date
table users
on_duplicate_key_update true
on_duplicate_update_keys user_name,updated_at
]
    end

    # json_key_names is given.
    assert_nothing_raised(Fluent::ConfigError) do
      create_driver %[
database test_app_development
username root
password hogehoge
key_names id,url,request_headers,params,created_at,updated_at
column_names id,url,request_headers_json,params_json,created_date,updated_date
json_key_names request_headers,params
table access
]
    end

    # One custom value per update key.
    assert_nothing_raised(Fluent::ConfigError) do
      create_driver %[
database test_app_development
username root
password hogehoge
column_names id,user_name,login_count,created_at,updated_at
key_names id,user_name,login_count,created_date,updated_date
table users
on_duplicate_key_update true
on_duplicate_update_keys login_count,updated_at
on_duplicate_update_custom_values ${`login_count` + 1},updated_at
]
    end
  end

  # Checks the values the plugin instance exposes after configuration:
  # key_names, column_names, json_key_names, unixtimestamp_key_names and the
  # @on_duplicate_key_update_sql instance variable.
  def test_variables
    # Duplicate-key update enabled; key_names not given, so it is expected to
    # equal column_names.
    d = create_driver %[
database test_app_development
username root
password hogehoge
column_names id,user_name,created_at,updated_at
table users
on_duplicate_key_update true
on_duplicate_update_keys user_name,updated_at
]
    assert_equal %w[id user_name created_at updated_at], d.instance.key_names
    assert_equal %w[id user_name created_at updated_at], d.instance.column_names
    assert_nil d.instance.json_key_names
    assert_nil d.instance.unixtimestamp_key_names
    assert_equal " ON DUPLICATE KEY UPDATE user_name = VALUES(user_name),updated_at = VALUES(updated_at)", d.instance.instance_variable_get(:@on_duplicate_key_update_sql)

    # No duplicate-key options: no SQL suffix is expected.
    d = create_driver %[
database test_app_development
username root
password hogehoge
column_names id,user_name,created_at,updated_at
table users
]
    assert_equal %w[id user_name created_at updated_at], d.instance.key_names
    assert_equal %w[id user_name created_at updated_at], d.instance.column_names
    assert_nil d.instance.json_key_names
    assert_nil d.instance.unixtimestamp_key_names
    assert_nil d.instance.instance_variable_get(:@on_duplicate_key_update_sql)

    # Explicit key_names that differ from column_names.
    d = create_driver %[
database test_app_development
username root
password hogehoge
key_names id,user_name,created_at,updated_at
column_names id,user,created_date,updated_date
table users
]
    assert_equal %w[id user_name created_at updated_at], d.instance.key_names
    assert_equal %w[id user created_date updated_date], d.instance.column_names
    assert_nil d.instance.json_key_names
    assert_nil d.instance.unixtimestamp_key_names
    assert_nil d.instance.instance_variable_get(:@on_duplicate_key_update_sql)

    # json_key_names and unixtimestamp_key_names are both given.
    d = create_driver %[
database test_app_development
username root
password hogehoge
key_names id,url,request_headers,params,created_at,updated_at
column_names id,url,request_headers_json,params_json,created_date,updated_date
unixtimestamp_key_names created_at,updated_at
json_key_names request_headers,params
table access
]
    assert_equal %w[id url request_headers params created_at updated_at], d.instance.key_names
    assert_equal %w[id url request_headers_json params_json created_date updated_date], d.instance.column_names
    assert_equal %w[request_headers params], d.instance.json_key_names
    assert_equal %w[created_at updated_at], d.instance.unixtimestamp_key_names
    assert_nil d.instance.instance_variable_get(:@on_duplicate_key_update_sql)

    # Custom update values: a ${...} entry is expected to appear in the SQL
    # as the expression inside the braces; a plain entry becomes VALUES(column).
    d = create_driver %[
database test_app_development
username root
password hogehoge
column_names id,user_name,login_count,created_at,updated_at
table users
on_duplicate_key_update true
on_duplicate_update_keys login_count,updated_at
on_duplicate_update_custom_values ${`login_count` + 1},updated_at
]
    assert_equal %w[id user_name login_count created_at updated_at], d.instance.key_names
    assert_equal %w[id user_name login_count created_at updated_at], d.instance.column_names
    assert_nil d.instance.json_key_names
    assert_nil d.instance.unixtimestamp_key_names
    assert_equal " ON DUPLICATE KEY UPDATE login_count = `login_count` + 1,updated_at = VALUES(updated_at)", d.instance.instance_variable_get(:@on_duplicate_key_update_sql)
  end

  # Spaces after the commas in column_names / key_names are expected to be
  # dropped from the resulting name lists.
  def test_spaces_in_columns
    # Only column_names is given.
    d = create_driver %[
database test_app_development
username root
password hogehoge
column_names id, user_name, created_at, updated_at
table users
]
    assert_equal %w[id user_name created_at updated_at], d.instance.key_names
    assert_equal %w[id user_name created_at updated_at], d.instance.column_names

    # Both key_names and column_names are given.
    d = create_driver %[
database test_app_development
username root
password hogehoge
key_names id, user_name, created_at, updated_at
column_names id, user_name, created_at, updated_at
table users
]
    assert_equal %w[id user_name created_at updated_at], d.instance.key_names
    assert_equal %w[id user_name created_at updated_at], d.instance.column_names
  end
end