from __future__ import absolute_import, division, unicode_literals
from pip._vendor.six import unichr as chr
from collections import deque
from .constants import spaceCharacters
from .constants import entities
from .constants import asciiLetters, asciiUpper2Lower
from .constants import digits, hexDigits, EOF
from .constants import tokenTypes, tagTokenTypes
from .constants import replacementCharacters
from ._inputstream import HTMLInputStream
from ._trie import Trie
# Trie built from the named-entity table. consumeEntity queries it for prefix
# checks (has_keys_with_prefix) and longest-match lookups (longest_prefix).
entitiesTrie = Trie(entities)
class HTMLTokenizer(object):
""" This class takes care of tokenizing HTML.
* self.currentToken
Holds the token that is currently being processed.
* self.state
Holds a reference to the state method that the loop in __iter__ calls next.
* self.stream
Points to HTMLInputStream object.
"""
def __init__(self, stream, parser=None, **kwargs):
    """Wrap *stream* in an HTMLInputStream and reset the tokenizer.

    stream -- the input handed to HTMLInputStream.
    parser -- stored on the instance as self.parser; not used here.
    kwargs -- forwarded unchanged to HTMLInputStream.
    """
    self.stream = HTMLInputStream(stream, **kwargs)
    self.parser = parser

    # Tokenizer bookkeeping: both escape flags start cleared, no recent
    # characters are remembered, and processing begins in the data state.
    self.escapeFlag = False
    self.lastFourChars = []
    self.state = self.dataState
    self.escape = False

    # No token is under construction yet.
    self.currentToken = None

    super(HTMLTokenizer, self).__init__()
def __iter__(self):
    """Run the state machine and yield tokens as they become available.

    Each pass calls the current state method once.  Afterwards any errors
    recorded by the input stream are yielded as ParseError tokens, followed
    by everything the state method queued.  Iteration stops as soon as a
    state method returns a false value (the states do this at EOF).
    """
    self.tokenQueue = deque()

    while True:
        if not self.state():
            # The state reported end of input: stop producing tokens.
            return

        # Stream-level errors are reported before the queued tokens.
        while self.stream.errors:
            parseErrorType = tokenTypes["ParseError"]
            yield {"type": parseErrorType, "data": self.stream.errors.pop(0)}

        while self.tokenQueue:
            yield self.tokenQueue.popleft()
