"""splunk_http_event_collector.py
Splunk HTTP event submission class
Remember: Friends don't let friends send in non-CIM (Common Information Model) data: http://docs.splunk.com/Documentation/CIM/latest/User/Overview
Please use CIM-friendly field names when sending in data.
"""
__author__ = "george@georgestarcher.com (George Starcher)"
import requests
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry
import json
import time
import socket
import threading
import uuid
import sys
import logging
is_py2 = sys.version[0] == '2'
if is_py2:
import Queue as Queue
else:
import queue as Queue
class http_event_collector:
"""
Splunk HTTP Event Collector Class
Keyword Arguments:
token -- the Splunk HEC token value - required
http_event_server -- the Splunk Server name or ip. Name must be network resolvable. - required
input_type -- json or raw HEC type - provided at init (default json)
host -- value to use as host field for events sent to Splunk (default the local system's hostname)
http_event_port -- Splunk HEC network port (default 8088)
http_event_server_ssl -- boolean to set if Splunk HEC is using SSL (default True)
Attributes:
SSL_verify -- boolean flag to force SSL certificate verification (default false)
popNullFields -- boolean flag to pop null fields off payload prior to sending to Splunk (default false)
index -- optional index name for HEC events (default None)
sourcetype -- optional sourcetype name for HEC events (default None)
server_uri -- computed property for HEC uri based on HEC type, raw metadata etc.
Example Init:
from splunk_http_event_collector import http_event_collector
testeventJSON = http_event_collector("4D14F8D9-D788-4E6E-BF2D-D1A46441242E","localhost")
For full usage example: https://github.com/georgestarcher/Splunk-Class-httpevent/blob/master/example.py
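        A slightly fuller sketch (the token, host, and field values below are placeholders, not working values):
            hec = http_event_collector("TOKEN","splunk.example.com")
            hec.index = "main"
            hec.sourcetype = "txt"
            hec.popNullFields = True
            hec.sendEvent({"event":{"action":"success"}})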
"""
    # Default batch max size to match Splunk's default HEC max content length limits
    # See the http_input stanza in limits.conf; note in testing I had to limit to 100,000 bytes to avoid the HTTP Event Collector breaking the connection
# Auto flush will occur if next event payload will exceed limit
maxByteLength = 100000
threadCount = 10
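    # Both values above are class attributes read through self; a tuning sketch (values are illustrative, not recommendations):
    #   http_event_collector.threadCount = 20   # must be set before __init__ starts the worker threads
    #   hec = http_event_collector("TOKEN","localhost")
    #   hec.maxByteLength = 50000                # checked on every batchEvent() call, so a per-instance override works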
# An improved requests retry method from
# https://www.peterbe.com/plog/best-practice-with-retries-with-requests
# 503 added for endpoint busy
# 408 added in case using HAproxy
def requests_retry_session(self, retries=3,backoff_factor=0.3,status_forcelist=(408,500,502,503,504),session=None):
session = session or requests.Session()
retry = Retry(total=retries, read=retries, connect=retries, backoff_factor=backoff_factor, status_forcelist=status_forcelist, method_whitelist=frozenset(['HEAD', 'TRACE', 'GET', 'PUT', 'OPTIONS', 'DELETE', 'POST']))
adapter = HTTPAdapter(max_retries=retry)
session.mount('http://', adapter)
session.mount('https://', adapter)
return session
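    # Illustrative sketch, not part of the public API: the retry-wrapped session can also be reused for
    # ad-hoc requests against the collector; 'hec' is assumed to be an initialized http_event_collector.
    #   session = hec.requests_retry_session(retries=5, backoff_factor=1)
    #   response = session.post(hec.server_uri, data='{"event":"hello world"}', headers={'Authorization': 'Splunk ' + hec.token}, verify=hec.SSL_verify)
    # Note: urllib3 1.26+ deprecates the method_whitelist argument in favor of allowed_methods.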
def __init__(self,token,http_event_server,input_type='json',host="",http_event_port='8088',http_event_server_ssl=True):
self.log = logging.getLogger(u'HEC')
self.log.setLevel(logging.INFO)
self.token = token
self.SSL_verify = False
self.http_event_server = http_event_server
self.http_event_server_ssl = http_event_server_ssl
self.http_event_port = http_event_port
self.index = ""
self.sourcetype = ""
self.batchEvents = []
self.currentByteLength = 0
self.input_type = input_type
self.popNullFields = False
self.flushQueue = Queue.Queue(0)
for x in range(self.threadCount):
t = threading.Thread(target=self._batchThread)
t.daemon = True
t.start()
        if not self.SSL_verify:
requests.packages.urllib3.disable_warnings()
        # Set host to specified value or default to the local system's hostname if no value provided
if host:
self.host = host
else:
self.host = socket.gethostname()
self.log.info("HEC Instance Ready: server_uri=%s",self.server_uri)
@property
def server_uri(self):
# Build and set server_uri for http event collector
# Defaults to SSL if flag not passed
# Defaults to port 8088 if port not passed
if self.http_event_server_ssl:
protocol = 'https'
else:
protocol = 'http'
if self.input_type == 'raw':
input_url = '/raw?channel='+str(uuid.uuid1())
if self.sourcetype: input_url = input_url+'&sourcetype='+self.sourcetype
if self.index: input_url = input_url+'&index='+self.index
else:
input_url = '/event'
if self.sourcetype or self.index: input_url = input_url+'?'
if self.sourcetype: input_url = input_url+'sourcetype='+self.sourcetype+"&"
if self.index: input_url = input_url+'index='+self.index+"&"
server_uri = '%s://%s:%s/services/collector%s' % (protocol, self.http_event_server, self.http_event_port, input_url)
return (server_uri)
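    # For illustration, with the defaults above a JSON collector resolves to something like
    #   https://<http_event_server>:8088/services/collector/event
    # while a raw collector resolves to
    #   https://<http_event_server>:8088/services/collector/raw?channel=<uuid>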
def check_connectivity(self):
"""
method to check connectivity to Splunk HEC
Reference:
https://docs.splunk.com/Documentation/Splunk/8.0.2/Data/TroubleshootHTTPEventCollector
Notes:
        method will return False if the HEC token is incorrect (HTTP status code 401 or 403)
        method will return False and log a warning on all failures
        method will return a status message in all cases to provide the failure reason
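        Example (token and host below are placeholders):
            hec = http_event_collector("TOKEN","splunk.example.com")
            reachable, status_msg = hec.check_connectivity()
            if not reachable:
                hec.log.error(status_msg)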
"""
self.log.info("Checking HEC Server URI reachability.")
headers = {'Authorization':'Splunk '+self.token}
payload = dict()
response = dict()
hec_status_msg = "Splunk Server URI is unreachable."
hec_reachable = False
acceptable_status_codes = [400]
bad_token_codes = [401,403]
        health_warning_status_codes = [500,503]
try:
response = self.requests_retry_session().post(self.server_uri, data=payload, headers=headers, verify=self.SSL_verify)
            if response:
                hec_status_msg = "Splunk Server URI is reachable."
                self.log.info(hec_status_msg)
                hec_reachable = True
            else:
                if response.status_code in acceptable_status_codes:
                    hec_status_msg = "Connectivity OK"
                    self.log.info("Splunk Server URI is reachable.")
                    self.log.warning("Connectivity Check: http_status_code=%s http_message=%s",response.status_code,response.text)
                    hec_reachable = True
                elif response.status_code in bad_token_codes:
                    hec_status_msg = "Splunk HEC Server token is invalid or disabled"
                    self.log.warning(hec_status_msg)
                    self.log.error("Connectivity Check: http_status_code=%s http_message=%s",response.status_code,response.text)
                elif response.status_code in health_warning_status_codes:
                    hec_status_msg = "Splunk HEC Server has potential health issues"
                    self.log.warning(hec_status_msg)
                    self.log.error("Connectivity Check: http_status_code=%s http_message=%s",response.status_code,response.text)
                else:
                    hec_status_msg = "Splunk Server URI is unreachable."
                    self.log.warning(hec_status_msg)
                    self.log.error("HTTP status_code=%s message=%s",response.status_code,response.text)
except Exception as e:
            self.log.warning(hec_status_msg)
self.log.exception(e)
return (hec_reachable, hec_status_msg)
def sendEvent(self,payload,eventtime=""):
"""Method to immediately send an event to the http event collector"""
if self.input_type == 'json':
# If eventtime in epoch not passed as optional argument and not in payload, use current system time in epoch
if not eventtime and 'time' not in payload:
eventtime = str(round(time.time(),3))
payload.update({'time':eventtime})
# Fill in local hostname if not manually populated
if 'host' not in payload:
payload.update({"host":self.host})
# send event to http event collector
event = []
if self.input_type == 'json':
if self.popNullFields:
payloadEvent = payload.get('event')
                payloadEvent = {k:v for k,v in payloadEvent.items() if v}
payload.update({"event":payloadEvent})
event.append(json.dumps(payload, default=str))
else:
event.append(str(payload))
self.flushQueue.put(event)
self.log.debug("Single Submit: Sticking the event on the queue.")
self.log.debug("event:%s",event)
self._waitUntilDone()
def batchEvent(self,payload,eventtime=""):
"""
Recommended Method to place the event on the batch queue. Queue will auto flush as needed.
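        Usage sketch (token, host, and payload values below are placeholders):
            hec = http_event_collector("TOKEN","splunk.example.com")
            for i in range(1000):
                hec.batchEvent({"event":{"event_id":i,"message":"batch hello world"}})
            hec.flushBatch()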
"""
if self.input_type == 'json':
# Fill in local hostname if not manually populated
if 'host' not in payload:
payload.update({"host":self.host})
# If eventtime in epoch not passed as optional argument and not in payload, use current system time in epoch
if not eventtime and 'time' not in payload:
eventtime = str(round(time.time(),3))
payload.update({"time":eventtime})
if self.popNullFields:
payloadEvent = payload.get('event')
                payloadEvent = {k:v for k,v in payloadEvent.items() if v}
payload.update({"event":payloadEvent})
payloadString = json.dumps(payload, default=str)
else:
payloadString = str(payload)
if not payloadString.endswith("\n"):
payloadString=payloadString+"\n"
payloadLength = len(payloadString)
        if (self.currentByteLength + payloadLength) > self.maxByteLength:
self.log.debug("Auto Flush: Sticking the batch on the queue.")
self.flushQueue.put(self.batchEvents)
self.batchEvents = []
self.currentByteLength = 0
self.batchEvents.append(payloadString)
self.currentByteLength += payloadLength
def _batchThread(self):
"""Internal Function: Threads to send batches of events."""
while True:
self.log.debug("Events received on thread. Sending to Splunk.")
payload = " ".join(self.flushQueue.get())
headers = {'Authorization':'Splunk '+self.token}
            # post the payload with retries via requests_retry_session, then give up and move on
try:
response = self.requests_retry_session().post(self.server_uri, data=payload, headers=headers, verify=self.SSL_verify)
self.log.debug("batch_thread: http_status_code=%s http_message=%s",response.status_code,response.text)
except Exception as e:
self.log.exception(e)
self.flushQueue.task_done()
def _waitUntilDone(self):
"""Internal Function: Block until all flushQueue is empty."""
self.flushQueue.join()
return
def flushBatch(self):
"""Method called to force flushing of remaining batch events.
Always call this method before exiting your code to send any partial batch queue.
"""
self.log.debug("Manual Flush: Sticking the batch on the queue.")
self.flushQueue.put(self.batchEvents)
self.batchEvents = []
self.currentByteLength = 0
self._waitUntilDone()
def main():
    # Init logging config; this would be the job of your main code using this class.
    logging.basicConfig(format='%(asctime)s %(name)s %(levelname)s %(message)s', datefmt='%Y-%m-%d %H:%M:%S %z')
    # This main method is a test and example section. In normal use you would import this class into the code from which you wish to send events.
    # Create event collector object, default SSL and HTTP Event Collector port
http_event_collector_key_json = "PUTCOLLECTORKEYHERE"
http_event_collector_key_raw = "PUTCOLLECTORKEYHERE"
http_event_collector_host = "HOSTNAMEOFTHECOLLECTOR"
    # Example with the JSON connection, logging set to DEBUG
testeventJSON = http_event_collector(http_event_collector_key_json, http_event_collector_host,'json')
testeventJSON.log.setLevel(logging.DEBUG)
# Set option to pop empty fields to True, default is False to preserve previous class behavior. Only applies to JSON method
testeventJSON.popNullFields = True
# Start event payload and add the metadata information
payload = {}
payload.update({"index":"test"})
payload.update({"sourcetype":"txt"})
payload.update({"source":"test"})
payload.update({"host":"mysterymachine"})
# Add 5 test events
for i in range(5):
payload.update({"event":{"action":"success","type":"json","message":"individual hello world","testBool":False,"event_id":i}})
testeventJSON.sendEvent(payload)
# Batch add 50000 test events
for i in range(50000):
payload.update({"event":{"action":"success","type":"json","message":"batch hello world","testBool":"","event_id":i}})
testeventJSON.batchEvent(payload)
testeventJSON.flushBatch()
    # Example with the raw connection; logging defaults to INFO
testeventRAW = http_event_collector(http_event_collector_key_raw, http_event_collector_host,'raw')
    # popNullFields only applies to the JSON input type; raw events are submitted as plain strings, so no payload metadata is built here
    # Add 5 test events
    for i in range(5):
        testeventRAW.sendEvent("%s type=raw message=individual" % time.strftime("%Y-%m-%d %H:%M:%S GMT", time.gmtime()))
    # Batch add 50000 test events
    for i in range(50000):
        testeventRAW.batchEvent("%s type=raw message=batch event_id=%s" % (time.strftime("%Y-%m-%d %H:%M:%S GMT", time.gmtime()), str(i)))
testeventRAW.flushBatch()
exit()
if __name__ == "__main__":
main()