Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
spacy / tests / serialize / test_serialize_pipeline.py
Size: Mime:
import pickle

import pytest
import srsly
from thinc.api import Linear

import spacy
from spacy import Vocab, load, registry
from spacy.lang.en import English
from spacy.language import Language
from spacy.pipeline import DependencyParser, EntityRecognizer, EntityRuler
from spacy.pipeline import SentenceRecognizer, Tagger, TextCategorizer
from spacy.pipeline import TrainablePipe
from spacy.pipeline.dep_parser import DEFAULT_PARSER_MODEL
from spacy.pipeline.senter import DEFAULT_SENTER_MODEL
from spacy.pipeline.tagger import DEFAULT_TAGGER_MODEL
from spacy.pipeline.textcat import DEFAULT_SINGLE_TEXTCAT_MODEL
from spacy.util import ensure_path, load_model
from spacy.tokens import Span

from ..util import make_tempdir

# Parser classes that the parametrized ("Parser") serialization tests in this
# module are run against.
test_parsers = [DependencyParser, EntityRecognizer]


@pytest.fixture
def parser(en_vocab):
    """Build a DependencyParser on ``en_vocab`` with the "nsubj" label added."""
    model = registry.resolve({"model": DEFAULT_PARSER_MODEL}, validate=True)["model"]
    dep_parser = DependencyParser(
        en_vocab,
        model,
        learn_tokens=False,
        min_action_freq=30,
        update_with_oracle_cut_size=100,
        beam_width=1,
        beam_update_prob=1.0,
        beam_density=0.0,
    )
    dep_parser.add_label("nsubj")
    return dep_parser


@pytest.fixture
def blank_parser(en_vocab):
    """Build a DependencyParser on ``en_vocab`` without adding any labels."""
    resolved = registry.resolve({"model": DEFAULT_PARSER_MODEL}, validate=True)
    return DependencyParser(
        en_vocab,
        resolved["model"],
        learn_tokens=False,
        min_action_freq=30,
        update_with_oracle_cut_size=100,
        beam_width=1,
        beam_update_prob=1.0,
        beam_density=0.0,
    )


@pytest.fixture
def taggers(en_vocab):
    """Return a pair of Tagger instances built from one resolved default model."""
    resolved = registry.resolve({"model": DEFAULT_TAGGER_MODEL}, validate=True)
    shared_model = resolved["model"]
    first = Tagger(en_vocab, shared_model)
    second = Tagger(en_vocab, shared_model)
    return first, second


@pytest.mark.issue(3456)
def test_issue3456():
    """Run an initialized tagger pipeline over a batch that includes an empty text."""
    # This used to crash because of a padding error in layer.ops.unflatten in thinc.
    nlp = English()
    nlp.add_pipe("tagger").add_label("A")
    nlp.initialize()
    texts = ["hi", ""]
    # Exhaust the generator so that every text is actually processed.
    for _ in nlp.pipe(texts):
        pass


@pytest.mark.issue(3526)
def test_issue_3526_1(en_vocab):
    """Round-trip an EntityRuler via to_bytes/from_bytes and compare its settings."""
    patterns = [
        {"label": "HELLO", "pattern": "hello world"},
        {"label": "BYE", "pattern": [{"LOWER": "bye"}, {"LOWER": "bye"}]},
        {"label": "HELLO", "pattern": [{"ORTH": "HELLO"}]},
        {"label": "COMPLEX", "pattern": [{"ORTH": "foo", "OP": "*"}]},
        {"label": "TECH_ORG", "pattern": "Apple", "id": "a1"},
    ]
    nlp = Language(vocab=en_vocab)
    original = EntityRuler(nlp, patterns=patterns, overwrite_ents=True)
    serialized = original.to_bytes()
    # Sanity-check the source ruler before restoring from its bytes.
    assert len(original) == len(patterns)
    assert len(original.labels) == 4
    assert original.overwrite
    restored = EntityRuler(nlp).from_bytes(serialized)
    assert len(restored) == len(original)
    assert len(restored.labels) == 4
    assert restored.overwrite == original.overwrite
    assert restored.ent_id_sep == original.ent_id_sep


