Learn more  » Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

vistahigherlearning / logstash   deb

Repository URL to install this package:

/opt/logstash/spec/outputs/elasticsearch.rb

require "test_utils"
require "ftw"
require "logstash/plugin"

describe "outputs/elasticsearch" do
  extend LogStash::RSpec

  it "should register" do
    settings = {
      "embedded" => "false",
      "protocol" => "transport",
      "manage_template" => "false"
    }
    plugin = LogStash::Plugin.lookup("output", "elasticsearch").new(settings)

    # register will try to load jars and raise if it cannot find them.
    expect { plugin.register }.to_not raise_error
  end

  describe "ship lots of events w/ default index_type", :elasticsearch => true do
    # Generate a random index name and event type (10 random digits each).
    index = Array.new(10) { rand(10) }.join
    type = Array.new(10) { rand(10) }.join

    # Write about 10000 events. Add jitter to increase likeliness of finding
    # boundary-related bugs.
    event_count = 10000 + rand(500)
    flush_size = rand(200) + 1

    config <<-CONFIG
      input {
        generator {
          message => "hello world"
          count => #{event_count}
          type => "#{type}"
        }
      }
      output {
        elasticsearch {
          host => "127.0.0.1"
          index => "#{index}"
          flush_size => #{flush_size}
        }
      }
    CONFIG

    agent do
      ftw = FTW::Agent.new
      # Same host the pipeline above writes to.
      es_url = "http://127.0.0.1:9200/#{index}"

      # Read an FTW response body to completion and return it as a String.
      read_all = lambda do |response|
        data = ""
        response.read_body { |chunk| data << chunk }
        data
      end

      # Try a few times to check if we have the correct number of events stored
      # in ES. We retry to allow final agent flushes as well as allowing
      # elasticsearch to finish processing everything. The index is refreshed
      # on every attempt (not just once up front) so documents that arrive
      # after an earlier attempt become searchable before the next count.
      Stud::try(10.times) do
        read_all.call(ftw.post!("#{es_url}/_refresh"))
        count = JSON.parse(read_all.call(ftw.get!("#{es_url}/_count?q=*")))["count"]
        insist { count } == event_count
      end

      # size=1000 is far below event_count here, so this spot-checks only the
      # first page of hits rather than every document.
      result = JSON.parse(read_all.call(ftw.get!("#{es_url}/_search?q=*&size=1000")))
      result["hits"]["hits"].each do |doc|
        # With no 'index_type' set, the document type should be the type
        # set on the input
        insist { doc["_type"] } == type
        insist { doc["_index"] } == index
        insist { doc["_source"]["message"] } == "hello world"
      end
    end
  end

  describe "testing index_type", :elasticsearch => true do
    describe "no type value" do
      # Generate a random index name (10 random digits).
      index = Array.new(10) { rand(10) }.join
      event_count = 100 + rand(100)
      flush_size = rand(200) + 1

      config <<-CONFIG
        input {
          generator {
            message => "hello world"
            count => #{event_count}
          }
        }
        output {
          elasticsearch {
            host => "127.0.0.1"
            index => "#{index}"
            flush_size => #{flush_size}
          }
        }
      CONFIG

      agent do
        ftw = FTW::Agent.new
        # Same host the pipeline above writes to.
        es_url = "http://127.0.0.1:9200/#{index}"

        # Read an FTW response body to completion and return it as a String.
        read_all = lambda do |response|
          data = ""
          response.read_body { |chunk| data << chunk }
          data
        end

        # Wait until all events are available. Refresh on every attempt so
        # documents flushed after an earlier attempt become searchable.
        Stud::try(10.times) do
          read_all.call(ftw.post!("#{es_url}/_refresh"))
          count = JSON.parse(read_all.call(ftw.get!("#{es_url}/_count?q=*")))["count"]
          insist { count } == event_count
        end

        # event_count is at most 199, so size=1000 returns every document.
        result = JSON.parse(read_all.call(ftw.get!("#{es_url}/_search?q=*&size=1000")))
        result["hits"]["hits"].each do |doc|
          # With no type on the input, documents are expected to be "logs".
          insist { doc["_type"] } == "logs"
        end
      end
    end

    describe "default event type value" do
      # Generate a random index name (10 random digits).
      index = Array.new(10) { rand(10) }.join
      event_count = 100 + rand(100)
      flush_size = rand(200) + 1

      config <<-CONFIG
        input {
          generator {
            message => "hello world"
            count => #{event_count}
            type => "generated"
          }
        }
        output {
          elasticsearch {
            host => "127.0.0.1"
            index => "#{index}"
            flush_size => #{flush_size}
          }
        }
      CONFIG

      agent do
        ftw = FTW::Agent.new
        # Same host the pipeline above writes to.
        es_url = "http://127.0.0.1:9200/#{index}"

        # Read an FTW response body to completion and return it as a String.
        read_all = lambda do |response|
          data = ""
          response.read_body { |chunk| data << chunk }
          data
        end

        # Wait until all events are available. Refresh on every attempt so
        # documents flushed after an earlier attempt become searchable.
        Stud::try(10.times) do
          read_all.call(ftw.post!("#{es_url}/_refresh"))
          count = JSON.parse(read_all.call(ftw.get!("#{es_url}/_count?q=*")))["count"]
          insist { count } == event_count
        end

        # event_count is at most 199, so size=1000 returns every document.
        result = JSON.parse(read_all.call(ftw.get!("#{es_url}/_search?q=*&size=1000")))
        result["hits"]["hits"].each do |doc|
          # The type set on the generator input becomes the document type.
          insist { doc["_type"] } == "generated"
        end
      end
    end
  end

  # NOTE(review): despite its title, this group sets no `action` option in its
  # config, so it only exercises whatever the plugin's default action is.
  # TODO: add explicit `action` coverage or rename the group.
  describe "action => ...", :elasticsearch => true do
    # Random index name (10 random digits).
    index_name = Array.new(10) { rand(10) }.join

    config <<-CONFIG
      input {
        generator {
          message => "hello world"
          count => 100
        }
      }
      output {
        elasticsearch {
          host => "127.0.0.1"
          index => "#{index_name}"
        }
      }
    CONFIG


    agent do
      ftw = FTW::Agent.new
      # Same host the pipeline above writes to.
      es_url = "http://127.0.0.1:9200/#{index_name}"

      # Read an FTW response body to completion and return it as a String.
      read_all = lambda do |response|
        data = ""
        response.read_body { |chunk| data << chunk }
        data
      end

      # Wait until all events are available. Refresh on every attempt so
      # documents flushed after an earlier attempt become searchable.
      Stud::try(10.times) do
        read_all.call(ftw.post!("#{es_url}/_refresh"))
        count = JSON.parse(read_all.call(ftw.get!("#{es_url}/_count?q=*")))["count"]
        insist { count } == 100
      end

      result = JSON.parse(read_all.call(ftw.get!("#{es_url}/_search?q=*&size=1000")))
      result["hits"]["hits"].each do |doc|
        # No type on the input, so documents are expected to be "logs".
        insist { doc["_type"] } == "logs"
      end
    end

    # NOTE(review): this group repeats the "default event type value" case
    # under "testing index_type"; consider removing the duplicate.
    describe "default event type value", :elasticsearch => true do
      # Generate a random index name (10 random digits).
      index = Array.new(10) { rand(10) }.join
      event_count = 100 + rand(100)
      flush_size = rand(200) + 1

      config <<-CONFIG
        input {
          generator {
            message => "hello world"
            count => #{event_count}
            type => "generated"
          }
        }
        output {
          elasticsearch {
            host => "127.0.0.1"
            index => "#{index}"
            flush_size => #{flush_size}
          }
        }
      CONFIG

      agent do
        ftw = FTW::Agent.new
        # Same host the pipeline above writes to.
        es_url = "http://127.0.0.1:9200/#{index}"

        # Read an FTW response body to completion and return it as a String.
        read_all = lambda do |response|
          data = ""
          response.read_body { |chunk| data << chunk }
          data
        end

        # Wait until all events are available. Refresh on every attempt so
        # documents flushed after an earlier attempt become searchable.
        Stud::try(10.times) do
          read_all.call(ftw.post!("#{es_url}/_refresh"))
          count = JSON.parse(read_all.call(ftw.get!("#{es_url}/_count?q=*")))["count"]
          insist { count } == event_count
        end

        result = JSON.parse(read_all.call(ftw.get!("#{es_url}/_search?q=*&size=1000")))
        result["hits"]["hits"].each do |doc|
          # The type set on the generator input becomes the document type.
          insist { doc["_type"] } == "generated"
        end
      end
    end
  end

  describe "index template expected behavior", :elasticsearch => true do
    ["node", "transport", "http"].each do |protocol|
      context "with protocol => #{protocol}" do
        # An ElasticSearch output that installs (and overwrites) the index
        # template, talking to localhost over the protocol under test.
        subject do
          require "logstash/outputs/elasticsearch"
          settings = {
            "manage_template" => true,
            "template_overwrite" => true,
            "protocol" => protocol,
            "host" => "localhost"
          }
          LogStash::Outputs::ElasticSearch.new(settings)
        end

        before :each do
          # Required here rather than at file top, presumably so the client gem
          # is only loaded when these :elasticsearch examples actually run.
          require "elasticsearch"

          # Clean ES of data before we start: delete all templates first.
          @es = Elasticsearch::Client.new
          @es.indices.delete_template(:name => "*")

          # This can fail if there are no indexes; deliberately best-effort.
          @es.indices.delete(:index => "*") rescue nil

          subject.register

          # One document per mapping behavior checked by the examples below.
          events = [
            { "message" => "sample message here" },
            { "somevalue" => 100 },
            { "somevalue" => 10 },
            { "somevalue" => 1 },
            { "country" => "us" },
            { "country" => "at" },
            { "geoip" => { "location" => [ 0.0, 0.0 ] } }
          ]
          events.each { |fields| subject.receive(LogStash::Event.new(fields)) }
          subject.buffer_flush(:final => true)
          @es.indices.refresh

          # Wait or fail until everything's indexed.
          Stud::try(20.times) do
            r = @es.search
            insist { r["hits"]["total"] } == events.size
          end
        end

        it "permits phrase searching on string fields" do
          results = @es.search(:q => "message:\"sample message\"")
          insist { results["hits"]["total"] } == 1
          insist { results["hits"]["hits"][0]["_source"]["message"] } == "sample message here"
        end

        it "numbers dynamically map to a numeric type and permit range queries" do
          results = @es.search(:q => "somevalue:[5 TO 105]")
          insist { results["hits"]["total"] } == 2

          values = results["hits"]["hits"].collect { |r| r["_source"]["somevalue"] }
          insist { values }.include?(10)
          insist { values }.include?(100)
          reject { values }.include?(1)
        end

        it "creates .raw field for any string field which is not_analyzed" do
          # An exact match on the whole value hits the .raw sub-field...
          results = @es.search(:q => "message.raw:\"sample message here\"")
          insist { results["hits"]["total"] } == 1
          insist { results["hits"]["hits"][0]["_source"]["message"] } == "sample message here"

          # ...but partial or terms should not work.
          results = @es.search(:q => "message.raw:\"sample\"")
          insist { results["hits"]["total"] } == 0
        end

        it "make [geoip][location] a geo_point" do
          # Only the (0,0) document lies within 1000km of (0.5,0.5).
          results = @es.search(:body => { "filter" => { "geo_distance" => { "distance" => "1000km", "geoip.location" => { "lat" => 0.5, "lon" => 0.5 } } } })
          insist { results["hits"]["total"] } == 1
          insist { results["hits"]["hits"][0]["_source"]["geoip"]["location"] } == [ 0.0, 0.0 ]
        end

        it "should index stopwords like 'at' " do
          results = @es.search(:body => { "facets" => { "t" => { "terms" => { "field" => "country" } } } })["facets"]["t"]
          terms = results["terms"].collect { |t| t["term"] }

          insist { terms }.include?("us")

          # 'at' is a stopword, make sure stopwords are not ignored.
          insist { terms }.include?("at")
        end
      end
Loading ...