Learn more  » Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

aroundthecode / lxml   python

Repository URL to install this package:

/ html / diff.py

import difflib
from lxml import etree
from lxml.html import fragment_fromstring
import re

__all__ = ['html_annotate', 'htmldiff']

try:
    from html import escape as html_escape
except ImportError:
    from cgi import escape as html_escape
try:
    _unicode = unicode
except NameError:
    # Python 3
    _unicode = str
try:
    basestring
except NameError:
    # Python 3
    basestring = str

############################################################
## Annotation
############################################################

def default_markup(text, version):
    return '<span title="%s">%s</span>' % (
        html_escape(_unicode(version), 1), text)

def html_annotate(doclist, markup=default_markup):
    """
    doclist should be ordered from oldest to newest, like::

        >>> version1 = 'Hello World'
        >>> version2 = 'Goodbye World'
        >>> print(html_annotate([(version1, 'version 1'),
        ...                      (version2, 'version 2')]))
        <span title="version 2">Goodbye</span> <span title="version 1">World</span>

    The documents must be *fragments* (str/UTF8 or unicode), not
    complete documents

    The markup argument is a function to markup the spans of words.
    This function is called like markup('Hello', 'version 2'), and
    returns HTML.  The first argument is text and never includes any
    markup.  The default uses a span with a title:

        >>> print(default_markup('Some Text', 'by Joe'))
        <span title="by Joe">Some Text</span>
    """
    # The basic strategy we have is to split the documents up into
    # logical tokens (which are words with attached markup).  We then
    # do diffs of each of the versions to track when a token first
    # appeared in the document; the annotation attached to the token
    # is the version where it first appeared.
    tokenlist = [tokenize_annotated(doc, version)
                 for doc, version in doclist]
    cur_tokens = tokenlist[0]
    for tokens in tokenlist[1:]:
        html_annotate_merge_annotations(cur_tokens, tokens)
        cur_tokens = tokens

    # After we've tracked all the tokens, we can combine spans of text
    # that are adjacent and have the same annotation
    cur_tokens = compress_tokens(cur_tokens)
    # And finally add markup
    result = markup_serialize_tokens(cur_tokens, markup)
    return ''.join(result).strip()

def tokenize_annotated(doc, annotation): 
    """Tokenize a document and add an annotation attribute to each token
    """
    tokens = tokenize(doc, include_hrefs=False)
    for tok in tokens: 
        tok.annotation = annotation
    return tokens

def html_annotate_merge_annotations(tokens_old, tokens_new): 
    """Merge the annotations from tokens_old into tokens_new, when the
    tokens in the new document already existed in the old document.
    """
    s = InsensitiveSequenceMatcher(a=tokens_old, b=tokens_new)
    commands = s.get_opcodes()

    for command, i1, i2, j1, j2 in commands:
        if command == 'equal': 
            eq_old = tokens_old[i1:i2]
            eq_new = tokens_new[j1:j2]
            copy_annotations(eq_old, eq_new)

def copy_annotations(src, dest): 
    """
    Copy annotations from the tokens listed in src to the tokens in dest
    """
    assert len(src) == len(dest)
    for src_tok, dest_tok in zip(src, dest): 
        dest_tok.annotation = src_tok.annotation

def compress_tokens(tokens):
    """
    Combine adjacent tokens when there is no HTML between the tokens, 
    and they share an annotation
    """
    result = [tokens[0]] 
    for tok in tokens[1:]: 
        if (not result[-1].post_tags and 
            not tok.pre_tags and 
            result[-1].annotation == tok.annotation): 
            compress_merge_back(result, tok)
        else: 
            result.append(tok)
    return result

def compress_merge_back(tokens, tok): 
    """ Merge tok into the last element of tokens (modifying the list of
    tokens in-place).  """
    last = tokens[-1]
    if type(last) is not token or type(tok) is not token: 
        tokens.append(tok)
    else:
        text = _unicode(last)
        if last.trailing_whitespace:
            text += last.trailing_whitespace
        text += tok
        merged = token(text,
                       pre_tags=last.pre_tags,
                       post_tags=tok.post_tags,
                       trailing_whitespace=tok.trailing_whitespace)
        merged.annotation = last.annotation
        tokens[-1] = merged
    