@pytest.mark.issue(3526)
def test_issue_3526_2(en_vocab):
    """Load an EntityRuler from a msgpack dump holding only its pattern list."""
    patterns = [
        {"label": "HELLO", "pattern": "hello world"},
        {"label": "BYE", "pattern": [{"LOWER": "bye"}, {"LOWER": "bye"}]},
        {"label": "HELLO", "pattern": [{"ORTH": "HELLO"}]},
        {"label": "COMPLEX", "pattern": [{"ORTH": "foo", "OP": "*"}]},
        {"label": "TECH_ORG", "pattern": "Apple", "id": "a1"},
    ]
    nlp = Language(vocab=en_vocab)
    original = EntityRuler(nlp, patterns=patterns, overwrite_ents=True)
    # "Old style" payload: just the patterns, with no ruler settings alongside.
    old_style_payload = srsly.msgpack_dumps(original.patterns)
    restored = EntityRuler(nlp).from_bytes(old_style_payload)
    assert len(restored) == len(original)
    assert all(entry in restored.patterns for entry in original.patterns)
    # The overwrite setting is not part of the payload, so it is not carried over.
    assert restored.overwrite is not original.overwrite


@pytest.mark.issue(3526)
def test_issue_3526_3(en_vocab):
    """Load an EntityRuler from a .jsonl file containing only its patterns."""
    patterns = [
        {"label": "HELLO", "pattern": "hello world"},
        {"label": "BYE", "pattern": [{"LOWER": "bye"}, {"LOWER": "bye"}]},
        {"label": "HELLO", "pattern": [{"ORTH": "HELLO"}]},
        {"label": "COMPLEX", "pattern": [{"ORTH": "foo", "OP": "*"}]},
        {"label": "TECH_ORG", "pattern": "Apple", "id": "a1"},
    ]
    nlp = Language(vocab=en_vocab)
    original = EntityRuler(nlp, patterns=patterns, overwrite_ents=True)
    with make_tempdir() as tmpdir:
        base_path = tmpdir / "entity_ruler"
        jsonl_path = base_path.with_suffix(".jsonl")
        srsly.write_jsonl(jsonl_path, original.patterns)
        # from_disk is given the path *without* the ".jsonl" suffix.
        restored = EntityRuler(nlp).from_disk(base_path)
        assert all(entry in restored.patterns for entry in original.patterns)
        assert len(restored) == len(original)
        assert restored.overwrite is not original.overwrite


@pytest.mark.issue(3526)
def test_issue_3526_4(en_vocab):
    """Save a pipeline with an entity_ruler and check patterns/overwrite on reload."""
    nlp = Language(vocab=en_vocab)
    ruler = nlp.add_pipe("entity_ruler", config={"overwrite_ents": True})
    ruler.add_patterns([{"label": "ORG", "pattern": "Apple"}])
    # Separate literal so the comparison does not depend on the list passed above.
    expected_patterns = [{"label": "ORG", "pattern": "Apple"}]
    with make_tempdir() as tmpdir:
        nlp.to_disk(tmpdir)
        # Saving must leave the in-memory ruler untouched.
        saved_ruler = nlp.get_pipe("entity_ruler")
        assert saved_ruler.patterns == expected_patterns
        assert saved_ruler.overwrite is True
        reloaded_nlp = load(tmpdir)
        reloaded_ruler = reloaded_nlp.get_pipe("entity_ruler")
        assert reloaded_ruler.patterns == expected_patterns
        assert reloaded_ruler.overwrite is True


@pytest.mark.issue(4042)
def test_issue4042():
    """Test that serialization of an EntityRuler before NER works fine."""
    text = "What do you think about Apple ?"
    nlp = English()
    # Add the NER pipe first and initialize the pipeline.
    nlp.add_pipe("ner").add_label("SOME_LABEL")
    nlp.initialize()
    # Then insert the entity ruler in front of it.
    # works fine with "after"; the case under test is "before"
    ruler = nlp.add_pipe("entity_ruler", before="ner")
    ruler.add_patterns(
        [
            {"label": "MY_ORG", "pattern": "Apple"},
            {"label": "MY_GPE", "pattern": [{"lower": "san"}, {"lower": "francisco"}]},
        ]
    )
    doc_before = nlp(text)
    assert doc_before.ents[0].label_ == "MY_ORG"

    with make_tempdir() as tmp:
        model_dir = ensure_path(tmp)
        if not model_dir.exists():
            model_dir.mkdir()
        nlp.to_disk(model_dir)
        reloaded_nlp = load_model(model_dir)
        doc_after = reloaded_nlp(text)
        assert doc_after.ents[0].label_ == "MY_ORG"


