# Source code for architxt.nlp.contrib.spacy
from __future__ import annotations
from typing import TYPE_CHECKING
import spacy
from aiostream import pipe, stream
from architxt.nlp.entity_extractor import EntityExtractor
from architxt.nlp.model import AnnotatedSentence, Entity
if TYPE_CHECKING:
from collections.abc import AsyncIterable, AsyncIterator, Iterable
from spacy.tokens import Doc
# Public API of this module.
__all__ = ['SpacyEntityExtractor']
# spaCy pipeline components passed to ``spacy.load(..., disable=...)`` below;
# they are switched off at load time (NER is notably absent from this set,
# presumably because entity recognition is the only output consumed here).
SPACY_DISABLED_PIPELINES = {'parser', 'senter', 'sentencizer', 'textcat', 'lemmatizer', 'tagger'}
class SpacyEntityExtractor(EntityExtractor):
    """Entity extractor backed by a spaCy NER pipeline.

    The model is loaded with the components in ``SPACY_DISABLED_PIPELINES``
    switched off, keeping only what is needed to produce ``doc.ents``.
    """

    def __init__(self, model_name: str = "en_core_web_sm") -> None:
        """Load the spaCy model *model_name* with non-NER components disabled."""
        self.nlp = spacy.load(model_name, disable=SPACY_DISABLED_PIPELINES)

    @staticmethod
    def _doc_to_annotated(doc: Doc) -> AnnotatedSentence:
        """Convert a spaCy ``Doc`` into an ``AnnotatedSentence``.

        Each entity span becomes an ``Entity`` whose id encodes its label and
        character offsets; no relations are produced (``rels`` is empty).
        """
        collected: list[Entity] = []
        for ent in doc.ents:
            collected.append(
                Entity(
                    name=ent.label_,
                    start=ent.start_char,
                    end=ent.end_char,
                    id=f"{ent.label_}_{ent.start_char}_{ent.end_char}",
                    value=ent.text,
                )
            )
        return AnnotatedSentence(txt=doc.text, entities=collected, rels=[])

    def __call__(self, sentence: str) -> AnnotatedSentence:
        """Annotate a single *sentence* and return its ``AnnotatedSentence``."""
        parsed = self.nlp(sentence)
        return self._doc_to_annotated(parsed)

    async def batch(
        self,
        sentences: Iterable[str] | AsyncIterable[str],
        *,
        batch_size: int = 128,
    ) -> AsyncIterator[AnnotatedSentence]:
        """Asynchronously annotate *sentences*, yielding one result per input.

        Sentences are grouped into chunks of *batch_size* and fed to
        ``nlp.pipe`` so spaCy can process them in batches; the resulting docs
        are flattened back into a single stream and converted one by one.
        """
        pipeline = (
            stream.iterate(sentences)
            | pipe.chunks(batch_size)
            | pipe.flatmap(self.nlp.pipe)
            | pipe.map(self._doc_to_annotated)
        )
        async with pipeline.stream() as annotated:
            async for result in annotated:
                yield result