#!/usr/bin/env python

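"""Image-pruning logic backing the ``community.okd.openshift_adm_prune_images``
module, broadly mirroring ``oc adm prune images``.

Path: ansible_collections/community/okd/plugins/module_utils/openshift_adm_prune_images.py

A minimal usage sketch of the wrapping module (the parameter names are the
ones read from ``self.params`` below):

    - name: Prune unused images, including registry data
      community.okd.openshift_adm_prune_images:
        namespace: testing
        keep_younger_than: 60  # minutes
        prune_registry: true
"""
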
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type

from datetime import datetime, timezone, timedelta
import traceback
import copy

from ansible.module_utils._text import to_native
from ansible.module_utils.parsing.convert_bool import boolean
from ansible.module_utils.six import iteritems

try:
    from ansible_collections.kubernetes.core.plugins.module_utils.common import (
        K8sAnsibleMixin,
        get_api_client,
    )
    HAS_KUBERNETES_COLLECTION = True
except ImportError as e:
    HAS_KUBERNETES_COLLECTION = False
    k8s_collection_import_exception = e
    K8S_COLLECTION_ERROR = traceback.format_exc()
    # Stub the base class so the class definition below does not raise a
    # NameError; __init__ then reports the missing collection instead.
    K8sAnsibleMixin = object

from ansible_collections.community.okd.plugins.module_utils.openshift_images_common import (
    OpenShiftAnalyzeImageStream,
    get_image_blobs,
    is_too_young_object,
    is_created_after,
)
from ansible_collections.community.okd.plugins.module_utils.openshift_docker_image import (
    parse_docker_image_ref,
    convert_storage_to_bytes,
)

try:
    from kubernetes import client
    from kubernetes.client import rest
    from kubernetes.dynamic.exceptions import (
        DynamicApiError,
        NotFoundError,
        ApiException
    )
except ImportError:
    pass


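# Kinds enumerated when hunting for image references, mapped to the API
# version used to list them: workload kinds are scanned for in-use image
# references, LimitRange feeds the prune_over_size_limit check, and
# Image/ImageStream are the actual pruning targets.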
ApiConfiguration = {
    "LimitRange": "v1",
    "Pod": "v1",
    "ReplicationController": "v1",
    "DaemonSet": "apps/v1",
    "Deployment": "apps/v1",
    "ReplicaSet": "apps/v1",
    "StatefulSet": "apps/v1",
    "Job": "batch/v1",
    "CronJob": "batch/v1beta1",
    "DeploymentConfig": "apps.openshift.io/v1",
    "BuildConfig": "build.openshift.io/v1",
    "Build": "build.openshift.io/v1",
    "Image": "image.openshift.io/v1",
    "ImageStream": "image.openshift.io/v1",
}


def read_object_annotation(obj, name):
    # Treat a missing annotations map as "annotation absent" rather than
    # raising a KeyError.
    return obj["metadata"].get("annotations", {}).get(name)


def determine_host_registry(module, images, image_streams):
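    """Determine the integrated registry host from existing objects.

    Prefers the ``dockerImageReference`` of the newest managed Image and
    falls back to ``status.dockerImageRepository`` of the newest
    ImageStream; exits the module early when neither yields a pull spec.
    """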
    # filter managed images
    def _f_managed_images(obj):
        value = read_object_annotation(obj, "openshift.io/image.managed")
        return boolean(value) if value is not None else False

    managed_images = list(filter(_f_managed_images, images))

    # Be sure to pick the newest managed image, which should have up-to-date information
    sorted_images = sorted(managed_images,
                           key=lambda x: x["metadata"]["creationTimestamp"],
                           reverse=True)
    docker_image_ref = ""
    if len(sorted_images) > 0:
        docker_image_ref = sorted_images[0].get("dockerImageReference", "")
    else:
        # 2nd try: get the pull spec from any image stream.
        # Sorting by creation timestamp may not get us up-to-date info;
        # modification time would be much better, if only such a field existed.
        sorted_image_streams = sorted(image_streams,
                                      key=lambda x: x["metadata"]["creationTimestamp"],
                                      reverse=True)
        for i_stream in sorted_image_streams:
            docker_image_ref = i_stream["status"].get("dockerImageRepository", "")
            if len(docker_image_ref) > 0:
                break

    if len(docker_image_ref) == 0:
        module.exit_json(changed=False, result="no managed image found")

    result, error = parse_docker_image_ref(docker_image_ref, module)
    if error:
        module.fail_json(msg=error)
    return result['hostname']


class OpenShiftAdmPruneImages(K8sAnsibleMixin):
    def __init__(self, module):
        self.module = module
        self.fail_json = self.module.fail_json
        self.exit_json = self.module.exit_json

        if not HAS_KUBERNETES_COLLECTION:
            self.fail_json(
                msg="The kubernetes.core collection must be installed",
                exception=K8S_COLLECTION_ERROR,
                error=to_native(k8s_collection_import_exception),
            )

        super(OpenShiftAdmPruneImages, self).__init__(self.module)

        self.params = self.module.params
        self.check_mode = self.module.check_mode
        try:
            self.client = get_api_client(self.module)
        except DynamicApiError as e:
            self.fail_json(
                msg="Failed to get kubernetes client.",
                reason=e.reason,
                status=e.status,
            )
        except Exception as e:
            self.fail_json(
                msg="Failed to get kubernetes client.",
                error=to_native(e)
            )

        self.max_creation_timestamp = self.get_max_creation_timestamp()
        self._rest_client = None
        self.registryhost = self.params.get('registry_url')
        self.changed = False

    def list_objects(self):
        result = {}
        for kind, version in iteritems(ApiConfiguration):
            namespace = None
            if self.params.get("namespace") and kind.lower() == "imagestream":
                namespace = self.params.get("namespace")
            try:
                result[kind] = self.kubernetes_facts(kind=kind,
                                                     api_version=version,
                                                     namespace=namespace).get('resources')
            except DynamicApiError as e:
                self.fail_json(
                    msg="An error occurred while trying to list objects.",
                    reason=e.reason,
                    status=e.status,
                )
            except Exception as e:
                self.fail_json(
                    msg="An error occurred while trying to list objects.",
                    error=to_native(e)
                )
        return result

    def get_max_creation_timestamp(self):
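        """Return the naive-UTC cutoff implied by ``keep_younger_than``
        (minutes), or None when the option is unset."""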
        result = None
        if self.params.get("keep_younger_than"):
            dt_now = datetime.now(timezone.utc).replace(tzinfo=None)
            result = dt_now - timedelta(minutes=self.params.get("keep_younger_than"))
        return result

    @property
    def rest_client(self):
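        """Lazily built REST client for registry requests, using a copy of
        the Kubernetes client configuration with the registry-specific TLS
        options applied."""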
        if not self._rest_client:
            configuration = copy.deepcopy(self.client.configuration)
            validate_certs = self.params.get('registry_validate_certs')
            ssl_ca_cert = self.params.get('registry_ca_cert')
            if validate_certs is not None:
                configuration.verify_ssl = validate_certs
            if ssl_ca_cert is not None:
                configuration.ssl_ca_cert = ssl_ca_cert
            self._rest_client = rest.RESTClientObject(configuration)

        return self._rest_client

    def delete_from_registry(self, url):
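        """Issue a DELETE against the registry, reusing the Kubernetes
        client's api_key entries (i.e. the bearer token) as request
        headers. 404 and registry error responses are tolerated."""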
        try:
            response = self.rest_client.DELETE(url=url, headers=self.client.configuration.api_key)
            if response.status == 404:
                # Blob or manifest already gone; nothing to delete
                return None
            # Registry error responses (4xx/5xx) are tolerated rather than
            # failing the whole module
            if response.status < 200 or response.status >= 400:
                return None
            # A successful registry DELETE answers 202 or 204; any other
            # 2xx/3xx status is unexpected
            if response.status not in (202, 204):
                self.fail_json(
                    msg="Delete URL {0}: Unexpected status code in response: {1}".format(
                        url, response.status),
                    reason=response.reason
                )
            return None
        except ApiException as e:
            if e.status != 404:
                self.fail_json(
                    msg="Failed to delete URL: %s" % url,
                    reason=e.reason,
                    status=e.status,
                )
        except Exception as e:
            self.fail_json(msg="Delete URL {0}: {1}".format(url, to_native(e)))

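    # Registry endpoints used by the helpers below, with <host> being
    # self.registryhost (e.g. "https://registry.example.com", a
    # hypothetical host):
    #   layer links: <host>/v2/<namespace>/<name>/blobs/<digest>
    #   manifests:   <host>/v2/<namespace>/<name>/manifests/<digest>
    #   blobs:       <host>/admin/blobs/<digest>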
    def delete_layers_links(self, path, layers):
        for layer in layers:
            url = "%s/v2/%s/blobs/%s" % (self.registryhost, path, layer)
            self.changed = True
            if not self.check_mode:
                self.delete_from_registry(url=url)

    def delete_manifests(self, path, digests):
        for digest in digests:
            url = "%s/v2/%s/manifests/%s" % (self.registryhost, path, digest)
            self.changed = True
            if not self.check_mode:
                self.delete_from_registry(url=url)

    def delete_blobs(self, blobs):
        for blob in blobs:
            self.changed = True
            url = "%s/admin/blobs/%s" % (self.registryhost, blob)
            if not self.check_mode:
                self.delete_from_registry(url=url)

    def update_image_stream_status(self, definition):
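        """Replace an ImageStream's ``status`` subresource via a raw PUT,
        since the status endpoint is addressed directly rather than through
        the dynamic resource API. Only flips ``changed`` in check mode."""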
        kind = definition["kind"]
        api_version = definition["apiVersion"]
        namespace = definition["metadata"]["namespace"]
        name = definition["metadata"]["name"]

        self.changed = True
        result = definition
        if not self.check_mode:
            try:
                result = self.client.request(
                    "PUT",
                    "/apis/{api_version}/namespaces/{namespace}/imagestreams/{name}/status".format(
                        api_version=api_version,
                        namespace=namespace,
                        name=name
                    ),
                    body=definition,
                    content_type="application/json",
                ).to_dict()
            except DynamicApiError as exc:
                msg = "Failed to patch object: kind={0} {1}/{2}".format(
                    kind, namespace, name
                )
                self.fail_json(msg=msg, status=exc.status, reason=exc.reason)
            except Exception as exc:
                msg = "Failed to patch object kind={0} {1}/{2} due to: {3}".format(
                    kind, namespace, name, exc
                )
                self.fail_json(msg=msg, error=to_native(exc))
        return result

    def delete_image(self, image):
        kind = "Image"
        api_version = "image.openshift.io/v1"
        resource = self.find_resource(kind=kind, api_version=api_version)
        name = image["metadata"]["name"]
        self.changed = True
        if not self.check_mode:
            try:
                delete_options = client.V1DeleteOptions(grace_period_seconds=0)
                return resource.delete(name=name, body=delete_options).to_dict()
            except NotFoundError:
                pass
            except DynamicApiError as exc:
                self.fail_json(
                    msg="Failed to delete object %s/%s due to: %s" % (
                        kind, name, exc.body
                    ),
                    reason=exc.reason,
                    status=exc.status
                )
        else:
            # Check mode: report the object that would be deleted
            try:
                existing = resource.get(name=name)
            except NotFoundError:
                return None
            if existing:
                existing = existing.to_dict()
            return existing

    def exceeds_limits(self, namespace, image):
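        """Return True when the image is larger than the ``storage`` max of
        any ``openshift.io/Image`` LimitRange item in the namespace."""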
        if namespace not in self.limit_range:
            return False
        docker_image_metadata = image.get("dockerImageMetadata")
        if not docker_image_metadata:
            return False
        docker_image_size = docker_image_metadata["Size"]

        for limit in self.limit_range.get(namespace):
            for item in limit["spec"]["limits"]:
                if item["type"] != "openshift.io/Image":
                    continue
                limit_max = item["max"]
                if not limit_max:
                    continue
                storage = limit_max["storage"]
                if not storage:
                    continue
                if convert_storage_to_bytes(storage) < docker_image_size:
                    # image size is larger than the permitted limit range max size
                    return True
        return False

    def prune_image_stream_tag(self, stream, tag_event_list):
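        """Walk one tag's event history and decide what survives.

        Returns (items to keep, manifest digests to delete from the
        registry, image names that become deletion candidates). An item is
        kept when it is too recent, still used as a tag or image, or, with
        prune_over_size_limit, within the namespace's size limits."""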
        manifests_to_delete, images_to_delete = [], []
        filtered_items = []
        tag_event_items = tag_event_list["items"] or []
        prune_over_size_limit = self.params.get("prune_over_size_limit")
        stream_namespace = stream["metadata"]["namespace"]
        stream_name = stream["metadata"]["name"]
        for idx, item in enumerate(tag_event_items):
            if is_created_after(item["created"], self.max_creation_timestamp):
                filtered_items.append(item)
                continue

            if idx == 0:
                istag = "%s/%s:%s" % (stream_namespace,
                                      stream_name,
                                      tag_event_list["tag"])
                if istag in self.used_tags:
                    # keeping because tag is used
                    filtered_items.append(item)
                    continue

            if item["image"] not in self.image_mapping:
                # There are a few reasons why the image may not be found:
                # 1. the image was deleted manually and this record is no longer valid
                # 2. the imagestream was observed before the image was created, i.e.
                #    the record is recent and should be protected by keep_younger_than
                continue

            image = self.image_mapping[item["image"]]
            # check prune over limit size
            if prune_over_size_limit and not self.exceeds_limits(stream_namespace, image):
                filtered_items.append(item)
                continue

            image_ref = "%s/%s@%s" % (stream_namespace,
                                      stream_name,
                                      item["image"])
            if image_ref in self.used_images:
                # keeping because the image is still used
                filtered_items.append(item)
                continue

            images_to_delete.append(item["image"])
            if self.params.get('prune_registry'):
                manifests_to_delete.append(image["metadata"]["name"])
                path = stream_namespace + "/" + stream_name
                image_blobs, err = get_image_blobs(image)
                if not err:
                    self.delete_layers_links(path, image_blobs)

        return filtered_items, manifests_to_delete, images_to_delete

    def prune_image_streams(self, stream):
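        """Prune one ImageStream's tag history.

        Re-reads the stream to avoid acting on stale data, drops prunable
        tag items and empty tags, updates the status subresource when
        something was removed, and returns the updated stream (or None)
        together with the image names released by this stream."""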
        name = stream['metadata']['namespace'] + "/" + stream['metadata']['name']
        if is_too_young_object(stream, self.max_creation_timestamp):
            # keeping all images because the image stream is too young
            return None, []
        facts = self.kubernetes_facts(kind="ImageStream",
                                      api_version=ApiConfiguration.get("ImageStream"),
                                      name=stream["metadata"]["name"],
                                      namespace=stream["metadata"]["namespace"])
        image_stream = facts.get('resources')
        if len(image_stream) != 1:
            # skipping because it does not exist anymore
            return None, []
        stream = image_stream[0]
        namespace = self.params.get("namespace")
        stream_to_update = not namespace or (stream["metadata"]["namespace"] == namespace)

        manifests_to_delete, images_to_delete = [], []
        deleted_items = False

        # Update Image stream tag
        if stream_to_update:
            tags = stream["status"].get("tags", [])
            for idx, tag_event_list in enumerate(tags):
                (
                    filtered_tag_event,
                    tag_manifests_to_delete,
                    tag_images_to_delete
                ) = self.prune_image_stream_tag(stream, tag_event_list)
                stream['status']['tags'][idx]['items'] = filtered_tag_event
                manifests_to_delete += tag_manifests_to_delete
                images_to_delete += tag_images_to_delete
                deleted_items = deleted_items or (len(tag_images_to_delete) > 0)

        # Deleting tags without items
        tags = []
        for tag in stream["status"].get("tags", []):
            if tag['items'] is None or len(tag['items']) == 0:
                continue
            tags.append(tag)

        stream['status']['tags'] = tags
        result = None
        # Update ImageStream
        if stream_to_update:
            if deleted_items:
                result = self.update_image_stream_status(stream)

            if self.params.get("prune_registry"):
                self.delete_manifests(name, manifests_to_delete)

        return result, images_to_delete

    def prune_images(self, image):
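        """Delete a single image, honouring all_images, keep_younger_than
        and prune_registry; returns the deletion result, or None when the
        image is kept."""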
        if not self.params.get("all_images"):
            if read_object_annotation(image, "openshift.io/image.managed") != "true":
                # keeping external image because all_images is set to false
                # pruning only managed images
                return None

        if is_too_young_object(image, self.max_creation_timestamp):
            # keeping because of keep_younger_than
            return None

        # Deleting image from registry
        if self.params.get("prune_registry"):
            image_blobs, err = get_image_blobs(image)
            if err:
                self.fail_json(msg=err)
            # add blob for image name
            image_blobs.append(image["metadata"]["name"])
            self.delete_blobs(image_blobs)

        # Delete image from cluster
        return self.delete_image(image)

    def execute_module(self):
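        """Two-stage prune: first trim ImageStream tag histories, then
        delete images no longer referenced by any stream."""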
        resources = self.list_objects()
        if not self.check_mode and self.params.get('prune_registry'):
            if not self.registryhost:
                self.registryhost = determine_host_registry(self.module, resources['Image'], resources['ImageStream'])
            # ensure the registry host URL includes a scheme
            if "://" not in self.registryhost:
                self.registryhost = "https://" + self.registryhost
        # Analyze Image Streams
        analyze_ref = OpenShiftAnalyzeImageStream(
            ignore_invalid_refs=self.params.get('ignore_invalid_refs'),
            max_creation_timestamp=self.max_creation_timestamp,
            module=self.module
        )
        self.used_tags, self.used_images, error = analyze_ref.analyze_image_stream(resources)
        if error:
            self.fail_json(msg=error)

        # Create image mapping
        self.image_mapping = {}
        for m in resources["Image"]:
            self.image_mapping[m["metadata"]["name"]] = m

        # Create limit range mapping
        self.limit_range = {}
        for limit in resources["LimitRange"]:
            namespace = limit["metadata"]["namespace"]
            if namespace not in self.limit_range:
                self.limit_range[namespace] = []
            self.limit_range[namespace].append(limit)

        # Stage 1: delete history from image streams
        updated_image_streams = []
        deleted_tags_images = []
        updated_is_mapping = {}
        for stream in resources['ImageStream']:
            result, images_to_delete = self.prune_image_streams(stream)
            if result:
                updated_is_mapping[result["metadata"]["namespace"] + "/" + result["metadata"]["name"]] = result
                updated_image_streams.append(result)
            deleted_tags_images += images_to_delete

        # Create a list with images referenced on image stream
        self.referenced_images = []
        for item in self.kubernetes_facts(kind="ImageStream", api_version="image.openshift.io/v1")["resources"]:
            name = "%s/%s" % (item["metadata"]["namespace"], item["metadata"]["name"])
            if name in updated_is_mapping:
                item = updated_is_mapping[name]
            for tag in item["status"].get("tags", []):
                self.referenced_images += [t["image"] for t in tag["items"] or []]

        # Stage 2: delete images
        images = []
        images_to_delete = [x["metadata"]["name"] for x in resources['Image']]
        if self.params.get("namespace") is not None:
            # When namespace is defined, prune only images that were referenced by ImageStream
            # from the corresponding namespace
            images_to_delete = deleted_tags_images
        for name in images_to_delete:
            if name in self.referenced_images:
                # The image is still referenced by one or more image streams
                continue
            if name not in self.image_mapping:
                # The image no longer exists
                continue
            result = self.prune_images(self.image_mapping[name])
            if result:
                images.append(result)

        result = {
            "changed": self.changed,
            "deleted_images": images,
            "updated_image_streams": updated_image_streams,
        }
        self.exit_json(**result)