@pytest.mark.issue(4042)
def test_issue4042_bug2():
    """
    Test that serialization of an NER works fine when new labels were added.
    This is the second bug of two bugs underlying the issue 4042.
    """
    nlp = English()
    # Set up the NER pipe with a single label.
    ner = nlp.add_pipe("ner")
    ner.add_label("SOME_LABEL")
    nlp.initialize()
    doc = nlp("What do you think about Apple ?")
    assert len(ner.labels) == 1
    assert "SOME_LABEL" in ner.labels
    # Attach an entity with a new label to the doc.
    doc.ents = [*doc.ents, Span(doc, 5, 6, label="MY_ORG")]
    # Add the label explicitly. Previously we didn't require this.
    ner.add_label("MY_ORG")
    ner(doc)
    assert len(ner.labels) == 2
    assert "SOME_LABEL" in ner.labels
    assert "MY_ORG" in ner.labels
    with make_tempdir() as tmp:
        # Check that the component's IO goes fine.
        ner_dir = ensure_path(tmp)
        if not ner_dir.exists():
            ner_dir.mkdir()
        ner.to_disk(ner_dir)
        restored_ner = nlp.create_pipe("ner", config={})
        restored_ner.from_disk(ner_dir)
        assert len(restored_ner.labels) == 2


@pytest.mark.issue(4725)
def test_issue4725_1():
    """Ensure the pickling of the NER goes well"""
    nlp = English(vocab=Vocab(vectors_name="test_vocab_add_vector"))
    ner = nlp.create_pipe("ner", config={"update_with_oracle_cut_size": 111})
    with make_tempdir() as tmp_path:
        pickle_path = tmp_path / "ner.pkl"
        with pickle_path.open("wb") as out_file:
            pickle.dump(ner, out_file)
            # Dumping must not alter the component's own config.
            assert ner.cfg["update_with_oracle_cut_size"] == 111

        with pickle_path.open("rb") as in_file:
            unpickled_ner = pickle.load(in_file)
            assert unpickled_ner.cfg["update_with_oracle_cut_size"] == 111


@pytest.mark.parametrize("Parser", test_parsers)
def test_serialize_parser_roundtrip_bytes(en_vocab, Parser):
    """Check that a parser restored from bytes serializes to the same bytes."""
    resolved = registry.resolve({"model": DEFAULT_PARSER_MODEL}, validate=True)
    model = resolved["model"]
    source = Parser(en_vocab, model)
    source_bytes = source.to_bytes(exclude=["vocab"])
    restored = Parser(en_vocab, model).from_bytes(source_bytes)
    restored_dump = restored.to_bytes(exclude=["vocab"])
    source_dump = source.to_bytes(exclude=["vocab"])
    assert len(restored_dump) == len(source_dump)
    assert restored_dump == source_dump


@pytest.mark.parametrize("Parser", test_parsers)
def test_serialize_parser_strings(Parser):
    """Check that a label string reaches the target vocab through from_bytes."""
    label = "FunnyLabel"
    source_vocab = Vocab()
    assert label not in source_vocab.strings
    resolved = registry.resolve({"model": DEFAULT_PARSER_MODEL}, validate=True)
    model = resolved["model"]
    source = Parser(source_vocab, model)
    source.add_label(label)
    assert label in source.vocab.strings
    # A fresh vocab does not know the label until the parser bytes are loaded.
    target_vocab = Vocab()
    assert label not in target_vocab.strings
    target = Parser(target_vocab, model)
    target = target.from_bytes(source.to_bytes(exclude=["vocab"]))
    assert label in target.vocab.strings


@pytest.mark.parametrize("Parser", test_parsers)
def test_serialize_parser_roundtrip_disk(en_vocab, Parser):
    """Check that a parser loaded from disk matches the one that was saved."""
    resolved = registry.resolve({"model": DEFAULT_PARSER_MODEL}, validate=True)
    model = resolved["model"]
    source = Parser(en_vocab, model)
    # Model and vocab are left out of the byte comparison below.
    skipped = ["model", "vocab"]
    with make_tempdir() as tmp:
        parser_path = tmp / "parser"
        source.to_disk(parser_path)
        from_disk = Parser(en_vocab, model).from_disk(parser_path)
        source_dump = source.to_bytes(exclude=skipped)
        disk_dump = from_disk.to_bytes(exclude=skipped)
        assert len(source_dump) == len(disk_dump)
        assert source_dump == disk_dump


def test_to_from_bytes(parser, blank_parser):
    """Load a labelled parser's bytes into a blank parser after resizing it."""
    for candidate in (parser, blank_parser):
        assert candidate.model is not True
    assert blank_parser.moves.n_moves != parser.moves.n_moves
    payload = parser.to_bytes(exclude=["vocab"])
    # the blank parser needs to be resized before we can call from_bytes
    resize_output = blank_parser.model.attrs["resize_output"]
    resize_output(blank_parser.model, parser.moves.n_moves)
    blank_parser.from_bytes(payload)
    assert blank_parser.model is not True
    assert blank_parser.moves.n_moves == parser.moves.n_moves


