Repository URL to install this package:
|
Version:
6.0.0 ▾
|
# -*- coding: utf-8 -*-
# Copyright: (c) 2019, XLAB Steampunk <steampunk@xlab.si>
#
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
from __future__ import absolute_import, division, print_function
__metaclass__ = type
try:
from ansible.module_utils.compat import version
except ImportError:
from distutils import version
from . import errors, http
class Client:
    """HTTP client for talking to a Sensu Go backend.

    Nothing is sent over the network at construction time. The
    authorization header and the backend version are both obtained on
    first access (``auth_header`` / ``version``) and cached on the instance.
    """

    # Stand-in stored when the backend reports a version string that
    # StrictVersion refuses to parse.
    BAD_VERSION = version.StrictVersion("9999.99.99")

    def __init__(self, address, username, password, api_key, verify, ca_path):
        """Remember the connection settings.

        :param address: backend base URL; trailing slashes are stripped
        :param username: user name used for the basic-auth login
        :param password: password used for the basic-auth login
        :param api_key: API key; when truthy it is used instead of the
            username/password pair
        :param verify: forwarded to ``http.request`` as ``validate_certs``
        :param ca_path: forwarded to ``http.request`` as ``ca_path``
        """
        self.address = address.rstrip("/")
        self.username = username
        self.password = password
        self.api_key = api_key
        self.verify = verify
        self.ca_path = ca_path

        # Both values are filled in lazily: we only log in when a request
        # is actually made, and only query the version if a consumer asks.
        self._auth_header = None
        self._version = None

    @property
    def auth_header(self):
        """Authorization header dict; produced by ``_login`` on first use."""
        if self._auth_header:
            return self._auth_header

        self._auth_header = self._login()
        return self._auth_header

    @property
    def version(self):
        """Backend version as a ``StrictVersion``, fetched once from /version.

        Raises ``errors.SensuError`` when the endpoint does not answer with
        status 200, the body is not JSON, or the ``sensu_backend`` key is
        missing. A version string that cannot be parsed yields
        ``BAD_VERSION`` instead of an error.
        """
        if self._version is not None:
            return self._version

        reply = self.get("/version")
        if reply.status != 200:
            raise errors.SensuError(
                "Version API returned status {0}".format(reply.status),
            )

        body = reply.json
        if body is None:
            raise errors.SensuError(
                "Version API did not return a valid JSON",
            )
        if "sensu_backend" not in body:
            raise errors.SensuError(
                "Version API did not return backend version",
            )

        try:
            # Only the part in front of the first "#" is parsed.
            parsed = version.StrictVersion(body["sensu_backend"].split("#")[0])
        except ValueError:
            # Backend has no version compiled in - we are probably running
            # against a self-compiled version from git.
            parsed = self.BAD_VERSION

        self._version = parsed
        return self._version

    def _login(self):
        """Build the authorization header, preferring the API key if set."""
        if not self.api_key:
            return self._username_password_login()
        return self._api_key_login()

    def _api_key_login(self):
        """Return the ``Key`` authorization header without contacting the backend."""
        # There is no API endpoint we could hit to verify the API key, so it
        # is used unvalidated. Error reporting for a bad key will be messy,
        # but there is not much we can do about that here.
        return {"Authorization": "Key {0}".format(self.api_key)}

    def _username_password_login(self):
        """Exchange username/password for an access token via GET /auth.

        Returns the ``Bearer`` authorization header. Raises
        ``errors.SensuError`` on a non-200 status, a non-JSON body, or a
        body without ``access_token``.
        """
        auth_url = "{0}/auth".format(self.address)
        reply = http.request(
            "GET",
            auth_url,
            force_basic_auth=True,
            url_username=self.username,
            url_password=self.password,
            validate_certs=self.verify,
            ca_path=self.ca_path,
        )

        if reply.status != 200:
            raise errors.SensuError(
                "Authentication call returned status {0}".format(reply.status),
            )

        body = reply.json
        if body is None:
            raise errors.SensuError(
                "Authentication call did not return a valid JSON",
            )
        if "access_token" not in body:
            raise errors.SensuError(
                "Authentication call did not return access token",
            )

        return {"Authorization": "Bearer {0}".format(body["access_token"])}

    def request(self, method, path, payload=None):
        """Send an authenticated request to ``address + path``.

        Returns the response from ``http.request``. A 401 or 403 status is
        turned into ``errors.SensuError``; every other status is handed
        back to the caller untouched.
        """
        reply = http.request(
            method,
            self.address + path,
            payload=payload,
            headers=self.auth_header,
            validate_certs=self.verify,
            ca_path=self.ca_path,
        )

        if reply.status in (401, 403):
            raise errors.SensuError(
                "Authentication problem. Verify your credentials."
            )
        return reply

    def get(self, path):
        """Shortcut for ``request("GET", path)``."""
        return self.request("GET", path)

    def put(self, path, payload):
        """Shortcut for ``request("PUT", path, payload)``."""
        return self.request("PUT", path, payload)

    def delete(self, path):
        """Shortcut for ``request("DELETE", path)``."""
        return self.request("DELETE", path)

    def validate_auth_data(self, username, password):
        """Check a username/password pair against GET /auth/test.

        Uses the supplied credentials rather than the ones stored on the
        client. Returns True on status 200 and False on 401; any other
        status raises ``errors.SensuError``.
        """
        test_url = "{0}/auth/test".format(self.address)
        reply = http.request(
            "GET",
            test_url,
            force_basic_auth=True,
            url_username=username,
            url_password=password,
            validate_certs=self.verify,
            ca_path=self.ca_path,
        )

        if reply.status not in (200, 401):
            raise errors.SensuError(
                "Authentication test returned status {0}".format(reply.status),
            )
        return reply.status == 200