Repository URL to install this package:
|
Version:
6.0.0 ▾
|
#!/usr/bin/env python
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
import traceback
from ansible.module_utils._text import to_native
try:
from ansible_collections.kubernetes.core.plugins.module_utils.common import (
K8sAnsibleMixin,
get_api_client,
)
HAS_KUBERNETES_COLLECTION = True
except ImportError as e:
HAS_KUBERNETES_COLLECTION = False
k8s_collection_import_exception = e
K8S_COLLECTION_ERROR = traceback.format_exc()
try:
from kubernetes import client
from kubernetes.dynamic.exceptions import DynamicApiError, NotFoundError
except ImportError:
pass
class OpenShiftAdmPruneAuth(K8sAnsibleMixin):
def __init__(self, module):
self.module = module
self.fail_json = self.module.fail_json
self.exit_json = self.module.exit_json
if not HAS_KUBERNETES_COLLECTION:
self.module.fail_json(
msg="The kubernetes.core collection must be installed",
exception=K8S_COLLECTION_ERROR,
error=to_native(k8s_collection_import_exception),
)
super(OpenShiftAdmPruneAuth, self).__init__(self.module)
self.params = self.module.params
self.check_mode = self.module.check_mode
self.client = get_api_client(self.module)
def prune_resource_binding(self, kind, api_version, ref_kind, ref_namespace_names, propagation_policy=None):
resource = self.find_resource(kind=kind, api_version=api_version, fail=True)
candidates = []
for ref_namespace, ref_name in ref_namespace_names:
try:
result = resource.get(name=None, namespace=ref_namespace)
result = result.to_dict()
result = result.get('items') if 'items' in result else [result]
for obj in result:
namespace = obj['metadata'].get('namespace', None)
name = obj['metadata'].get('name')
if ref_kind and obj['roleRef']['kind'] != ref_kind:
# skip this binding as the roleRef.kind does not match
continue
if obj['roleRef']['name'] == ref_name:
# select this binding as the roleRef.name match
candidates.append((namespace, name))
except NotFoundError:
continue
except DynamicApiError as exc:
msg = "Failed to get {kind} resource due to: {msg}".format(kind=kind, msg=exc.body)
self.fail_json(msg=msg)
except Exception as e:
msg = "Failed to get {kind} due to: {msg}".format(kind=kind, msg=to_native(e))
self.fail_json(msg=msg)
if len(candidates) == 0 or self.check_mode:
return [y if x is None else x + "/" + y for x, y in candidates]
delete_options = client.V1DeleteOptions()
if propagation_policy:
delete_options.propagation_policy = propagation_policy
for namespace, name in candidates:
try:
result = resource.delete(name=name, namespace=namespace, body=delete_options)
except DynamicApiError as exc:
msg = "Failed to delete {kind} {namespace}/{name} due to: {msg}".format(kind=kind, namespace=namespace, name=name, msg=exc.body)
self.fail_json(msg=msg)
except Exception as e:
msg = "Failed to delete {kind} {namespace}/{name} due to: {msg}".format(kind=kind, namespace=namespace, name=name, msg=to_native(e))
self.fail_json(msg=msg)
return [y if x is None else x + "/" + y for x, y in candidates]
def update_resource_binding(self, ref_kind, ref_names, namespaced=False):
kind = 'ClusterRoleBinding'
api_version = "rbac.authorization.k8s.io/v1",
if namespaced:
kind = "RoleBinding"
resource = self.find_resource(kind=kind, api_version=api_version, fail=True)
result = resource.get(name=None, namespace=None).to_dict()
result = result.get('items') if 'items' in result else [result]
if len(result) == 0:
return [], False
def _update_user_group(binding_namespace, subjects):
users, groups = [], []
for x in subjects:
if x['kind'] == 'User':
users.append(x['name'])
elif x['kind'] == 'Group':
groups.append(x['name'])
elif x['kind'] == 'ServiceAccount':
namespace = binding_namespace
if x.get('namespace') is not None:
namespace = x.get('namespace')
if namespace is not None:
users.append("system:serviceaccount:%s:%s" % (namespace, x['name']))
return users, groups
candidates = []
changed = False
for item in result:
subjects = item.get('subjects', [])
retainedSubjects = [x for x in subjects if x['kind'] == ref_kind and x['name'] in ref_names]
if len(subjects) != len(retainedSubjects):
updated_binding = item
updated_binding['subjects'] = retainedSubjects
binding_namespace = item['metadata'].get('namespace', None)
updated_binding['userNames'], updated_binding['groupNames'] = _update_user_group(binding_namespace, retainedSubjects)
candidates.append(binding_namespace + "/" + item['metadata']['name'] if binding_namespace else item['metadata']['name'])
changed = True
if not self.check_mode:
try:
resource.apply(updated_binding, namespace=binding_namespace)
except DynamicApiError as exc:
msg = "Failed to apply object due to: {0}".format(exc.body)
self.fail_json(msg=msg)
return candidates, changed
def update_security_context(self, ref_names, key):
params = {'kind': 'SecurityContextConstraints', 'api_version': 'security.openshift.io/v1'}
sccs = self.kubernetes_facts(**params)
if not sccs['api_found']:
self.fail_json(msg=sccs['msg'])
sccs = sccs.get('resources')
candidates = []
changed = False
resource = self.find_resource(kind="SecurityContextConstraints", api_version="security.openshift.io/v1")
for item in sccs:
subjects = item.get(key, [])
retainedSubjects = [x for x in subjects if x not in ref_names]
if len(subjects) != len(retainedSubjects):
candidates.append(item['metadata']['name'])
changed = True
if not self.check_mode:
upd_sec_ctx = item
upd_sec_ctx.update({key: retainedSubjects})
try:
resource.apply(upd_sec_ctx, namespace=None)
except DynamicApiError as exc:
msg = "Failed to apply object due to: {0}".format(exc.body)
self.fail_json(msg=msg)
return candidates, changed
def auth_prune_roles(self):
params = {'kind': 'Role', 'api_version': 'rbac.authorization.k8s.io/v1', 'namespace': self.params.get('namespace')}
for attr in ('name', 'label_selectors'):
if self.params.get(attr):
params[attr] = self.params.get(attr)
result = self.kubernetes_facts(**params)
if not result['api_found']:
self.fail_json(msg=result['msg'])
roles = result.get('resources')
if len(roles) == 0:
self.exit_json(changed=False, msg="No candidate rolebinding to prune from namespace %s." % self.params.get('namespace'))
ref_roles = [(x['metadata']['namespace'], x['metadata']['name']) for x in roles]
candidates = self.prune_resource_binding(kind="RoleBinding",
api_version="rbac.authorization.k8s.io/v1",
ref_kind="Role",
ref_namespace_names=ref_roles,
propagation_policy='Foreground')
if len(candidates) == 0:
self.exit_json(changed=False, role_binding=candidates)
self.exit_json(changed=True, role_binding=candidates)
def auth_prune_clusterroles(self):
params = {'kind': 'ClusterRole', 'api_version': 'rbac.authorization.k8s.io/v1'}
for attr in ('name', 'label_selectors'):
if self.params.get(attr):
params[attr] = self.params.get(attr)
result = self.kubernetes_facts(**params)
if not result['api_found']:
self.fail_json(msg=result['msg'])
clusterroles = result.get('resources')
if len(clusterroles) == 0:
self.exit_json(changed=False, msg="No clusterroles found matching input criteria.")
ref_clusterroles = [(None, x['metadata']['name']) for x in clusterroles]
# Prune ClusterRoleBinding
candidates_cluster_binding = self.prune_resource_binding(kind="ClusterRoleBinding",
api_version="rbac.authorization.k8s.io/v1",
ref_kind=None,
ref_namespace_names=ref_clusterroles)
# Prune Role Binding
candidates_namespaced_binding = self.prune_resource_binding(kind="RoleBinding",
api_version="rbac.authorization.k8s.io/v1",
ref_kind='ClusterRole',
ref_namespace_names=ref_clusterroles)
self.exit_json(changed=True,
cluster_role_binding=candidates_cluster_binding,
role_binding=candidates_namespaced_binding)
def list_groups(self, params=None):
options = {'kind': 'Group', 'api_version': 'user.openshift.io/v1'}
if params:
for attr in ('name', 'label_selectors'):
if params.get(attr):
options[attr] = params.get(attr)
return self.kubernetes_facts(**options)
def auth_prune_users(self):
params = {'kind': 'User', 'api_version': 'user.openshift.io/v1'}
for attr in ('name', 'label_selectors'):
if self.params.get(attr):
params[attr] = self.params.get(attr)
users = self.kubernetes_facts(**params)
if len(users) == 0:
self.exit_json(changed=False, msg="No resource type 'User' found matching input criteria.")
names = [x['metadata']['name'] for x in users]
changed = False
# Remove the user role binding
rolebinding, changed_role = self.update_resource_binding(ref_kind="User",
ref_names=names,
namespaced=True)
changed = changed or changed_role
# Remove the user cluster role binding
clusterrolesbinding, changed_cr = self.update_resource_binding(ref_kind="User",
ref_names=names)
changed = changed or changed_cr
# Remove the user from security context constraints
sccs, changed_sccs = self.update_security_context(names, 'users')
changed = changed or changed_sccs
# Remove the user from groups
groups = self.list_groups()
deleted_groups = []
resource = self.find_resource(kind="Group", api_version="user.openshift.io/v1")
for grp in groups:
subjects = grp.get('users', [])
retainedSubjects = [x for x in subjects if x not in names]
if len(subjects) != len(retainedSubjects):
deleted_groups.append(grp['metadata']['name'])
changed = True
if not self.check_mode:
upd_group = grp
upd_group.update({'users': retainedSubjects})
try:
resource.apply(upd_group, namespace=None)
except DynamicApiError as exc:
msg = "Failed to apply object due to: {0}".format(exc.body)
self.fail_json(msg=msg)
# Remove the user's OAuthClientAuthorizations
oauth = self.kubernetes_facts(kind='OAuthClientAuthorization', api_version='oauth.openshift.io/v1')
deleted_auths = []
resource = self.find_resource(kind="OAuthClientAuthorization", api_version="oauth.openshift.io/v1")
for authorization in oauth:
if authorization.get('userName', None) in names:
auth_name = authorization['metadata']['name']
deleted_auths.append(auth_name)
changed = True
if not self.check_mode:
try:
resource.delete(name=auth_name, namespace=None, body=client.V1DeleteOptions())
except DynamicApiError as exc:
msg = "Failed to delete OAuthClientAuthorization {name} due to: {msg}".format(name=auth_name, msg=exc.body)
self.fail_json(msg=msg)
except Exception as e:
msg = "Failed to delete OAuthClientAuthorization {name} due to: {msg}".format(name=auth_name, msg=to_native(e))
self.fail_json(msg=msg)
self.exit_json(changed=changed,
cluster_role_binding=clusterrolesbinding,
role_binding=rolebinding,
security_context_constraints=sccs,
authorization=deleted_auths,
group=deleted_groups)
def auth_prune_groups(self):
groups = self.list_groups(params=self.params)
if len(groups) == 0:
self.exit_json(changed=False, result="No resource type 'Group' found matching input criteria.")
names = [x['metadata']['name'] for x in groups]
changed = False
# Remove the groups role binding
rolebinding, changed_role = self.update_resource_binding(ref_kind="Group",
ref_names=names,
namespaced=True)
changed = changed or changed_role
# Remove the groups cluster role binding
clusterrolesbinding, changed_cr = self.update_resource_binding(ref_kind="Group",
ref_names=names)
changed = changed or changed_cr
# Remove the groups security context constraints
sccs, changed_sccs = self.update_security_context(names, 'groups')
changed = changed or changed_sccs
self.exit_json(changed=changed,
cluster_role_binding=clusterrolesbinding,
role_binding=rolebinding,
security_context_constraints=sccs)
def execute_module(self):
auth_prune = {
'roles': self.auth_prune_roles,
'clusterroles': self.auth_prune_clusterroles,
'users': self.auth_prune_users,
'groups': self.auth_prune_groups,
}
auth_prune[self.params.get('resource')]()