Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

hemamaps / coreapi   python

Repository URL to install this package:

Version: 1.32.0 

/ codecs / corejson.py

from __future__ import unicode_literals
from collections import OrderedDict
from coreapi.codecs.base import BaseCodec
from coreapi.compat import force_bytes, string_types, urlparse
from coreapi.compat import COMPACT_SEPARATORS, VERBOSE_SEPARATORS
from coreapi.document import Document, Link, Array, Object, Error, Field
from coreapi.exceptions import ParseError
import json


# Robust dictionary lookups, that always return an item of the correct
# type, using an empty default if an incorrect type exists.
# Useful for liberal parsing of inputs.

def _get_string(item, key):
    value = item.get(key)
    if isinstance(value, string_types):
        return value
    return ''


def _get_dict(item, key):
    value = item.get(key)
    if isinstance(value, dict):
        return value
    return {}


def _get_list(item, key):
    value = item.get(key)
    if isinstance(value, list):
        return value
    return []


def _get_bool(item, key):
    value = item.get(key)
    if isinstance(value, bool):
        return value
    return False


def _graceful_relative_url(base_url, url):
    """
    Return a graceful link for a URL relative to a base URL.

    * If they are the same, return an empty string.
    * If the have the same scheme and hostname, return the path & query params.
    * Otherwise return the full URL.
    """
    if url == base_url:
        return ''
    base_prefix = '%s://%s' % urlparse.urlparse(base_url or '')[0:2]
    url_prefix = '%s://%s' % urlparse.urlparse(url or '')[0:2]
    if base_prefix == url_prefix and url_prefix != '://':
        return url[len(url_prefix):]
    return url


def _escape_key(string):
    """
    The '_type' and '_meta' keys are reserved.
    Prefix with an additional '_' if they occur.
    """
    if string.startswith('_') and string.lstrip('_') in ('type', 'meta'):
        return '_' + string
    return string


def _unescape_key(string):
    """
    Unescape '__type' and '__meta' keys if they occur.
    """
    if string.startswith('__') and string.lstrip('_') in ('type', 'meta'):
        return string[1:]
    return string


def _get_content(item, base_url=None):
    """
    Return a dictionary of content, for documents, objects and errors.
    """
    return {
        _unescape_key(key): _primative_to_document(value, base_url)
        for key, value in item.items()
        if key not in ('_type', '_meta')
    }


def _document_to_primative(node, base_url=None):
    """
    Take a Core API document and return Python primatives
    ready to be rendered into the JSON style encoding.
    """
    if isinstance(node, Document):
        ret = OrderedDict()
        ret['_type'] = 'document'

        meta = OrderedDict()
        url = _graceful_relative_url(base_url, node.url)
        if url:
            meta['url'] = url
        if node.title:
            meta['title'] = node.title
        if meta:
            ret['_meta'] = meta

        # Fill in key-value content.
        ret.update([
            (_escape_key(key), _document_to_primative(value, base_url=url))
            for key, value in node.items()
        ])
        return ret

    elif isinstance(node, Error):
        ret = OrderedDict()
        ret['_type'] = 'error'

        if node.title:
            ret['_meta'] = {'title': node.title}

        # Fill in key-value content.
        ret.update([
            (_escape_key(key), _document_to_primative(value, base_url=base_url))
            for key, value in node.items()
        ])
        return ret

    elif isinstance(node, Link):
        ret = OrderedDict()
        ret['_type'] = 'link'
        url = _graceful_relative_url(base_url, node.url)
        if url:
            ret['url'] = url
        if node.action:
            ret['action'] = node.action
        if node.encoding:
            ret['encoding'] = node.encoding
        if node.transform:
            ret['transform'] = node.transform
        if node.description:
            ret['description'] = node.description
        if node.fields:
            ret['fields'] = [
                _document_to_primative(field) for field in node.fields
            ]
        return ret

    elif isinstance(node, Field):
        ret = OrderedDict({'name': node.name})
        if node.required:
            ret['required'] = node.required
        if node.location:
            ret['location'] = node.location
        if node.description:
            ret['description'] = node.description
        return ret

    elif isinstance(node, Object):
        return OrderedDict([
            (_escape_key(key), _document_to_primative(value, base_url=base_url))
            for key, value in node.items()
        ])

    elif isinstance(node, Array):
        return [_document_to_primative(value) for value in node]

    return node


def _primative_to_document(data, base_url=None):
    """
    Take Python primatives as returned from parsing JSON content,
    and return a Core API document.
    """
    if isinstance(data, dict) and data.get('_type') == 'document':
        # Document
        meta = _get_dict(data, '_meta')
        url = _get_string(meta, 'url')
        url = urlparse.urljoin(base_url, url)
        title = _get_string(meta, 'title')
        content = _get_content(data, base_url=url)
        return Document(url=url, title=title, content=content)

    if isinstance(data, dict) and data.get('_type') == 'error':
        # Error
        meta = _get_dict(data, '_meta')
        title = _get_string(meta, 'title')
        content = _get_content(data, base_url=base_url)
        return Error(title=title, content=content)

    elif isinstance(data, dict) and data.get('_type') == 'link':
        # Link
        url = _get_string(data, 'url')
        url = urlparse.urljoin(base_url, url)
        action = _get_string(data, 'action')
        encoding = _get_string(data, 'encoding')
        transform = _get_string(data, 'transform')
        description = _get_string(data, 'description')
        fields = _get_list(data, 'fields')
        fields = [
            Field(
                name=_get_string(item, 'name'),
                required=_get_bool(item, 'required'),
                location=_get_string(item, 'location'),
                description=_get_string(item, 'description')
            )
            for item in fields if isinstance(item, dict)
        ]
        return Link(
            url=url, action=action, encoding=encoding, transform=transform,
            description=description, fields=fields
        )

    elif isinstance(data, dict):
        # Map
        content = _get_content(data, base_url=base_url)
        return Object(content)

    elif isinstance(data, list):
        # Array
        content = [_primative_to_document(item, base_url) for item in data]
        return Array(content)

    # String, Integer, Number, Boolean, null.
    return data


class CoreJSONCodec(BaseCodec):
    media_type = 'application/vnd.coreapi+json'
    supports = ['encoding', 'decoding']

    def load(self, bytes, base_url=None):
        """
        Takes a bytestring and returns a document.
        """
        try:
            data = json.loads(bytes.decode('utf-8'))
        except ValueError as exc:
            raise ParseError('Malformed JSON. %s' % exc)

        doc = _primative_to_document(data, base_url)

        if isinstance(doc, Object):
            doc = Document(content=dict(doc))
        elif not (isinstance(doc, Document) or isinstance(doc, Error)):
            raise ParseError('Top level node should be a document or error.')

        return doc

    def dump(self, document, indent=False, **kwargs):
        """
        Takes a document and returns a bytestring.
        """
        if indent:
            options = {
                'ensure_ascii': False,
                'indent': 4,
                'separators': VERBOSE_SEPARATORS
            }
        else:
            options = {
                'ensure_ascii': False,
                'indent': None,
                'separators': COMPACT_SEPARATORS
            }

        data = _document_to_primative(document)
        return force_bytes(json.dumps(data, **options))