def markup_serialize_tokens(tokens, markup_func):
    """
    Serialize the list of tokens into a list of text chunks, calling
    markup_func around text to add annotations.
    """
    for token in tokens:
        for pre in token.pre_tags:
            yield pre
        html = token.html()
        html = markup_func(html, token.annotation)
        if token.trailing_whitespace:
            html += token.trailing_whitespace
        yield html
        for post in token.post_tags:
            yield post


############################################################
## HTML Diffs
############################################################

def htmldiff(old_html, new_html):
    ## FIXME: this should take parsed documents too, and use their body
    ## or other content.
    """ Do a diff of the old and new document.  The documents are HTML
    *fragments* (str/UTF8 or unicode), they are not complete documents
    (i.e., no <html> tag).

    Returns HTML with <ins> and <del> tags added around the
    appropriate text.  

    Markup is generally ignored, with the markup from new_html
    preserved, and possibly some markup from old_html (though it is
    considered acceptable to lose some of the old markup).  Only the
    words in the HTML are diffed.  The exception is <img> tags, which
    are treated like words, and the href attribute of <a> tags, which
    are noted inside the tag itself when there are changes.
    """ 
    old_html_tokens = tokenize(old_html)
    new_html_tokens = tokenize(new_html)
    result = htmldiff_tokens(old_html_tokens, new_html_tokens)
    result = ''.join(result).strip()
    return fixup_ins_del_tags(result)

def htmldiff_tokens(html1_tokens, html2_tokens):
    """ Does a diff on the tokens themselves, returning a list of text
    chunks (not tokens).
    """
    # There are several passes as we do the differences.  The tokens
    # isolate the portion of the content we care to diff; difflib does
    # all the actual hard work at that point.  
    #
    # Then we must create a valid document from pieces of both the old
    # document and the new document.  We generally prefer to take
    # markup from the new document, and only do a best effort attempt
    # to keep markup from the old document; anything that we can't
    # resolve we throw away.  Also we try to put the deletes as close
    # to the location where we think they would have been -- because
    # we are only keeping the markup from the new document, it can be
    # fuzzy where in the new document the old text would have gone.
    # Again we just do a best effort attempt.
    s = InsensitiveSequenceMatcher(a=html1_tokens, b=html2_tokens)
    commands = s.get_opcodes()
    result = []
    for command, i1, i2, j1, j2 in commands:
        if command == 'equal':
            result.extend(expand_tokens(html2_tokens[j1:j2], equal=True))
            continue
        if command == 'insert' or command == 'replace':
            ins_tokens = expand_tokens(html2_tokens[j1:j2])
            merge_insert(ins_tokens, result)
        if command == 'delete' or command == 'replace':
            del_tokens = expand_tokens(html1_tokens[i1:i2])
            merge_delete(del_tokens, result)
    # If deletes were inserted directly as <del> then we'd have an
    # invalid document at this point.  Instead we put in special
    # markers, and when the complete diffed document has been created
    # we try to move the deletes around and resolve any problems.
    result = cleanup_delete(result)

    return result

def expand_tokens(tokens, equal=False):
    """Given a list of tokens, return a generator of the chunks of
    text for the data in the tokens.
    """
    for token in tokens:
        for pre in token.pre_tags:
            yield pre
        if not equal or not token.hide_when_equal:
            if token.trailing_whitespace:
                yield token.html() + token.trailing_whitespace
            else:
                yield token.html()
        for post in token.post_tags:
            yield post

