Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
ansible / ibm / qradar / plugins / module_utils / qradar.py
Size: Mime:
#!/usr/bin/env python
# -*- coding: utf-8 -*-

# (c) 2019, Adam Miller (admiller@redhat.com)
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)

from __future__ import absolute_import, division, print_function

__metaclass__ = type
from ansible.module_utils.urls import CertificateError
from ansible.module_utils.six.moves.urllib.parse import quote_plus
from ansible.module_utils.connection import ConnectionError
from ansible.module_utils.connection import Connection
from ansible.module_utils._text import to_text

import json

BASE_HEADERS = {"Content-Type": "application/json", "Version": "9.1"}


def find_dict_in_list(some_list, key, value):
    text_type = False
    try:
        to_text(value)
        text_type = True
    except TypeError:
        pass
    for some_dict in some_list:
        if key in some_dict:
            if text_type:
                if to_text(some_dict[key]).strip() == to_text(value).strip():
                    return some_dict, some_list.index(some_dict)
            else:
                if some_dict[key] == value:
                    return some_dict, some_list.index(some_dict)
    return None


def set_offense_values(module, qradar_request):
    if module.params["closing_reason"]:
        found_closing_reason = qradar_request.get_by_path(
            "api/siem/offense_closing_reasons?filter={0}".format(
                quote_plus(
                    'text="{0}"'.format(module.params["closing_reason"])
                )
            )
        )
        if found_closing_reason:
            module.params["closing_reason_id"] = found_closing_reason[0]["id"]
        else:
            module.fail_json(
                "Unable to find closing_reason text: {0}".format(
                    module.params["closing_reason"]
                )
            )

    if module.params["status"]:
        module.params["status"] = module.params["status"].upper()


class QRadarRequest(object):
    def __init__(self, module, headers=None, not_rest_data_keys=None):

        self.module = module
        self.connection = Connection(self.module._socket_path)

        # This allows us to exclude specific argspec keys from being included by
        # the rest data that don't follow the qradar_* naming convention
        if not_rest_data_keys:
            self.not_rest_data_keys = not_rest_data_keys
        else:
            self.not_rest_data_keys = []
        self.not_rest_data_keys.append("validate_certs")
        self.headers = headers if headers else BASE_HEADERS

    def _httpapi_error_handle(self, method, uri, payload=None):
        # FIXME - make use of handle_httperror(self, exception) where applicable
        #   https://docs.ansible.com/ansible/latest/network/dev_guide/developing_plugins_network.html#developing-plugins-httpapi

        try:
            code, response = self.connection.send_request(
                method, uri, payload=payload, headers=self.headers
            )
        except ConnectionError as e:
            self.module.fail_json(
                msg="connection error occurred: {0}".format(e)
            )
        except CertificateError as e:
            self.module.fail_json(
                msg="certificate error occurred: {0}".format(e)
            )
        except ValueError as e:
            self.module.fail_json(msg="certificate not found: {0}".format(e))

        if code == 404:
            if (
                to_text("Object not found") in to_text(response)
                or to_text("Could not find object") in to_text(response)
                or to_text("No offense was found") in to_text(response)
            ):
                return {}

        if code == 409:
            if "code" in response:
                if response["code"] in [1002, 1004]:
                    # https://www.ibm.com/support/knowledgecenter/SS42VS_7.3.1/com.ibm.qradar.doc/9.2--staged_config-deploy_status-POST.html
                    # Documentation says we should get 1002, but I'm getting 1004 from QRadar
                    return response
                else:
                    self.module.fail_json(
                        msg="qradar httpapi returned error {0} with message {1}".format(
                            code, response
                        )
                    )
        elif not (code >= 200 and code < 300):
            self.module.fail_json(
                msg="qradar httpapi returned error {0} with message {1}".format(
                    code, response
                )
            )

        return response

    def get(self, url, **kwargs):
        return self._httpapi_error_handle("GET", url, **kwargs)

    def put(self, url, **kwargs):
        return self._httpapi_error_handle("PUT", url, **kwargs)

    def post(self, url, **kwargs):
        return self._httpapi_error_handle("POST", url, **kwargs)

    def patch(self, url, **kwargs):
        return self._httpapi_error_handle("PATCH", url, **kwargs)

    def delete(self, url, **kwargs):
        return self._httpapi_error_handle("DELETE", url, **kwargs)

    def get_data(self):
        """
        Get the valid fields that should be passed to the REST API as urlencoded
        data so long as the argument specification to the module follows the
        convention:
            - the key to the argspec item does not start with qradar_
            - the key does not exist in the not_data_keys list
        """
        try:
            qradar_data = {}
            for param in self.module.params:
                if (self.module.params[param]) is not None and (
                    param not in self.not_rest_data_keys
                ):
                    qradar_data[param] = self.module.params[param]
            return qradar_data

        except TypeError as e:
            self.module.fail_json(
                msg="invalid data type provided: {0}".format(e)
            )

    def post_by_path(self, rest_path, data=None):
        """
        POST with data to path
        """
        if data is None:
            data = json.dumps(self.get_data())
        elif data is False:
            # Because for some reason some QRadar REST API endpoint use the
            # query string to modify state
            return self.post("/{0}".format(rest_path))
        return self.post("/{0}".format(rest_path), payload=data)

    def create_update(self, rest_path, data=None):
        """
        Create or Update a file/directory monitor data input in qradar
        """
        if data is None:
            data = json.dumps(self.get_data())
        # return self.post("/{0}".format(rest_path), payload=data)
        return self.patch("/{0}".format(rest_path), payload=data)  # PATCH