def consumeNumberEntity(self, isHex):
    """Consume the digits of a numeric character reference.

    consumeEntity calls this with the stream positioned on the first digit.
    Every following character that is a digit in the chosen radix is read
    and the resulting number is mapped to a character:

    * code points listed in replacementCharacters yield their replacement;
    * surrogates (0xD800-0xDFFF) and values above 0x10FFFF yield U+FFFD;
    * everything else yields the code point itself.

    The first two cases, and control/noncharacter code points in the third,
    also queue an "illegal-codepoint-for-numeric-entity" ParseError.

    A terminating ";" is discarded.  Any other terminator is pushed back on
    the stream and a "numeric-entity-without-semicolon" ParseError is queued.

    isHex -- true for a hexadecimal reference, false for decimal.

    Returns the resulting character (a surrogate pair on builds whose chr
    cannot represent the code point directly).

    Raises ValueError if no digit could be read at all.
    """
    if isHex:
        allowed, radix = hexDigits, 16
    else:
        allowed, radix = digits, 10

    charStack = []

    # Collect the digits.  EOF is tested first so the sentinel is never
    # probed against the digit collection.
    c = self.stream.char()
    while c is not EOF and c in allowed:
        charStack.append(c)
        c = self.stream.char()

    digitString = "".join(charStack)
    try:
        charAsInt = int(digitString, radix)
    except ValueError:
        if not digitString:
            # No digits at all is a caller error; keep the original failure.
            raise
        # Python builds that limit integer string conversion length reject
        # very long decimal digit strings.  Input is untrusted markup, so
        # recover instead of letting the error escape: leading zeros do
        # not change the value, and more than seven significant digits is
        # above 0x10FFFF in both radixes.
        significant = digitString.lstrip("0")
        if len(significant) > 7:
            # Out of range either way; report the first value past the
            # Unicode range rather than the (unconvertible) real number.
            charAsInt = 0x110000
        else:
            charAsInt = int(significant or "0", radix)

    # Certain characters get replaced with others
    if charAsInt in replacementCharacters:
        char = replacementCharacters[charAsInt]
        self.tokenQueue.append({"type": tokenTypes["ParseError"], "data":
                                "illegal-codepoint-for-numeric-entity",
                                "datavars": {"charAsInt": charAsInt}})
    elif (0xD800 <= charAsInt <= 0xDFFF) or charAsInt > 0x10FFFF:
        char = "\uFFFD"
        self.tokenQueue.append({"type": tokenTypes["ParseError"], "data":
                                "illegal-codepoint-for-numeric-entity",
                                "datavars": {"charAsInt": charAsInt}})
    else:
        # Control characters and noncharacters are kept but reported.  The
        # mask test matches exactly the code points whose low 16 bits are
        # FFFE or FFFF (0xFFFE, 0xFFFF, 0x1FFFE, ... 0x10FFFF), which is
        # the same list the previous per-call frozenset spelled out.
        if ((0x0001 <= charAsInt <= 0x0008) or
                (0x000E <= charAsInt <= 0x001F) or
                (0x007F <= charAsInt <= 0x009F) or
                (0xFDD0 <= charAsInt <= 0xFDEF) or
                charAsInt == 0x000B or
                (charAsInt & 0xFFFE) == 0xFFFE):
            self.tokenQueue.append({"type": tokenTypes["ParseError"],
                                    "data":
                                    "illegal-codepoint-for-numeric-entity",
                                    "datavars": {"charAsInt": charAsInt}})
        try:
            # Try/except needed as UCS-2 Python builds' unichr only works
            # within the BMP.
            char = chr(charAsInt)
        except ValueError:
            # Build the surrogate pair by hand.
            v = charAsInt - 0x10000
            char = chr(0xD800 | (v >> 10)) + chr(0xDC00 | (v & 0x3FF))

    # Discard the ";" if present.  Otherwise report the omission and put
    # the terminator back so the next state sees it.
    if c != ";":
        self.tokenQueue.append({"type": tokenTypes["ParseError"], "data":
                                "numeric-entity-without-semicolon"})
        self.stream.unget(c)

    return char
def consumeEntity(self, allowedChar=None, fromAttribute=False):
    """Consume what follows an "&" and emit the resulting text.

    The text defaults to a literal "&".  It becomes the character returned
    by consumeNumberEntity for a numeric reference, the expansion of the
    longest matching named entity (plus any over-read characters), or "&"
    followed by the characters read when nothing matches.

    allowedChar -- if not None and equal to the first character read, that
        character is pushed back and the "&" stays literal.
    fromAttribute -- when true the text is appended to the second item of
        the last entry in self.currentToken["data"] (presumably the value
        of the attribute being built); otherwise it is queued as a
        SpaceCharacters or Characters token.
    """
    # Default output for when no entity is matched
    output = "&"

    first = self.stream.char()
    charStack = [first]

    if (first in spaceCharacters or first in (EOF, "<", "&") or
            (allowedChar is not None and allowedChar == first)):
        # Not a reference: hand the character back untouched.
        self.stream.unget(first)

    elif first == "#":
        # Numeric reference; an "x" or "X" after the "#" selects hex.
        charStack.append(self.stream.char())
        isHex = charStack[-1] in ("x", "X")
        if isHex:
            charStack.append(self.stream.char())

        # charStack[-1] should now be the first digit
        digitSet = hexDigits if isHex else digits
        if charStack[-1] in digitSet:
            # At least one digit: let consumeNumberEntity re-read it.
            self.stream.unget(charStack[-1])
            output = self.consumeNumberEntity(isHex)
        else:
            # No digits found
            self.tokenQueue.append({"type": tokenTypes["ParseError"],
                                    "data": "expected-numeric-entity"})
            self.stream.unget(charStack.pop())
            output = "&" + "".join(charStack)

    else:
        # Possibly a named entity.  Keep reading while the characters so
        # far are still a prefix of some entity name (or until EOF).
        while (charStack[-1] is not EOF and
               entitiesTrie.has_keys_with_prefix("".join(charStack))):
            charStack.append(self.stream.char())

        # The last character read broke the match (or is EOF), so look
        # for the longest entity name within everything before it.
        try:
            entityName = entitiesTrie.longest_prefix("".join(charStack[:-1]))
        except KeyError:
            entityName = None
        else:
            entityLength = len(entityName)

        if entityName is None:
            self.tokenQueue.append({"type": tokenTypes["ParseError"], "data":
                                    "expected-named-entity"})
            self.stream.unget(charStack.pop())
            output = "&" + "".join(charStack)
        else:
            unterminated = entityName[-1] != ";"
            if unterminated:
                self.tokenQueue.append({"type": tokenTypes["ParseError"], "data":
                                        "named-entity-without-semicolon"})

            # Inside an attribute, an unterminated name followed by a
            # letter, digit or "=" is left as raw text.
            keepRaw = False
            if unterminated and fromAttribute:
                follower = charStack[entityLength]
                keepRaw = (follower in asciiLetters or
                           follower in digits or
                           follower == "=")

            if keepRaw:
                self.stream.unget(charStack.pop())
                output = "&" + "".join(charStack)
            else:
                output = entities[entityName]
                self.stream.unget(charStack.pop())
                # Characters read beyond the entity name stay as text.
                output += "".join(charStack[entityLength:])

    if fromAttribute:
        self.currentToken["data"][-1][1] += output
        return

    tokenType = "SpaceCharacters" if output in spaceCharacters else "Characters"
    self.tokenQueue.append({"type": tokenTypes[tokenType], "data": output})
