# Repository URL to install this package:
# Version: 4.0.109
# Copyright (c) 2015 Red Hat Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import functools
import logging
from oslo_utils.timeutils import parse_isotime
from barbicanclient import base
from barbicanclient import formatter
# Module-level logger, named after this module via ``__name__``.
LOG = logging.getLogger(__name__)
def lazy(func):
    """Decorate a method so lazy properties are filled before it runs.

    The returned wrapper first calls ``self._fill_lazy_properties()`` and
    then delegates to ``func``.  Both positional and keyword arguments are
    forwarded unchanged.

    :param func: method of an object that provides
        ``_fill_lazy_properties()``
    :returns: wrapper carrying ``func``'s name and docstring
    """
    @functools.wraps(func)
    def wrapper(self, *args, **kwargs):
        # Make sure the attributes are populated before they are read.
        self._fill_lazy_properties()
        return func(self, *args, **kwargs)
    return wrapper
class CAFormatter(formatter.EntityFormatter):
    """Column headings and the matching row of values for a CA."""

    columns = (
        "CA href",
        "Name",
        "Description",
        "Created",
        "Updated",
        "Status",
        "Plugin Name",
        "Plugin CA ID",
        "Expiration",
    )

    def _get_formatted_data(self):
        """Return this CA's values as a tuple ordered like ``columns``.

        Timestamps that are set are rendered with ``isoformat()``; unset
        (falsy) ones are reported as ``None``.
        """
        created = None
        if self.created:
            created = self.created.isoformat()

        updated = None
        if self.updated:
            updated = self.updated.isoformat()

        expiration = None
        if self.expiration:
            expiration = self.expiration.isoformat()

        return (
            self.ca_ref,
            self.name,
            self.description,
            created,
            updated,
            self.status,
            self.plugin_name,
            self.plugin_ca_id,
            expiration,
        )
class CA(CAFormatter):
    """Certificate authority

    CAs represent certificate authorities or subCAs with which the Barbican
    service is configured to interact.
    """
    _entity = 'cas'

    def __init__(self, api, meta=None, expiration=None,
                 plugin_name=None, plugin_ca_id=None,
                 ca_ref=None, created=None, updated=None,
                 status=None, creator_id=None):
        """Certificate authority

        CA objects should not be instantiated directly.  You should use
        the methods of :class:`barbicanclient.cas.CAManager` instead.

        :param api: client object used to fetch the CA (``api.get(ref)``)
        :param meta: list of dicts; ``name`` and ``description`` keys are
            extracted from it
        :param expiration: ISO 8601 timestamp string, or None
        :param plugin_name: name of the CA plugin, or None
        :param plugin_ca_id: plugin-side identifier of the CA, or None
        :param ca_ref: reference to the CA, or None
        :param created: ISO 8601 timestamp string, or None
        :param updated: ISO 8601 timestamp string, or None
        :param status: status value; only kept when ``ca_ref`` is set
        :param creator_id: identifier of the creator, or None
        """
        self._api = api
        self._ca_ref = ca_ref
        self._fill_from_data(
            meta=meta,
            expiration=expiration,
            plugin_name=plugin_name,
            plugin_ca_id=plugin_ca_id,
            created=created,
            updated=updated,
            status=status,
            creator_id=creator_id
        )

    @property
    def ca_ref(self):
        """Reference this CA was created with (never triggers a fetch)."""
        return self._ca_ref

    @property
    @lazy
    def name(self):
        """Value of the ``name`` entry found in the CA's meta, or None."""
        return self._name

    @property
    @lazy
    def expiration(self):
        """Parsed expiration timestamp, or the falsy value supplied."""
        return self._expiration

    @property
    @lazy
    def description(self):
        """Value of the ``description`` entry in the CA's meta, or None."""
        return self._description

    @property
    @lazy
    def plugin_name(self):
        """Name of the CA plugin, or None."""
        return self._plugin_name

    @property
    @lazy
    def plugin_ca_id(self):
        """Plugin-side identifier of the CA, or None."""
        return self._plugin_ca_id

    @property
    @lazy
    def created(self):
        """Parsed creation timestamp; None when there is no ``ca_ref``."""
        return self._created

    @property
    @lazy
    def updated(self):
        """Parsed update timestamp; None when there is no ``ca_ref``."""
        return self._updated

    @property
    @lazy
    def status(self):
        """Status value; None when there is no ``ca_ref``."""
        return self._status

    def _fill_from_data(self, meta=None, expiration=None,
                        plugin_name=None, plugin_ca_id=None, created=None,
                        updated=None, status=None, creator_id=None):
        """Populate the private attributes from raw values.

        Truthy timestamp strings are converted with ``parse_isotime``.
        ``status``, ``created`` and ``updated`` are only kept when the CA
        has a ``ca_ref``; otherwise they are reset to None.
        """
        self._name = None
        self._description = None
        # ``meta`` is a list of dicts.  Every key/value pair is inspected so
        # an empty dict is skipped instead of raising IndexError and a dict
        # holding several keys is not truncated to its first key.
        for entry in meta or ():
            for key, value in entry.items():
                if key == 'name':
                    self._name = value
                elif key == 'description':
                    self._description = value

        self._plugin_name = plugin_name
        self._plugin_ca_id = plugin_ca_id
        self._creator_id = creator_id

        self._expiration = expiration
        if self._expiration:
            self._expiration = parse_isotime(self._expiration)

        if self._ca_ref:
            self._status = status
            self._created = created
            self._updated = updated
            if self._created:
                self._created = parse_isotime(self._created)
            if self._updated:
                self._updated = parse_isotime(self._updated)
        else:
            self._status = None
            self._created = None
            self._updated = None

    def _fill_lazy_properties(self):
        """Fetch the CA through the API when its data is not loaded yet.

        A fetch happens whenever the CA has a ``ca_ref`` and
        ``_plugin_name`` is still falsy.
        """
        if self._ca_ref and not self._plugin_name:
            result = self._api.get(self._ca_ref)
            self._fill_from_data(
                meta=result.get('meta'),
                expiration=result.get('expiration'),
                plugin_name=result.get('plugin_name'),
                plugin_ca_id=result.get('plugin_ca_id'),
                created=result.get('created'),
                updated=result.get('updated'),
                status=result.get('status'),
                # Keep the current creator id when the response has none, so
                # a lazy load no longer wipes a constructor-supplied value.
                # NOTE(review): assumes the response key is 'creator_id' --
                # confirm against the service's CA representation.
                creator_id=result.get('creator_id', self._creator_id)
            )

    def __repr__(self):
        """Identify the CA by its ``ca_ref`` if set, otherwise its name."""
        if self._ca_ref:
            return 'CA(ca_ref="{0}")'.format(self._ca_ref)
        return 'CA(name="{0}")'.format(self._name)
