Learn more  » Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

aroundthecode / ruamel.yaml   python

Repository URL to install this package:

Version: 0.15.80 

/ yaml / compat.py

# coding: utf-8

from __future__ import print_function

# partially from package six by Benjamin Peterson

import sys
import os
import types
import traceback
from abc import abstractmethod


# fmt: off
if False:  # MYPY
    from typing import Any, Dict, Optional, List, Union, BinaryIO, IO, Text, Tuple, Optional  # NOQA
# fmt: on

_DEFAULT_YAML_VERSION = (1, 2)

try:
    from ruamel.ordereddict import ordereddict
except:  # NOQA
    try:
        from collections import OrderedDict
    except ImportError:
        from ordereddict import OrderedDict  # type: ignore
    # to get the right name import ... as ordereddict doesn't do that

    class ordereddict(OrderedDict):  # type: ignore
        if not hasattr(OrderedDict, 'insert'):

            def insert(self, pos, key, value):
                # type: (int, Any, Any) -> None
                if pos >= len(self):
                    self[key] = value
                    return
                od = ordereddict()
                od.update(self)
                for k in od:
                    del self[k]
                for index, old_key in enumerate(od):
                    if pos == index:
                        self[key] = value
                    self[old_key] = od[old_key]


PY2 = sys.version_info[0] == 2
PY3 = sys.version_info[0] == 3


if PY3:

    def utf8(s):
        # type: (str) -> str
        return s

    def to_str(s):
        # type: (str) -> str
        return s

    def to_unicode(s):
        # type: (str) -> str
        return s


else:

    def utf8(s):
        # type: (unicode) -> str
        return s.encode('utf-8')

    def to_str(s):
        # type: (str) -> str
        return str(s)

    def to_unicode(s):
        # type: (str) -> unicode
        return unicode(s)  # NOQA


if PY3:
    string_types = str
    integer_types = int
    class_types = type
    text_type = str
    binary_type = bytes

    MAXSIZE = sys.maxsize
    unichr = chr
    import io

    StringIO = io.StringIO
    BytesIO = io.BytesIO
    # have unlimited precision
    no_limit_int = int
    from collections.abc import Hashable, MutableSequence, MutableMapping, Mapping  # NOQA

else:
    string_types = basestring  # NOQA
    integer_types = (int, long)  # NOQA
    class_types = (type, types.ClassType)
    text_type = unicode  # NOQA
    binary_type = str

    # to allow importing
    unichr = unichr  # type: ignore
    from StringIO import StringIO as _StringIO

    StringIO = _StringIO
    import cStringIO

    BytesIO = cStringIO.StringIO
    # have unlimited precision
    no_limit_int = long  # NOQA not available on Python 3
    from collections import Hashable, MutableSequence, MutableMapping, Mapping  # NOQA

if False:  # MYPY
    # StreamType = Union[BinaryIO, IO[str], IO[unicode],  StringIO]
    StreamType = Union[BinaryIO, IO[str], StringIO]

    StreamTextType = Union[Text, StreamType]
    VersionType = Union[List[int], str, Tuple[int, int]]

if PY3:
    builtins_module = 'builtins'
else:
    builtins_module = '__builtin__'

UNICODE_SIZE = 4 if sys.maxunicode > 65535 else 2


def with_metaclass(meta, *bases):
    # type: (Any, Any) -> Any
    """Create a base class with a metaclass."""
    return meta('NewBase', bases, {})


DBG_TOKEN = 1
DBG_EVENT = 2
DBG_NODE = 4


_debug = None  # type: Optional[int]
if 'RUAMELDEBUG' in os.environ:
    _debugx = os.environ.get('RUAMELDEBUG')
    if _debugx is None:
        _debug = 0
    else:
        _debug = int(_debugx)


if bool(_debug):

    class ObjectCounter(object):
        def __init__(self):
            # type: () -> None
            self.map = {}  # type: Dict[Any, Any]

        def __call__(self, k):
            # type: (Any) -> None
            self.map[k] = self.map.get(k, 0) + 1

        def dump(self):
            # type: () -> None
            for k in sorted(self.map):
                sys.stdout.write('{} -> {}'.format(k, self.map[k]))

    object_counter = ObjectCounter()