def test_serialize_tagger_roundtrip_bytes(en_vocab, taggers):
    """Check that tagger bytes survive loading into the same and a new tagger."""
    tagger = taggers[0]
    original_bytes = tagger.to_bytes()
    # Loading its own bytes back must leave the tagger's serialization unchanged.
    tagger = tagger.from_bytes(original_bytes)
    assert tagger.to_bytes() == original_bytes
    resolved = registry.resolve({"model": DEFAULT_TAGGER_MODEL}, validate=True)
    fresh_tagger = Tagger(en_vocab, resolved["model"]).from_bytes(original_bytes)
    fresh_bytes = fresh_tagger.to_bytes()
    assert len(fresh_bytes) == len(original_bytes)
    assert fresh_bytes == original_bytes


def test_serialize_tagger_roundtrip_disk(en_vocab, taggers):
    """Save both fixture taggers to disk, reload them, and compare their bytes."""
    first, second = taggers
    with make_tempdir() as tmp:
        first_path = tmp / "tagger1"
        second_path = tmp / "tagger2"
        first.to_disk(first_path)
        second.to_disk(second_path)
        resolved = registry.resolve({"model": DEFAULT_TAGGER_MODEL}, validate=True)
        model = resolved["model"]
        first_loaded = Tagger(en_vocab, model).from_disk(first_path)
        second_loaded = Tagger(en_vocab, model).from_disk(second_path)
        assert first_loaded.to_bytes() == second_loaded.to_bytes()


def test_serialize_tagger_strings(en_vocab, de_vocab, taggers):
    """Check that a custom tagger label is restored into another vocab from disk."""
    label = "SomeWeirdLabel"
    for vocab in (en_vocab, de_vocab):
        assert label not in vocab.strings
    tagger = taggers[0]
    assert label not in tagger.vocab.strings
    with make_tempdir() as tmp:
        # check that custom labels are serialized as part of the component's strings.jsonl
        tagger.add_label(label)
        assert label in tagger.vocab.strings
        tagger_path = tmp / "tagger1"
        tagger.to_disk(tagger_path)
        # ensure that the custom strings are loaded back in when using the tagger in another pipeline
        resolved = registry.resolve({"model": DEFAULT_TAGGER_MODEL}, validate=True)
        other_tagger = Tagger(de_vocab, resolved["model"]).from_disk(tagger_path)
        assert label in other_tagger.vocab.strings


@pytest.mark.issue(1105)
def test_serialize_textcat_empty(en_vocab):
    """Serialize a freshly constructed TextCategorizer to bytes (see issue #1105)."""
    resolved = registry.resolve(
        {"model": DEFAULT_SINGLE_TEXTCAT_MODEL}, validate=True
    )
    categorizer = TextCategorizer(en_vocab, resolved["model"], threshold=0.5)
    # Only checks that the call completes; the resulting bytes are not inspected.
    categorizer.to_bytes(exclude=["vocab"])


@pytest.mark.parametrize("Parser", test_parsers)
def test_serialize_pipe_exclude(en_vocab, Parser):
    """Check that excluding "cfg" on either side keeps a custom cfg key out."""
    resolved = registry.resolve({"model": DEFAULT_PARSER_MODEL}, validate=True)
    model = resolved["model"]

    source = Parser(en_vocab, model)
    source.cfg["foo"] = "bar"

    # No "cfg" exclusion: the custom key travels with the bytes.
    payload = source.to_bytes(exclude=["vocab"])
    restored = Parser(en_vocab, model).from_bytes(payload)
    assert "foo" in restored.cfg

    # "cfg" excluded while loading.
    payload = source.to_bytes(exclude=["vocab"])
    restored = Parser(en_vocab, model).from_bytes(payload, exclude=["cfg"])
    assert "foo" not in restored.cfg

    # "cfg" excluded while dumping.
    payload = source.to_bytes(exclude=["cfg"])
    restored = Parser(en_vocab, model).from_bytes(payload, exclude=["vocab"])
    assert "foo" not in restored.cfg


