# coding: utf-8

from __future__ import print_function, absolute_import, division, unicode_literals

# Scanner produces tokens of the following types:
# STREAM-START
# STREAM-END
# DIRECTIVE(name, value)
# DOCUMENT-START
# DOCUMENT-END
# BLOCK-SEQUENCE-START
# BLOCK-MAPPING-START
# BLOCK-END
# FLOW-SEQUENCE-START
# FLOW-MAPPING-START
# FLOW-SEQUENCE-END
# FLOW-MAPPING-END
# BLOCK-ENTRY
# FLOW-ENTRY
# KEY
# VALUE
# ALIAS(value)
# ANCHOR(value)
# TAG(value)
# SCALAR(value, plain, style)
#
# RoundTripScanner
# COMMENT(value)
#
# Read comments in the Scanner code for more details.
#
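# As an illustrative example (not exhaustive), scanning the one-line
# document 'a: 1' yields roughly this token sequence:
#   STREAM-START, BLOCK-MAPPING-START, KEY, SCALAR('a'), VALUE,
#   SCALAR('1'), BLOCK-END, STREAM-END
#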

from ruamel.yaml.error import MarkedYAMLError
from ruamel.yaml.tokens import *  # NOQA
from ruamel.yaml.compat import utf8, unichr, PY3, check_anchorname_char, nprint  # NOQA

if False:  # MYPY
    from typing import Any, Dict, Optional, List, Union, Text  # NOQA
    from ruamel.yaml.compat import VersionType  # NOQA

__all__ = ['Scanner', 'RoundTripScanner', 'ScannerError']


_THE_END = '\n\0\r\x85\u2028\u2029'
_THE_END_SPACE_TAB = ' \n\0\t\r\x85\u2028\u2029'
_SPACE_TAB = ' \t'


class ScannerError(MarkedYAMLError):
    pass


class SimpleKey(object):
    # See the simple keys treatment below.

    def __init__(self, token_number, required, index, line, column, mark):
        # type: (Any, Any, int, int, int, Any) -> None
        self.token_number = token_number
        self.required = required
        self.index = index
        self.line = line
        self.column = column
        self.mark = mark


class Scanner(object):
    def __init__(self, loader=None):
        # type: (Any) -> None
        """Initialize the scanner."""
        # It is assumed that Scanner and Reader will have a common descendant.
        # Reader does the dirty work of checking for BOM and converting the
        # input data to Unicode. It also adds NUL to the end.
        #
        # Reader supports the following methods
        #   self.peek(i=0)    # peek the next i-th character
        #   self.prefix(l=1)  # peek the next l characters
        #   self.forward(l=1) # read the next l characters and move the pointer
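        #
        # As a hedged illustration of that interface (sketch only, mirroring
        # check_document_start() further below), a test for the '---'
        # document-start marker could look like:
        #   if self.reader.prefix(3) == '---' and \
        #           self.reader.peek(3) in _THE_END_SPACE_TAB:
        #       ...  # a document start was found at the current position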

        self.loader = loader
        if self.loader is not None and getattr(self.loader, '_scanner', None) is None:
            self.loader._scanner = self
        self.reset_scanner()
        self.first_time = False

    @property
    def flow_level(self):
        # type: () -> int
        return len(self.flow_context)

    def reset_scanner(self):
        # type: () -> None
        # Have we reached the end of the stream?
        self.done = False

        # flow_context is an expanding/shrinking list consisting of '{' and '['
        # for each unclosed flow context. If the list is empty, we are in
        # block context.
        self.flow_context = []  # type: List[Text]

        # List of processed tokens that are not yet emitted.
        self.tokens = []  # type: List[Any]

        # Add the STREAM-START token.
        self.fetch_stream_start()

        # Number of tokens that were emitted through the `get_token` method.
        self.tokens_taken = 0

        # The current indentation level.
        self.indent = -1

        # Past indentation levels.
        self.indents = []  # type: List[int]

        # Variables related to simple keys treatment.

        # A simple key is a key that is not denoted by the '?' indicator.
        # Example of simple keys:
        #   ---
        #   block simple key: value
        #   ? not a simple key:
        #   : { flow simple key: value }
        # We emit the KEY token before all keys, so when we find a potential
        # simple key, we try to locate the corresponding ':' indicator.
        # Simple keys should be limited to a single line and 1024 characters.

        # Can a simple key start at the current position? A simple key may
        # start:
        # - at the beginning of the line, not counting indentation spaces
        #       (in block context),
        # - after '{', '[', ',' (in the flow context),
        # - after '?', ':', '-' (in the block context).
        # In the block context, this flag also signifies if a block collection
        # may start at the current position.
        self.allow_simple_key = True

        # Keep track of possible simple keys. This is a dictionary. The key
        # is `flow_level`; there can be no more than one possible simple key
        # for each level. The value is a SimpleKey record:
        #   (token_number, required, index, line, column, mark)
        # A simple key may start with ALIAS, ANCHOR, TAG, SCALAR(flow),
        # '[', or '{' tokens.
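        # As a hedged illustration: while scanning '{a: 1}' the scanner
        # records a SimpleKey for flow level 1 when it reaches 'a'; once the
        # following ':' indicator is fetched, that record is converted into a
        # KEY token inserted before the already scanned SCALAR('a').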
        self.possible_simple_keys = {}  # type: Dict[Any, Any]

    @property
    def reader(self):
        # type: () -> Any
        try:
            return self._scanner_reader  # type: ignore
        except AttributeError:
            if hasattr(self.loader, 'typ'):
                self._scanner_reader = self.loader.reader
            else:
                self._scanner_reader = self.loader._reader
            return self._scanner_reader

    @property
    def scanner_processing_version(self):  # prefix until un-composited
        # type: () -> VersionType
        if hasattr(self.loader, 'typ'):
            return self.loader.resolver.processing_version  # type: ignore
        return self.loader.processing_version  # type: ignore

    # Public methods.

    def check_token(self, *choices):
        # type: (Any) -> bool
        # Check if the next token is one of the given types.
        while self.need_more_tokens():
            self.fetch_more_tokens()
        if bool(self.tokens):
            if not choices:
                return True
            for choice in choices:
                if isinstance(self.tokens[0], choice):
                    return True
        return False

    def peek_token(self):
        # type: () -> Any
        # Return the next token, but do not remove it from the queue.
        while self.need_more_tokens():
            self.fetch_more_tokens()
        if bool(self.tokens):
            return self.tokens[0]

    def get_token(self):
        # type: () -> Any
        # Return the next token.
        while self.need_more_tokens():
            self.fetch_more_tokens()
        if bool(self.tokens):
            self.tokens_taken += 1
            return self.tokens.pop(0)
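
    # A hedged usage sketch for the three public methods above (the
    # `scanner` name is illustrative; normally the Parser drives this loop):
    #   while not scanner.check_token(StreamEndToken):
    #       token = scanner.get_token()
    #       ...  # e.g. dispatch on type(token)
    #   scanner.get_token()  # finally consume the STREAM-END token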

    # Private methods.

    def need_more_tokens(self):
        # type: () -> bool
        if self.done:
            return False
        if not self.tokens:
            return True
        # The current token may be a potential simple key, so we
        # need to look further.
        self.stale_possible_simple_keys()
        if self.next_possible_simple_key() == self.tokens_taken:
            return True
        return False

    def fetch_comment(self, comment):
        # type: (Any) -> None
        raise NotImplementedError

    def fetch_more_tokens(self):
        # type: () -> Any
        # Eat whitespaces and comments until we reach the next token.
        comment = self.scan_to_next_token()
        if comment is not None:  # never happens for base scanner
            return self.fetch_comment(comment)
        # Remove obsolete possible simple keys.
        self.stale_possible_simple_keys()

        # Compare the current indentation and column. It may add some tokens
        # and decrease the current indentation level.
        self.unwind_indent(self.reader.column)

        # Peek the next character.
        ch = self.reader.peek()

        # Is it the end of stream?
        if ch == '\0':
            return self.fetch_stream_end()

        # Is it a directive?
        if ch == '%' and self.check_directive():
            return self.fetch_directive()

        # Is it the document start?
        if ch == '-' and self.check_document_start():
            return self.fetch_document_start()

        # Is it the document end?
        if ch == '.' and self.check_document_end():
            return self.fetch_document_end()

        # TODO: support for BOM within a stream.
        # if ch == u'\uFEFF':
        #     return self.fetch_bom()    <-- issue BOMToken

        # Note: the order of the following checks is NOT significant.

        # Is it the flow sequence start indicator?
        if ch == '[':
            return self.fetch_flow_sequence_start()

        # Is it the flow mapping start indicator?
        if ch == '{':
            return self.fetch_flow_mapping_start()

        # Is it the flow sequence end indicator?
        if ch == ']':
            return self.fetch_flow_sequence_end()

        # Is it the flow mapping end indicator?
        if ch == '}':
            return self.fetch_flow_mapping_end()

        # Is it the flow entry indicator?
        if ch == ',':
            return self.fetch_flow_entry()

        # Is it the block entry indicator?
        if ch == '-' and self.check_block_entry():
            return self.fetch_block_entry()

        # Is it the key indicator?
        if ch == '?' and self.check_key():
            return self.fetch_key()

        # Is it the value indicator?
        if ch == ':' and self.check_value():
            return self.fetch_value()

        # Is it an alias?
        if ch == '*':
            return self.fetch_alias()

        # Is it an anchor?
        if ch == '&':
            return self.fetch_anchor()

        # Is it a tag?
        if ch == '!':
            return self.fetch_tag()

        # Is it a literal scalar?
        if ch == '|' and not self.flow_level:
            return self.fetch_literal()

        # Is it a folded scalar?
        if ch == '>' and not self.flow_level:
            return self.fetch_folded()

        # Is it a single quoted scalar?
        if ch == "'":
            return self.fetch_single()

        # Is it a double quoted scalar?
        if ch == '"':
            return self.fetch_double()

        # It must be a plain scalar then.
        if self.check_plain():
            return self.fetch_plain()

        # No? It's an error. Let's produce a nice error message.
        raise ScannerError(
            'while scanning for the next token',
            None,
            'found character %r that cannot start any token' % utf8(ch),
            self.reader.get_mark(),
        )
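
    # A hedged walk-through of the dispatch above: for the input '- [a, b]'
    # the scanner sees '-' followed by a space (fetch_block_entry), then '['
    # (fetch_flow_sequence_start), plain scalars via fetch_plain, ','
    # (fetch_flow_entry) and finally ']' (fetch_flow_sequence_end).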

    # Simple keys treatment.

    def next_possible_simple_key(self):
        # type: () -> Any
        # Return the number of the nearest possible simple key. Actually we
        # don't need to loop through the whole dictionary. We may replace it
        # with the following code:
        #   if not self.possible_simple_keys:
        #       return None
        #   return self.possible_simple_keys[
        #           min(self.possible_simple_keys.keys())].token_number
        min_token_number = None
        for level in self.possible_simple_keys:
            key = self.possible_simple_keys[level]
            if min_token_number is None or key.token_number < min_token_number:
                min_token_number = key.token_number
        return min_token_number

    def stale_possible_simple_keys(self):
        # type: () -> None
        # Remove entries that are no longer possible simple keys. According to
        # the YAML specification, simple keys
        # - should be limited to a single line,
        # - should be no longer than 1024 characters.