# coding: utf-8
from __future__ import print_function, absolute_import, division
import datetime
import base64
import binascii
import re
import sys
import types
import warnings
# fmt: off
from ruamel.yaml.error import (MarkedYAMLError, MarkedYAMLFutureWarning,
MantissaNoDotYAML1_1Warning)
from ruamel.yaml.nodes import * # NOQA
from ruamel.yaml.nodes import (SequenceNode, MappingNode, ScalarNode)
from ruamel.yaml.compat import (utf8, builtins_module, to_str, PY2, PY3, # NOQA
ordereddict, text_type, nprint, nprintf, version_tnf,
Hashable, MutableSequence, MutableMapping)
from ruamel.yaml.comments import * # NOQA
from ruamel.yaml.comments import (CommentedMap, CommentedOrderedMap, CommentedSet,
CommentedKeySeq, CommentedSeq, TaggedScalar,
CommentedKeyMap)
from ruamel.yaml.scalarstring import (SingleQuotedScalarString, DoubleQuotedScalarString,
LiteralScalarString, FoldedScalarString,
ScalarString,)
from ruamel.yaml.scalarint import ScalarInt, BinaryInt, OctalInt, HexInt, HexCapsInt
from ruamel.yaml.scalarfloat import ScalarFloat
from ruamel.yaml.timestamp import TimeStamp
from ruamel.yaml.util import RegExp
if False: # MYPY
from typing import Any, Dict, List, Set, Generator, Union, Optional # NOQA
__all__ = ['BaseConstructor', 'SafeConstructor', 'Constructor',
'ConstructorError', 'RoundTripConstructor']
# fmt: on
class ConstructorError(MarkedYAMLError):
pass
class DuplicateKeyFutureWarning(MarkedYAMLFutureWarning):
pass
class DuplicateKeyError(MarkedYAMLFutureWarning):
pass
class BaseConstructor(object):
yaml_constructors = {} # type: Dict[Any, Any]
yaml_multi_constructors = {} # type: Dict[Any, Any]
def __init__(self, preserve_quotes=None, loader=None):
# type: (Optional[bool], Any) -> None
self.loader = loader
if self.loader is not None and getattr(self.loader, '_constructor', None) is None:
self.loader._constructor = self
self.loader = loader
self.constructed_objects = {} # type: Dict[Any, Any]
self.recursive_objects = {} # type: Dict[Any, Any]
self.state_generators = [] # type: List[Any]
self.deep_construct = False
self._preserve_quotes = preserve_quotes
self.allow_duplicate_keys = version_tnf((0, 15, 1), (0, 16))
@property
def composer(self):
# type: () -> Any
if hasattr(self.loader, 'typ'):
return self.loader.composer
try:
return self.loader._composer
except AttributeError:
sys.stdout.write('slt {}\n'.format(type(self)))
sys.stdout.write('slc {}\n'.format(self.loader._composer))
sys.stdout.write('{}\n'.format(dir(self)))
raise
@property
def resolver(self):
# type: () -> Any
if hasattr(self.loader, 'typ'):
return self.loader.resolver
return self.loader._resolver
def check_data(self):
# type: () -> Any
# If there are more documents available?
return self.composer.check_node()
def get_data(self):
# type: () -> Any
# Construct and return the next document.
if self.composer.check_node():
return self.construct_document(self.composer.get_node())
def get_single_data(self):
# type: () -> Any
# Ensure that the stream contains a single document and construct it.
node = self.composer.get_single_node()
if node is not None:
return self.construct_document(node)
return None
def construct_document(self, node):
# type: (Any) -> Any
data = self.construct_object(node)
while bool(self.state_generators):
state_generators = self.state_generators
self.state_generators = []
for generator in state_generators:
for _dummy in generator:
pass
self.constructed_objects = {}
self.recursive_objects = {}
self.deep_construct = False
return data
def construct_object(self, node, deep=False):
# type: (Any, bool) -> Any
"""deep is True when creating an object/mapping recursively,
in that case want the underlying elements available during construction
"""
if node in self.constructed_objects:
return self.constructed_objects[node]
if deep:
old_deep = self.deep_construct
self.deep_construct = True
if node in self.recursive_objects:
return self.recursive_objects[node]
# raise ConstructorError(
# None, None, 'found unconstructable recursive node', node.start_mark
# )
self.recursive_objects[node] = None
constructor = None # type: Any
tag_suffix = None
if node.tag in self.yaml_constructors:
constructor = self.yaml_constructors[node.tag]
else:
for tag_prefix in self.yaml_multi_constructors:
if node.tag.startswith(tag_prefix):
tag_suffix = node.tag[len(tag_prefix) :]
constructor = self.yaml_multi_constructors[tag_prefix]
break
else:
if None in self.yaml_multi_constructors:
tag_suffix = node.tag
constructor = self.yaml_multi_constructors[None]
elif None in self.yaml_constructors:
constructor = self.yaml_constructors[None]
elif isinstance(node, ScalarNode):
constructor = self.__class__.construct_scalar # type: ignore
elif isinstance(node, SequenceNode):
constructor = self.__class__.construct_sequence # type: ignore
elif isinstance(node, MappingNode):
constructor = self.__class__.construct_mapping # type: ignore
if tag_suffix is None:
data = constructor(self, node)
else:
data = constructor(self, tag_suffix, node)
if isinstance(data, types.GeneratorType):
generator = data
data = next(generator)
if self.deep_construct:
for _dummy in generator:
pass
else:
self.state_generators.append(generator)
self.constructed_objects[node] = data
del self.recursive_objects[node]
if deep:
self.deep_construct = old_deep
return data
def construct_scalar(self, node):
# type: (Any) -> Any
if not isinstance(node, ScalarNode):
raise ConstructorError(
None, None, 'expected a scalar node, but found %s' % node.id, node.start_mark
)
return node.value
def construct_sequence(self, node, deep=False):
# type: (Any, bool) -> Any
"""deep is True when creating an object/mapping recursively,
in that case want the underlying elements available during construction
"""
if not isinstance(node, SequenceNode):
raise ConstructorError(
None, None, 'expected a sequence node, but found %s' % node.id, node.start_mark
)
return [self.construct_object(child, deep=deep) for child in node.value]
def construct_mapping(self, node, deep=False):
# type: (Any, bool) -> Any
"""deep is True when creating an object/mapping recursively,
in that case want the underlying elements available during construction
"""
if not isinstance(node, MappingNode):
raise ConstructorError(
None, None, 'expected a mapping node, but found %s' % node.id, node.start_mark
)
total_mapping = {}
if getattr(node, 'merge', None) is not None:
todo = [(node.merge, False), (node.value, False)]
else:
todo = [(node.value, True)]
for values, check in todo:
mapping = {} # type: Dict[Any, Any]
for key_node, value_node in values:
# keys can be list -> deep
key = self.construct_object(key_node, deep=True)
# lists are not hashable, but tuples are
if not isinstance(key, Hashable):
if isinstance(key, list):
key = tuple(key)
if PY2:
try:
hash(key)
except TypeError as exc:
raise ConstructorError(
'while constructing a mapping',
node.start_mark,
'found unacceptable key (%s)' % exc,
key_node.start_mark,
)
else:
if not isinstance(key, Hashable):
raise ConstructorError(
'while constructing a mapping',
node.start_mark,
'found unhashable key',
key_node.start_mark,
)
value = self.construct_object(value_node, deep=deep)
if check:
self.check_mapping_key(node, key_node, mapping, key, value)
mapping[key] = value
total_mapping.update(mapping)
return total_mapping
def check_mapping_key(self, node, key_node, mapping, key, value):
# type: (Any, Any, Any, Any, Any) -> None
if key in mapping:
if not self.allow_duplicate_keys:
mk = mapping.get(key)
if PY2:
if isinstance(key, unicode):
key = key.encode('utf-8')
if isinstance(value, unicode):
value = value.encode('utf-8')
if isinstance(mk, unicode):
mk = mk.encode('utf-8')
args = [
'while constructing a mapping',
node.start_mark,
'found duplicate key "{}" with value "{}" '
'(original value: "{}")'.format(key, value, mk),
key_node.start_mark,
"""
To suppress this check see:
http://yaml.readthedocs.io/en/latest/api.html#duplicate-keys
""",
"""\
Duplicate keys will become an error in future releases, and are errors
by default when using the new API.
""",
]
if self.allow_duplicate_keys is None:
warnings.warn(DuplicateKeyFutureWarning(*args))
else:
raise DuplicateKeyError(*args)
def check_set_key(self, node, key_node, setting, key):
# type: (Any, Any, Any, Any, Any) -> None
if key in setting:
if not self.allow_duplicate_keys:
if PY2:
if isinstance(key, unicode):
key = key.encode('utf-8')
args = [
'while constructing a set',
node.start_mark,
'found duplicate key "{}"'.format(key),
key_node.start_mark,
"""
To suppress this check see:
http://yaml.readthedocs.io/en/latest/api.html#duplicate-keys
""",
"""\
Duplicate keys will become an error in future releases, and are errors
by default when using the new API.
""",
]
if self.allow_duplicate_keys is None:
warnings.warn(DuplicateKeyFutureWarning(*args))
else:
raise DuplicateKeyError(*args)
def construct_pairs(self, node, deep=False):
# type: (Any, bool) -> Any
if not isinstance(node, MappingNode):
raise ConstructorError(
None, None, 'expected a mapping node, but found %s' % node.id, node.start_mark
)
pairs = []
for key_node, value_node in node.value:
key = self.construct_object(key_node, deep=deep)
value = self.construct_object(value_node, deep=deep)
pairs.append((key, value))
return pairs
@classmethod
def add_constructor(cls, tag, constructor):
# type: (Any, Any) -> None
if 'yaml_constructors' not in cls.__dict__:
cls.yaml_constructors = cls.yaml_constructors.copy()
cls.yaml_constructors[tag] = constructor
@classmethod
def add_multi_constructor(cls, tag_prefix, multi_constructor):
# type: (Any, Any) -> None
if 'yaml_multi_constructors' not in cls.__dict__:
cls.yaml_multi_constructors = cls.yaml_multi_constructors.copy()
cls.yaml_multi_constructors[tag_prefix] = multi_constructor
class SafeConstructor(BaseConstructor):
def construct_scalar(self, node):
# type: (Any) -> Any
if isinstance(node, MappingNode):
for key_node, value_node in node.value:
if key_node.tag == u'tag:yaml.org,2002:value':
return self.construct_scalar(value_node)
return BaseConstructor.construct_scalar(self, node)
def flatten_mapping(self, node):
# type: (Any) -> Any
Loading ...