# coding: utf-8
from __future__ import print_function, absolute_import, division, unicode_literals
# Scanner produces tokens of the following types:
# STREAM-START
# STREAM-END
# DIRECTIVE(name, value)
# DOCUMENT-START
# DOCUMENT-END
# BLOCK-SEQUENCE-START
# BLOCK-MAPPING-START
# BLOCK-END
# FLOW-SEQUENCE-START
# FLOW-MAPPING-START
# FLOW-SEQUENCE-END
# FLOW-MAPPING-END
# BLOCK-ENTRY
# FLOW-ENTRY
# KEY
# VALUE
# ALIAS(value)
# ANCHOR(value)
# TAG(value)
# SCALAR(value, plain, style)
#
# RoundTripScanner
# COMMENT(value)
#
# Read comments in the Scanner code for more details.
#
from ruamel.yaml.error import MarkedYAMLError
from ruamel.yaml.tokens import * # NOQA
from ruamel.yaml.compat import utf8, unichr, PY3, check_anchorname_char, nprint # NOQA
if False: # MYPY
from typing import Any, Dict, Optional, List, Union, Text # NOQA
from ruamel.yaml.compat import VersionType # NOQA
__all__ = ['Scanner', 'RoundTripScanner', 'ScannerError']
# Line-break characters plus NUL: LF, NUL, CR, NEL (U+0085),
# LINE SEPARATOR (U+2028) and PARAGRAPH SEPARATOR (U+2029).
_THE_END = '\n\0\r\x85\u2028\u2029'
# The same set of characters with space and tab added.
_THE_END_SPACE_TAB = ' \n\0\t\r\x85\u2028\u2029'
# Space and tab only.
_SPACE_TAB = ' \t'
class ScannerError(MarkedYAMLError):
    """Error raised by the scanner; adds nothing to MarkedYAMLError."""
class SimpleKey(object):
    """Record describing a position where a simple key may start.

    The constructor stores its six arguments unchanged as attributes of
    the same names.
    """

    # See below simple keys treatment.
    def __init__(self, token_number, required, index, line, column, mark):
        # type: (Any, Any, int, int, int, Any) -> None
        # Token bookkeeping first, then the positional information.
        self.token_number, self.required = token_number, required
        self.index, self.line, self.column = index, line, column
        self.mark = mark
class Scanner(object):
def __init__(self, loader=None):
# type: (Any) -> None
"""Initialize the scanner."""
# It is assumed that Scanner and Reader will have a common descendant.
# Reader do the dirty work of checking for BOM and converting the
# input data to Unicode. It also adds NUL to the end.
#
# Reader supports the following methods
# self.peek(i=0) # peek the next i-th character
# self.prefix(l=1) # peek the next l characters
# self.forward(l=1) # read the next l characters and move the pointer
self.loader = loader
if self.loader is not None and getattr(self.loader, '_scanner', None) is None:
self.loader._scanner = self
self.reset_scanner()
self.first_time = False
@property
def flow_level(self):
# type: () -> int
return len(self.flow_context)
def reset_scanner(self):
    # type: () -> None
    """Reset the scanner's state attributes to their initial values.

    Also calls ``fetch_stream_start()`` right after the token queue has
    been emptied, to add the STREAM-START token.
    """
    # Had we reached the end of the stream?
    self.done = False
    # flow_context is an expanding/shrinking list consisting of '{' and '['
    # for each unclosed flow context. An empty list means block context.
    self.flow_context = []  # type: List[Text]
    # List of processed tokens that are not yet emitted.
    self.tokens = []  # type: List[Any]
    # Add the STREAM-START token.
    self.fetch_stream_start()
    # Number of tokens that were emitted through the `get_token` method.
    self.tokens_taken = 0
    # The current indentation level.
    self.indent = -1
    # Past indentation levels.
    self.indents = []  # type: List[int]
    # Variables related to simple keys treatment.
    # A simple key is a key that is not denoted by the '?' indicator.
    # Example of simple keys:
    #   ---
    #   block simple key: value
    #   ? not a simple key:
    #   : { flow simple key: value }
    # We emit the KEY token before all keys, so when we find a potential
    # simple key, we try to locate the corresponding ':' indicator.
    # Simple keys should be limited to a single line and 1024 characters.
    # Can a simple key start at the current position? A simple key may
    # start:
    # - at the beginning of the line, not counting indentation spaces
    #       (in block context),
    # - after '{', '[', ',' (in the flow context),
    # - after '?', ':', '-' (in the block context).
    # In the block context, this flag also signifies if a block collection
    # may start at the current position.
    self.allow_simple_key = True
    # Keep track of possible simple keys. This is a dictionary. The key
    # is `flow_level`; there can be no more than one possible simple key
    # for each level. The value is a SimpleKey record:
    #   (token_number, required, index, line, column, mark)
    # A simple key may start with ALIAS, ANCHOR, TAG, SCALAR(flow),
    # '[', or '{' tokens.
    self.possible_simple_keys = {}  # type: Dict[Any, Any]