# used from yaml util when testing
def dbg(val=None):
    # type: (Any) -> Any
    global _debug
    if _debug is None:
        # set to true or false
        _debugx = os.environ.get('YAMLDEBUG')
        if _debugx is None:
            _debug = 0
        else:
            _debug = int(_debugx)
    if val is None:
        return _debug
    return _debug & val


class Nprint(object):
    def __init__(self, file_name=None):
        # type: (Any) -> None
        self._max_print = None  # type: Any
        self._count = None  # type: Any
        self._file_name = file_name

    def __call__(self, *args, **kw):
        # type: (Any, Any) -> None
        if not bool(_debug):
            return
        out = sys.stdout if self._file_name is None else open(self._file_name, 'a')
        dbgprint = print  # to fool checking for print statements by dv utility
        kw1 = kw.copy()
        kw1['file'] = out
        dbgprint(*args, **kw1)
        out.flush()
        if self._max_print is not None:
            if self._count is None:
                self._count = self._max_print
            self._count -= 1
            if self._count == 0:
                dbgprint('forced exit\n')
                traceback.print_stack()
                out.flush()
                sys.exit(0)
        if self._file_name:
            out.close()

    def set_max_print(self, i):
        # type: (int) -> None
        self._max_print = i
        self._count = None


nprint = Nprint()
nprintf = Nprint('/var/tmp/ruamel.yaml.log')

# char checkers following production rules


def check_namespace_char(ch):
    # type: (Any) -> bool
    if u'\x21' <= ch <= u'\x7E':  # ! to ~
        return True
    if u'\xA0' <= ch <= u'\xD7FF':
        return True
    if (u'\xE000' <= ch <= u'\xFFFD') and ch != u'\xFEFF':  # excl. byte order mark
        return True
    if u'\x10000' <= ch <= u'\x10FFFF':
        return True
    return False


def check_anchorname_char(ch):
    # type: (Any) -> bool
    if ch in u',[]{}':
        return False
    return check_namespace_char(ch)


def version_tnf(t1, t2=None):
    # type: (Any, Any) -> Any
    from ruamel.yaml import version_info  # NOQA

    if version_info < t1:
        return True
    if t2 is not None and version_info < t2:
        return None
    return False


class MutableSliceableSequence(MutableSequence):
    __slots__ = ()

    def __getitem__(self, index):
        # type: (Any) -> Any
        if not isinstance(index, slice):
            return self.__getsingleitem__(index)
        return type(self)([self[i] for i in range(*index.indices(len(self)))])  # type: ignore

    def __setitem__(self, index, value):
        # type: (Any, Any) -> None
        if not isinstance(index, slice):
            return self.__setsingleitem__(index, value)
        assert iter(value)
        # nprint(index.start, index.stop, index.step, index.indices(len(self)))
        if index.step is None:
            del self[index.start : index.stop]
            for elem in reversed(value):
                self.insert(0 if index.start is None else index.start, elem)
        else:
            range_parms = index.indices(len(self))
            nr_assigned_items = (range_parms[1] - range_parms[0] - 1) // range_parms[2] + 1
            # need to test before changing, in case TypeError is caught
            if nr_assigned_items < len(value):
                raise TypeError(
                    'too many elements in value {} < {}'.format(nr_assigned_items, len(value))
                )
            elif nr_assigned_items > len(value):
                raise TypeError(
                    'not enough elements in value {} > {}'.format(
                        nr_assigned_items, len(value)
                    )
                )
            for idx, i in enumerate(range(*range_parms)):
                self[i] = value[idx]

    def __delitem__(self, index):
        # type: (Any) -> None
        if not isinstance(index, slice):
            return self.__delsingleitem__(index)
        # nprint(index.start, index.stop, index.step, index.indices(len(self)))
        for i in reversed(range(*index.indices(len(self)))):
            del self[i]

    @abstractmethod
    def __getsingleitem__(self, index):
        # type: (Any) -> Any
        raise IndexError

    @abstractmethod
    def __setsingleitem__(self, index, value):
        # type: (Any, Any) -> None
        raise IndexError

    @abstractmethod
    def __delsingleitem__(self, index):
        # type: (Any) -> None
        raise IndexError