Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
botocore / tests / unit / response_parsing / test_response_parsing.py
Size: Mime:
# Copyright 2012-2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.

import difflib
import glob
import json
import logging
import os
import pprint

import pytest

import botocore.session
from botocore import parsers, xform_name
from tests import create_session

log = logging.getLogger(__name__)


SPECIAL_CASES = [
    'iam-get-user-policy.xml',  # Needs the JSON decode from handlers.py
    'iam-list-roles.xml',  # Needs the JSON decode from handlers.py for the policy
    's3-get-bucket-location.xml',  # Confirmed, this will need a special handler
    # 's3-list-multipart-uploads.xml',  # Bug in model, missing delimeter
    'cloudformation-get-template.xml',  # Need to JSON decode the template body.
]


def _get_expected_parsed_result(filename):
    dirname, filename = os.path.split(filename)
    basename = os.path.splitext(filename)[0]
    jsonfile = os.path.join(dirname, basename + '.json')
    with open(jsonfile) as f:
        return json.load(f)


def _get_raw_response_body(xmlfile):
    with open(xmlfile, 'rb') as f:
        return f.read()


def _get_operation_model(service_model, filename):
    dirname, filename = os.path.split(filename)
    basename = os.path.splitext(filename)[0]
    sn, opname = basename.split('-', 1)
    # In order to have multiple tests for the same
    # operation a '#' char is used to separate
    # operation names from some other suffix so that
    # the tests have a different filename, e.g
    # my-operation#1.xml, my-operation#2.xml.
    opname = opname.split('#')[0]
    operation_names = service_model.operation_names
    for operation_name in operation_names:
        if xform_name(operation_name) == opname.replace('-', '_'):
            return service_model.operation_model(operation_name)


def _test_parsed_response(xmlfile, operation_model, expected):
    response_body = _get_raw_response_body(xmlfile)
    response = {'body': response_body, 'status_code': 200, 'headers': {}}
    for case in SPECIAL_CASES:
        if case in xmlfile:
            print("SKIP: %s" % xmlfile)
            return
    if 'errors' in xmlfile:
        response['status_code'] = 400
    # Handle the special cased __headers__ key if it exists.
    if b'__headers__' in response_body:
        loaded = json.loads(response_body.decode('utf-8'))
        response['headers'] = loaded.pop('__headers__')
        response['body'] = json.dumps(loaded).encode('utf-8')

    protocol = operation_model.service_model.protocol
    parser_cls = parsers.PROTOCOL_PARSERS[protocol]
    parser = parser_cls(timestamp_parser=lambda x: x)
    parsed = parser.parse(response, operation_model.output_shape)
    parsed = _convert_bytes_to_str(parsed)
    expected['ResponseMetadata']['HTTPStatusCode'] = response['status_code']
    expected['ResponseMetadata']['HTTPHeaders'] = response['headers']

    d2 = parsed
    d1 = expected

    if d1 != d2:
        log.debug('-' * 40)
        log.debug("XML FILE:\n" + xmlfile)
        log.debug('-' * 40)
        log.debug("ACTUAL:\n" + pprint.pformat(parsed))
        log.debug('-' * 40)
        log.debug("EXPECTED:\n" + pprint.pformat(expected))
    if not d1 == d2:
        # Borrowed from assertDictEqual, though this doesn't
        # handle the case when unicode literals are used in one
        # dict but not in the other (and we want to consider them
        # as being equal).
        print(d1)
        print()
        print(d2)
        pretty_d1 = pprint.pformat(d1, width=1).splitlines()
        pretty_d2 = pprint.pformat(d2, width=1).splitlines()
        diff = '\n' + '\n'.join(difflib.ndiff(pretty_d1, pretty_d2))
        raise AssertionError("Dicts are not equal:\n%s" % diff)


def _convert_bytes_to_str(parsed):
    if isinstance(parsed, dict):
        new_dict = {}
        for key, value in parsed.items():
            new_dict[key] = _convert_bytes_to_str(value)
        return new_dict
    elif isinstance(parsed, bytes):
        return parsed.decode('utf-8')
    elif isinstance(parsed, list):
        new_list = []
        for item in parsed:
            new_list.append(_convert_bytes_to_str(item))
        return new_list
    else:
        return parsed


def _xml_test_cases():
    session = create_session()
    test_cases = []
    for dp in ['responses', 'errors']:
        data_path = os.path.join(os.path.dirname(__file__), 'xml')
        data_path = os.path.join(data_path, dp)
        xml_files = glob.glob('%s/*.xml' % data_path)
        service_names = set()
        for fn in xml_files:
            service_names.add(os.path.split(fn)[1].split('-')[0])
        for service_name in service_names:
            service_model = session.get_service_model(service_name)
            service_xml_files = glob.glob(f'{data_path}/{service_name}-*.xml')
            for xmlfile in service_xml_files:
                expected = _get_expected_parsed_result(xmlfile)
                operation_model = _get_operation_model(service_model, xmlfile)
                test_cases.append((xmlfile, operation_model, expected))
    return sorted(test_cases)


@pytest.mark.parametrize(
    "xmlfile, operation_model, expected", _xml_test_cases()
)
def test_xml_parsing(xmlfile, operation_model, expected):
    _test_parsed_response(xmlfile, operation_model, expected)


def _json_test_cases():
    # The outputs/ directory has sample output responses
    # For each file in outputs/ there's a corresponding file
    # in expected/ that has the expected parsed response.
    base_dir = os.path.join(os.path.dirname(__file__), 'json')
    json_responses_dir = os.path.join(base_dir, 'errors')
    expected_parsed_dir = os.path.join(base_dir, 'expected')
    session = botocore.session.get_session()
    json_test_cases = []
    for json_response_file in os.listdir(json_responses_dir):
        # Files look like: 'datapipeline-create-pipeline.json'
        service_name, operation_name = os.path.splitext(json_response_file)[
            0
        ].split('-', 1)
        expected_parsed_response = os.path.join(
            expected_parsed_dir, json_response_file
        )
        raw_response_file = os.path.join(
            json_responses_dir, json_response_file
        )
        with open(expected_parsed_response) as f:
            expected = json.load(f)
        service_model = session.get_service_model(service_name)
        operation_names = service_model.operation_names
        operation_model = None
        for op_name in operation_names:
            if xform_name(op_name) == operation_name.replace('-', '_'):
                operation_model = service_model.operation_model(op_name)
        json_test_cases.append((raw_response_file, operation_model, expected))
    return sorted(json_test_cases)


@pytest.mark.parametrize(
    "raw_response_file, operation_model, expected", _json_test_cases()
)
def test_json_errors_parsing(raw_response_file, operation_model, expected):
    _test_parsed_response(raw_response_file, operation_model, expected)