def processEntityInAttribute(self, allowedChar):
    """Consume a character reference met inside an attribute value.

    Delegates to consumeEntity with fromAttribute=True, passing
    *allowedChar* through.  This takes the place of a separate
    "entityInAttributeValueState".
    """
    self.consumeEntity(fromAttribute=True, allowedChar=allowedChar)
def emitCurrentToken(self):
    """Queue self.currentToken and switch back to the data state.

    For tag tokens the name is first mapped through asciiUpper2Lower.  An
    end tag additionally queues a ParseError when it carries attribute
    data and another when its self-closing flag is set; both errors are
    queued ahead of the tag itself.
    """
    token = self.currentToken
    tokenType = token["type"]

    if tokenType in tagTokenTypes:
        token["name"] = token["name"].translate(asciiUpper2Lower)
        if tokenType == tokenTypes["EndTag"]:
            # Check the two end-tag violations in a fixed order.
            endTagChecks = (("data", "attributes-in-end-tag"),
                            ("selfClosing", "self-closing-flag-on-end-tag"))
            for key, message in endTagChecks:
                if token[key]:
                    self.tokenQueue.append({"type": tokenTypes["ParseError"],
                                            "data": message})

    # Add token to the queue to be yielded
    self.tokenQueue.append(token)
    self.state = self.dataState
# Below are the various tokenizer states worked out.
def dataState(self):
    """Process one step of input in the data state.

    Reads one character and either switches state ("&" -> entityDataState,
    "<" -> tagOpenState) or queues a token.  A U+0000 queues an
    "invalid-codepoint" ParseError followed by a Characters token holding
    the U+0000 itself.  Whitespace is queued as its own SpaceCharacters
    token; any other text is queued as a Characters token.

    Returns False at EOF, which ends tokenization, and True otherwise.
    """
    data = self.stream.char()

    if data is EOF:
        # Tokenization ends.
        return False

    if data == "&":
        self.state = self.entityDataState
    elif data == "<":
        self.state = self.tagOpenState
    elif data == "\u0000":
        self.tokenQueue.append({"type": tokenTypes["ParseError"],
                                "data": "invalid-codepoint"})
        self.tokenQueue.append({"type": tokenTypes["Characters"],
                                "data": "\u0000"})
    elif data in spaceCharacters:
        # Whitespace matters right after a token has been emitted, so it
        # gets a token of its own.  charsUntil(..., True) presumably
        # returns the rest of the whitespace run -- confirm against
        # HTMLInputStream.
        whitespace = data + self.stream.charsUntil(spaceCharacters, True)
        self.tokenQueue.append({"type": tokenTypes["SpaceCharacters"],
                                "data": whitespace})
    else:
        # Ordinary text: take everything up to the next character that
        # needs special handling.
        text = data + self.stream.charsUntil(("&", "<", "\u0000"))
        self.tokenQueue.append({"type": tokenTypes["Characters"],
                                "data": text})
    return True
