Repository URL to install this package:
|
Version:
6.0.0 ▾
|
# -*- coding: utf-8 -*-
# Copyright (c) 2021 Brian Scholer (@briantist)
# Simplified BSD License (see licenses/simplified_bsd.txt or https://opensource.org/licenses/BSD-2-Clause)
'''Python versions supported: >=3.6'''
# FOR INTERNAL COLLECTION USE ONLY
# The interfaces in this file are meant for use within the community.hashi_vault collection
# and may not remain stable to outside uses. Changes may be made in ANY release, even a bugfix release.
# See also: https://github.com/ansible/community/issues/539#issuecomment-780839686
# Please open an issue if you have questions about this.
from __future__ import absolute_import, division, print_function
__metaclass__ = type
import os
from ansible.module_utils.common.text.converters import to_text
from ansible.module_utils.common.validation import (
check_type_dict,
check_type_str,
check_type_bool,
check_type_int,
)
from ansible_collections.community.hashi_vault.plugins.module_utils._hashi_vault_common import HashiVaultOptionGroupBase
# we implement retries via the urllib3 Retry class
# https://github.com/ansible-collections/community.hashi_vault/issues/58
HAS_RETRIES = False
try:
from requests import Session
from requests.adapters import HTTPAdapter
try:
# try for a standalone urllib3
import urllib3
HAS_RETRIES = True
except ImportError:
try:
# failing that try for a vendored version within requests
from requests.packages import urllib3
HAS_RETRIES = True
except ImportError:
pass
except ImportError:
pass
class HashiVaultConnectionOptions(HashiVaultOptionGroupBase):
'''HashiVault option group class for connection options'''
OPTIONS = ['url', 'proxies', 'ca_cert', 'validate_certs', 'namespace', 'timeout', 'retries', 'retry_action']
ARGSPEC = dict(
url=dict(type='str', default=None),
proxies=dict(type='raw'),
ca_cert=dict(type='str', aliases=['cacert'], default=None),
validate_certs=dict(type='bool'),
namespace=dict(type='str', default=None),
timeout=dict(type='int'),
retries=dict(type='raw'),
retry_action=dict(type='str', choices=['ignore', 'warn'], default='warn'),
)
_LATE_BINDING_ENV_VAR_OPTIONS = {
'url': dict(env=['VAULT_ADDR'], required=True),
'ca_cert': dict(env=['VAULT_CACERT']),
'namespace': dict(env=['VAULT_NAMESPACE']),
}
_RETRIES_DEFAULT_PARAMS = {
'status_forcelist': [
# https://www.vaultproject.io/api#http-status-codes
# 429 is usually a "too many requests" status, but in Vault it's the default health status response for standby nodes.
500, # Internal server error. An internal error has occurred, try again later. If the error persists, report a bug.
502, # A request to Vault required Vault making a request to a third party; the third party responded with an error of some kind.
503, # Vault is down for maintenance or is currently sealed. Try again later.
],
(
# this field name changed in 1.26.0, and in the interest of supporting a wider range of urllib3 versions
# we'll use the new name whenever possible, but fall back seamlessly when needed.
# See also:
# - https://github.com/urllib3/urllib3/issues/2092
# - https://github.com/urllib3/urllib3/blob/main/CHANGES.rst#1260-2020-11-10
"allowed_methods" if HAS_RETRIES and hasattr(urllib3.util.Retry.DEFAULT, "allowed_methods") else "method_whitelist"
): None, # None allows retries on all methods, including those which may not be considered idempotent, like POST
'backoff_factor': 0.3,
}
def __init__(self, option_adapter, retry_callback_generator=None):
super(HashiVaultConnectionOptions, self).__init__(option_adapter)
self._retry_callback_generator = retry_callback_generator
def get_hvac_connection_options(self):
'''returns kwargs to be used for constructing an hvac.Client'''
# validate_certs is only used to optionally change the value of ca_cert
def _filter(k, v):
return v is not None and k != 'validate_certs'
# our transformed ca_cert value will become the verify parameter for the hvac client
hvopts = self._options.get_filtered_options(_filter, *self.OPTIONS)
hvopts['verify'] = hvopts.pop('ca_cert')
retry_action = hvopts.pop('retry_action')
if 'retries' in hvopts:
hvopts['session'] = self._get_custom_requests_session(new_callback=self._retry_callback_generator(retry_action), **hvopts.pop('retries'))
return hvopts
def process_connection_options(self):
'''executes special processing required for certain options'''
self.process_late_binding_env_vars(self._LATE_BINDING_ENV_VAR_OPTIONS)
self._boolean_or_cacert()
self._process_option_proxies()
self._process_option_retries()
def _get_custom_requests_session(self, **retry_kwargs):
'''returns a requests.Session to pass to hvac (or None)'''
if not HAS_RETRIES:
# because hvac requires requests which requires urllib3 it's unlikely we'll ever reach this condition.
raise NotImplementedError("Retries are unavailable. This may indicate very old versions of one or more of the following: hvac, requests, urllib3.")
# This is defined here because Retry may not be defined if its import failed.
# As mentioned above, that's very unlikely, but it'll fail sanity tests nonetheless if defined with other classes.
class CallbackRetry(urllib3.util.Retry):
def __init__(self, *args, **kwargs):
self._newcb = kwargs.pop('new_callback')
super(CallbackRetry, self).__init__(*args, **kwargs)
def new(self, **kwargs):
if self._newcb is not None:
self._newcb(self)
kwargs['new_callback'] = self._newcb
return super(CallbackRetry, self).new(**kwargs)
# We don't want the Retry class raising its own exceptions because that will prevent
# hvac from raising its own on various response codes.
# We set this here, rather than in the defaults, because if the caller sets their own
# dict for retries, we use it directly, but we don't want them to have to remember to always
# set raise_on_status=False themselves to get proper error handling.
# On the off chance someone does set it, we leave it alone, even though it's probably a mistake.
# That will be mentioned in the parameter docs.
if 'raise_on_status' not in retry_kwargs:
retry_kwargs['raise_on_status'] = False
# needs urllib 1.15+ https://github.com/urllib3/urllib3/blob/main/CHANGES.rst#115-2016-04-06
# but we should always have newer ones via requests, via hvac
retry = CallbackRetry(**retry_kwargs)
adapter = HTTPAdapter(max_retries=retry)
sess = Session()
sess.mount("https://", adapter)
sess.mount("http://", adapter)
return sess
def _process_option_retries(self):
'''check if retries option is int or dict and interpret it appropriately'''
# this method focuses on validating the option, and setting a valid Retry object construction dict
# it intentionally does not build the Session object, which will be done elsewhere
retries_opt = self._options.get_option('retries')
if retries_opt is None:
return
# we'll start with a copy of our defaults
retries = self._RETRIES_DEFAULT_PARAMS.copy()
try:
# try int
# on int, retry the specified number of times, and use the defaults for everything else
# on zero, disable retries
retries_int = check_type_int(retries_opt)
if retries_int < 0:
raise ValueError("Number of retries must be >= 0 (got %i)" % retries_int)
elif retries_int == 0:
retries = None
else:
retries['total'] = retries_int
except TypeError:
try:
# try dict
# on dict, use the value directly (will be used as the kwargs to initialize the Retry instance)
retries = check_type_dict(retries_opt)
except TypeError:
raise TypeError("retries option must be interpretable as int or dict. Got: %r" % retries_opt)
self._options.set_option('retries', retries)
def _process_option_proxies(self):
'''check if 'proxies' option is dict or str and set it appropriately'''
proxies_opt = self._options.get_option('proxies')
if proxies_opt is None:
return
try:
# if it can be interpreted as dict
# do it
proxies = check_type_dict(proxies_opt)
except TypeError:
# if it can't be interpreted as dict
proxy = check_type_str(proxies_opt)
# but can be interpreted as str
# use this str as http and https proxy
proxies = {
'http': proxy,
'https': proxy,
}
# record the new/interpreted value for 'proxies' option
self._options.set_option('proxies', proxies)
def _boolean_or_cacert(self):
# This is needed because of this (https://hvac.readthedocs.io/en/stable/source/hvac_v1.html):
#
# # verify (Union[bool,str]) - Either a boolean to indicate whether TLS verification should
# # be performed when sending requests to Vault, or a string pointing at the CA bundle to use for verification.
#
'''return a bool or cacert'''
ca_cert = self._options.get_option('ca_cert')
validate_certs = self._options.get_option('validate_certs')
if validate_certs is None:
# Validate certs option was not explicitly set
# Check if VAULT_SKIP_VERIFY is set
vault_skip_verify = os.environ.get('VAULT_SKIP_VERIFY')
if vault_skip_verify is not None:
# VAULT_SKIP_VERIFY is set
try:
# Check that we have a boolean value
vault_skip_verify = check_type_bool(vault_skip_verify)
except TypeError:
# Not a boolean value fallback to default value (True)
validate_certs = True
else:
# Use the inverse of VAULT_SKIP_VERIFY
validate_certs = not vault_skip_verify
else:
validate_certs = True
if not (validate_certs and ca_cert):
self._options.set_option('ca_cert', validate_certs)
else:
self._options.set_option('ca_cert', to_text(ca_cert, errors='surrogate_or_strict'))