def test_serialize_sentencerecognizer(en_vocab):
    """Check that a SentenceRecognizer restored from bytes serializes identically."""
    resolved = registry.resolve({"model": DEFAULT_SENTER_MODEL}, validate=True)
    model = resolved["model"]
    senter = SentenceRecognizer(en_vocab, model)
    senter_bytes = senter.to_bytes()
    restored = SentenceRecognizer(en_vocab, model).from_bytes(senter_bytes)
    assert senter.to_bytes() == restored.to_bytes()


def test_serialize_pipeline_disable_enable():
    """Check disabled/excluded components across config and disk round-trips."""
    nlp = English()
    for pipe_name in ("ner", "tagger"):
        nlp.add_pipe(pipe_name)
    nlp.disable_pipe("tagger")
    assert nlp.config["nlp"]["disabled"] == ["tagger"]

    # Rebuild from a copy of the config: "tagger" stays disabled.
    from_config = English.from_config(nlp.config.copy())
    assert from_config.pipe_names == ["ner"]
    assert from_config.component_names == ["ner", "tagger"]
    assert from_config.disabled == ["tagger"]
    assert from_config.config["nlp"]["disabled"] == ["tagger"]

    # Plain disk round-trip.
    with make_tempdir() as tmp_dir:
        from_config.to_disk(tmp_dir)
        from_disk = spacy.load(tmp_dir)
    assert from_disk.pipe_names == ["ner"]
    assert from_disk.component_names == ["ner", "tagger"]

    # Reload while additionally disabling "ner".
    with make_tempdir() as tmp_dir:
        from_disk.to_disk(tmp_dir)
        ner_disabled = spacy.load(tmp_dir, disable=["ner"])
    assert ner_disabled.pipe_names == []
    assert ner_disabled.component_names == ["ner", "tagger"]
    assert ner_disabled.disabled == ["ner", "tagger"]

    # Reload the original pipeline with "tagger" excluded.
    with make_tempdir() as tmp_dir:
        nlp.to_disk(tmp_dir)
        tagger_excluded = spacy.load(tmp_dir, exclude=["tagger"])
    assert tagger_excluded.pipe_names == ["ner"]
    assert tagger_excluded.component_names == ["ner"]
    assert tagger_excluded.disabled == []


def test_serialize_custom_trainable_pipe():
    """Check serialization of TrainablePipe subclasses with and without a model."""

    class BadCustomPipe1(TrainablePipe):
        # Sets neither ``vocab`` nor ``model``.
        def __init__(self, vocab):
            pass

    class BadCustomPipe2(TrainablePipe):
        # Sets ``vocab`` but leaves ``model`` as None.
        def __init__(self, vocab):
            self.vocab = vocab
            self.model = None

    class CustomPipe(TrainablePipe):
        # Sets both ``vocab`` and ``model``.
        def __init__(self, vocab, model):
            self.vocab = vocab
            self.model = model

    # Both incomplete pipes must raise ValueError for bytes and disk alike.
    for bad_pipe_cls in (BadCustomPipe1, BadCustomPipe2):
        bad_pipe = bad_pipe_cls(Vocab())
        with pytest.raises(ValueError):
            bad_pipe.to_bytes()
        with make_tempdir() as tmp_dir:
            with pytest.raises(ValueError):
                bad_pipe.to_disk(tmp_dir)

    good_pipe = CustomPipe(Vocab(), Linear())
    expected_bytes = good_pipe.to_bytes()
    from_bytes_pipe = CustomPipe(Vocab(), Linear()).from_bytes(expected_bytes)
    assert from_bytes_pipe.to_bytes() == expected_bytes
    with make_tempdir() as tmp_dir:
        good_pipe.to_disk(tmp_dir)
        from_disk_pipe = CustomPipe(Vocab(), Linear()).from_disk(tmp_dir)
    assert from_disk_pipe.to_bytes() == expected_bytes


def test_load_without_strings():
    """Check that loading with ``exclude=["strings"]`` drops an added string."""
    nlp = spacy.blank("en")
    baseline_count = len(nlp.vocab.strings)
    added_word = "unlikely_word_" * 20
    nlp.vocab.strings.add(added_word)
    assert len(nlp.vocab.strings) == baseline_count + 1
    with make_tempdir() as tmp_dir:
        nlp.to_disk(tmp_dir)
        # reload with strings
        with_strings = load(tmp_dir)
        assert len(nlp.vocab.strings) == len(with_strings.vocab.strings)
        assert added_word in with_strings.vocab.strings
        # reload without strings
        without_strings = load(tmp_dir, exclude=["strings"])
        assert baseline_count == len(without_strings.vocab.strings)
        assert added_word not in without_strings.vocab.strings