class CAManager(base.BaseEntityManager):
    """Entity Manager for CA entities"""

    def __init__(self, api):
        """Create a manager bound to ``api`` for the ``'cas'`` entity."""
        super(CAManager, self).__init__(api, 'cas')

    def get(self, ca_ref):
        """Retrieve an existing CA from Barbican

        The reference is validated with ``base.validate_ref`` and wrapped in
        a :class:`barbicanclient.cas.CA`; no request is issued by this
        method itself.

        :param str ca_ref: Full HATEOAS reference to a CA
        :returns: CA object retrieved from Barbican
        :rtype: :class:`barbicanclient.cas.CA`
        :raises barbicanclient.exceptions.HTTPAuthError: 401 Responses
        :raises barbicanclient.exceptions.HTTPClientError: 4xx Responses
        :raises barbicanclient.exceptions.HTTPServerError: 5xx Responses
        """
        # Pass arguments to the logger so the message is only formatted
        # when debug logging is enabled.
        LOG.debug("Getting ca - CA href: %s", ca_ref)
        base.validate_ref(ca_ref, 'CA')
        return CA(api=self._api, ca_ref=ca_ref)

    def list(self, limit=10, offset=0, name=None):
        """List CAs for the project

        This method uses the limit and offset parameters for paging,
        and also supports filtering.

        :param limit: Max number of CAs returned
        :param offset: Offset CAs to begin list
        :param name: Name filter for the list; omitted from the request
            when falsy
        :returns: list of CA objects that satisfy the provided filter
            criteria, one per entry of the response's ``'cas'`` list
        :rtype: list
        :raises barbicanclient.exceptions.HTTPAuthError: 401 Responses
        :raises barbicanclient.exceptions.HTTPClientError: 4xx Responses
        :raises barbicanclient.exceptions.HTTPServerError: 5xx Responses
        """
        LOG.debug('Listing CAs - offset %s limit %s', offset, limit)
        params = {'limit': limit, 'offset': offset}
        if name:
            params['name'] = name
        response = self._api.get(self._entity, params=params)
        # Each entry of the 'cas' list is used as the ca_ref of a new CA.
        return [
            CA(api=self._api, ca_ref=ca_ref)
            for ca_ref in response.get('cas', [])
        ]