def entityDataState(self):
    """Handle an "&" met in the data state.

    Lets consumeEntity process the reference, then returns to the data
    state.  Always returns True so tokenization continues.
    """
    self.consumeEntity()
    self.state = self.dataState
    return True
def rcdataState(self):
    """Process one step of input in the RCDATA state.

    Reads one character and either switches state
    ("&" -> characterReferenceInRcdata, "<" -> rcdataLessThanSignState) or
    queues a token.  A U+0000 queues an "invalid-codepoint" ParseError
    followed by a Characters token holding U+FFFD.  Whitespace is queued
    as its own SpaceCharacters token; any other text is queued as a
    Characters token.

    Returns False at EOF, which ends tokenization, and True otherwise.
    """
    data = self.stream.char()

    # EOF is a sentinel: test it by identity, as dataState does.
    if data is EOF:
        # Tokenization ends.
        return False

    if data == "&":
        self.state = self.characterReferenceInRcdata
    elif data == "<":
        self.state = self.rcdataLessThanSignState
    elif data == "\u0000":
        self.tokenQueue.append({"type": tokenTypes["ParseError"],
                                "data": "invalid-codepoint"})
        self.tokenQueue.append({"type": tokenTypes["Characters"],
                                "data": "\uFFFD"})
    elif data in spaceCharacters:
        # Whitespace is emitted separately from other text.
        # charsUntil(..., True) presumably returns the rest of the
        # whitespace run -- confirm against HTMLInputStream.
        self.tokenQueue.append({"type": tokenTypes["SpaceCharacters"], "data":
                                data + self.stream.charsUntil(spaceCharacters, True)})
    else:
        # Ordinary text up to the next character needing special handling.
        chars = self.stream.charsUntil(("&", "<", "\u0000"))
        self.tokenQueue.append({"type": tokenTypes["Characters"], "data":
                                data + chars})
    return True
def characterReferenceInRcdata(self):
    """Handle an "&" met in the RCDATA state.

    Lets consumeEntity process the reference, then returns to the RCDATA
    state.  Always returns True so tokenization continues.
    """
    self.consumeEntity()
    self.state = self.rcdataState
    return True
def rawtextState(self):
    """Process one step of input in the RAWTEXT state.

    Reads one character.  "<" switches to rawtextLessThanSignState.  A
    U+0000 queues an "invalid-codepoint" ParseError followed by a
    Characters token holding U+FFFD.  Anything else is queued as a
    Characters token together with the text up to the next "<" or U+0000.

    Returns False at EOF, which ends tokenization, and True otherwise.
    """
    data = self.stream.char()

    # EOF is a sentinel: test it by identity, as dataState does.
    if data is EOF:
        # Tokenization ends.
        return False

    if data == "<":
        self.state = self.rawtextLessThanSignState
    elif data == "\u0000":
        self.tokenQueue.append({"type": tokenTypes["ParseError"],
                                "data": "invalid-codepoint"})
        self.tokenQueue.append({"type": tokenTypes["Characters"],
                                "data": "\uFFFD"})
    else:
        chars = self.stream.charsUntil(("<", "\u0000"))
        self.tokenQueue.append({"type": tokenTypes["Characters"], "data":
                                data + chars})
    return True
def scriptDataState(self):
    """Process one step of input in the script data state.

    Reads one character.  "<" switches to scriptDataLessThanSignState.  A
    U+0000 queues an "invalid-codepoint" ParseError followed by a
    Characters token holding U+FFFD.  Anything else is queued as a
    Characters token together with the text up to the next "<" or U+0000.

    Returns False at EOF, which ends tokenization, and True otherwise.
    """
    data = self.stream.char()

    # EOF is a sentinel: test it by identity, as dataState does.
    if data is EOF:
        # Tokenization ends.
        return False

    if data == "<":
        self.state = self.scriptDataLessThanSignState
    elif data == "\u0000":
        self.tokenQueue.append({"type": tokenTypes["ParseError"],
                                "data": "invalid-codepoint"})
        self.tokenQueue.append({"type": tokenTypes["Characters"],
                                "data": "\uFFFD"})
    else:
        chars = self.stream.charsUntil(("<", "\u0000"))
        self.tokenQueue.append({"type": tokenTypes["Characters"], "data":
                                data + chars})
    return True
Loading ...