@property
def reader(self):
# type: () -> Any
try:
return self._scanner_reader # type: ignore
except AttributeError:
if hasattr(self.loader, 'typ'):
self._scanner_reader = self.loader.reader
else:
self._scanner_reader = self.loader._reader
return self._scanner_reader
@property
def scanner_processing_version(self): # prefix until un-composited
# type: () -> VersionType
if hasattr(self.loader, 'typ'):
return self.loader.resolver.processing_version # type: ignore
return self.loader.processing_version # type: ignore
# Public methods.
def check_token(self, *choices):
# type: (Any) -> bool
# Check if the next token is one of the given types.
while self.need_more_tokens():
self.fetch_more_tokens()
if bool(self.tokens):
if not choices:
return True
for choice in choices:
if isinstance(self.tokens[0], choice):
return True
return False
def peek_token(self):
# type: () -> Any
# Return the next token, but do not delete if from the queue.
while self.need_more_tokens():
self.fetch_more_tokens()
if bool(self.tokens):
return self.tokens[0]
def get_token(self):
# type: () -> Any
# Return the next token.
while self.need_more_tokens():
self.fetch_more_tokens()
if bool(self.tokens):
self.tokens_taken += 1
return self.tokens.pop(0)
# Private methods.
def need_more_tokens(self):
# type: () -> bool
if self.done:
return False
if not self.tokens:
return True
# The current token may be a potential simple key, so we
# need to look further.
self.stale_possible_simple_keys()
if self.next_possible_simple_key() == self.tokens_taken:
return True
return False
def fetch_comment(self, comment):
    # type: (Any) -> None
    """Hook for handling a scanned comment; not implemented here.

    Raises:
        NotImplementedError: always.
    """
    raise NotImplementedError()
def fetch_more_tokens(self):
# type: () -> Any
# Eat whitespaces and comments until we reach the next token.
comment = self.scan_to_next_token()
if comment is not None: # never happens for base scanner
return self.fetch_comment(comment)
# Remove obsolete possible simple keys.
self.stale_possible_simple_keys()
# Compare the current indentation and column. It may add some tokens
# and decrease the current indentation level.
self.unwind_indent(self.reader.column)
# Peek the next character.
ch = self.reader.peek()
# Is it the end of stream?
if ch == '\0':
return self.fetch_stream_end()
# Is it a directive?
if ch == '%' and self.check_directive():
return self.fetch_directive()
# Is it the document start?
if ch == '-' and self.check_document_start():
return self.fetch_document_start()
# Is it the document end?
if ch == '.' and self.check_document_end():
return self.fetch_document_end()
# TODO: support for BOM within a stream.
# if ch == u'\uFEFF':
# return self.fetch_bom() <-- issue BOMToken
# Note: the order of the following checks is NOT significant.
# Is it the flow sequence start indicator?
if ch == '[':
return self.fetch_flow_sequence_start()
# Is it the flow mapping start indicator?
if ch == '{':
return self.fetch_flow_mapping_start()
# Is it the flow sequence end indicator?
if ch == ']':
return self.fetch_flow_sequence_end()
# Is it the flow mapping end indicator?
if ch == '}':
return self.fetch_flow_mapping_end()
# Is it the flow entry indicator?
if ch == ',':
return self.fetch_flow_entry()
# Is it the block entry indicator?
if ch == '-' and self.check_block_entry():
return self.fetch_block_entry()
# Is it the key indicator?
if ch == '?' and self.check_key():
return self.fetch_key()
# Is it the value indicator?
if ch == ':' and self.check_value():
return self.fetch_value()
# Is it an alias?
if ch == '*':
return self.fetch_alias()
# Is it an anchor?
if ch == '&':
return self.fetch_anchor()
# Is it a tag?
if ch == '!':
return self.fetch_tag()
# Is it a literal scalar?
if ch == '|' and not self.flow_level:
return self.fetch_literal()
# Is it a folded scalar?
if ch == '>' and not self.flow_level:
return self.fetch_folded()
# Is it a single quoted scalar?
if ch == "'":
return self.fetch_single()
# Is it a double quoted scalar?
if ch == '"':
return self.fetch_double()
# It must be a plain scalar then.
if self.check_plain():
return self.fetch_plain()
# No? It's an error. Let's produce a nice error message.
raise ScannerError(
'while scanning for the next token',
None,
'found character %r that cannot start any token' % utf8(ch),
self.reader.get_mark(),
)
# Simple keys treatment.
def next_possible_simple_key(self):
# type: () -> Any
# Return the number of the nearest possible simple key. Actually we
# don't need to loop through the whole dictionary. We may replace it
# with the following code:
# if not self.possible_simple_keys:
# return None
# return self.possible_simple_keys[
# min(self.possible_simple_keys.keys())].token_number
min_token_number = None
for level in self.possible_simple_keys:
key = self.possible_simple_keys[level]
if min_token_number is None or key.token_number < min_token_number:
min_token_number = key.token_number
return min_token_number
def stale_possible_simple_keys(self):
# type: () -> None
# Remove entries that are no longer possible simple keys. According to
# the YAML specification, simple keys
Loading ...