def merge_insert(ins_chunks, doc):
    """ doc is the already-handled document (as a list of text chunks);
    here we add <ins>ins_chunks</ins> to the end of that.  """
    # Though we don't throw away unbalanced_start or unbalanced_end
    # (we assume there is accompanying markup later or earlier in the
    # document), we only put <ins> around the balanced portion.
    unbalanced_start, balanced, unbalanced_end = split_unbalanced(ins_chunks)
    doc.extend(unbalanced_start)
    if doc and not doc[-1].endswith(' '):
        # Fix up the case where the word before the insert didn't end with 
        # a space
        doc[-1] += ' '
    doc.append('<ins>')
    if balanced and balanced[-1].endswith(' '):
        # We move space outside of </ins>
        balanced[-1] = balanced[-1][:-1]
    doc.extend(balanced)
    doc.append('</ins> ')
    doc.extend(unbalanced_end)

# These are sentinals to represent the start and end of a <del>
# segment, until we do the cleanup phase to turn them into proper
# markup:
class DEL_START:
    pass
class DEL_END:
    pass

class NoDeletes(Exception):
    """ Raised when the document no longer contains any pending deletes
    (DEL_START/DEL_END) """

def merge_delete(del_chunks, doc):
    """ Adds the text chunks in del_chunks to the document doc (another
    list of text chunks) with marker to show it is a delete.
    cleanup_delete later resolves these markers into <del> tags."""
    doc.append(DEL_START)
    doc.extend(del_chunks)
    doc.append(DEL_END)

def cleanup_delete(chunks):
    """ Cleans up any DEL_START/DEL_END markers in the document, replacing
    them with <del></del>.  To do this while keeping the document
    valid, it may need to drop some tags (either start or end tags).

    It may also move the del into adjacent tags to try to move it to a
    similar location where it was originally located (e.g., moving a
    delete into preceding <div> tag, if the del looks like (DEL_START,
    'Text</div>', DEL_END)"""
    while 1:
        # Find a pending DEL_START/DEL_END, splitting the document
        # into stuff-preceding-DEL_START, stuff-inside, and
        # stuff-following-DEL_END
        try:
            pre_delete, delete, post_delete = split_delete(chunks)
        except NoDeletes:
            # Nothing found, we've cleaned up the entire doc
            break
        # The stuff-inside-DEL_START/END may not be well balanced
        # markup.  First we figure out what unbalanced portions there are:
        unbalanced_start, balanced, unbalanced_end = split_unbalanced(delete)
        # Then we move the span forward and/or backward based on these
        # unbalanced portions:
        locate_unbalanced_start(unbalanced_start, pre_delete, post_delete)
        locate_unbalanced_end(unbalanced_end, pre_delete, post_delete)
        doc = pre_delete
        if doc and not doc[-1].endswith(' '):
            # Fix up case where the word before us didn't have a trailing space
            doc[-1] += ' '
        doc.append('<del>')
        if balanced and balanced[-1].endswith(' '):
            # We move space outside of </del>
            balanced[-1] = balanced[-1][:-1]
        doc.extend(balanced)
        doc.append('</del> ')
        doc.extend(post_delete)
        chunks = doc
    return chunks

def split_unbalanced(chunks):
    """Return (unbalanced_start, balanced, unbalanced_end), where each is
    a list of text and tag chunks.

    unbalanced_start is a list of all the tags that are opened, but
    not closed in this span.  Similarly, unbalanced_end is a list of
    tags that are closed but were not opened.  Extracting these might
    mean some reordering of the chunks."""
    start = []
    end = []
    tag_stack = []
    balanced = []
    for chunk in chunks:
        if not chunk.startswith('<'):
            balanced.append(chunk)
            continue
        endtag = chunk[1] == '/'
        name = chunk.split()[0].strip('<>/')
        if name in empty_tags:
            balanced.append(chunk)
            continue
        if endtag:
            if tag_stack and tag_stack[-1][0] == name:
                balanced.append(chunk)
                name, pos, tag = tag_stack.pop()
                balanced[pos] = tag
            elif tag_stack:
                start.extend([tag for name, pos, tag in tag_stack])
                tag_stack = []
                end.append(chunk)
            else:
                end.append(chunk)
        else:
            tag_stack.append((name, len(balanced), chunk))
            balanced.append(None)
    start.extend(
        [chunk for name, pos, chunk in tag_stack])
Loading ...