import asyncio
import contextlib
from pathlib import Path

import click
import mlflow
import typer
from neo4j import GraphDatabase
from sqlalchemy import create_engine

from architxt.bucket.zodb import ZODBTreeBucket
from architxt.database import loader
from architxt.nlp import raw_load_corpus
from architxt.nlp.parser.corenlp import CoreNLPParser
from architxt.schema import Schema

from .utils import console, show_schema
# Only the Typer application is part of this module's public API.
__all__ = ['app']

# Entity labels handed to `raw_load_corpus` as `entities_filter` by `load_corpus`.
# NOTE(review): presumably these labels are discarded while loading — confirm against `raw_load_corpus`.
ENTITIES_FILTER = {'TIME', 'MOMENT', 'DUREE', 'DURATION', 'DATE', 'OTHER_ENTITY', 'OTHER_EVENT', 'COREFERENCE'}

# Relation labels handed to `raw_load_corpus` as `relations_filter` by `load_corpus`.
# NOTE(review): presumably these relations are discarded while loading — confirm against `raw_load_corpus`.
RELATIONS_FILTER = {'TEMPORALITE', 'CAUSE-CONSEQUENCE'}

# Label normalisation table handed to `raw_load_corpus` as `entities_mapping` by `load_corpus`.
# Several source labels (French and English variants) collapse onto a single target label.
ENTITIES_MAPPING = {
    # Frequency variants
    'FREQ': 'FREQUENCY',
    'FREQUENCE': 'FREQUENCY',
    'SIGN_SYMPTOM': 'SOSY',
    # Measurements and quantitative/qualitative concepts are all treated as a VALUE
    'VALEUR': 'VALUE',
    'HEIGHT': 'VALUE',
    'WEIGHT': 'VALUE',
    'MASS': 'VALUE',
    'QUANTITATIVE_CONCEPT': 'VALUE',
    'QUALITATIVE_CONCEPT': 'VALUE',
    'DISTANCE': 'VALUE',
    'VOLUME': 'VALUE',
    'AREA': 'VALUE',
    'LAB_VALUE': 'VALUE',
    # Treatments and medications share one label
    'TRAITEMENT': 'THERAPEUTIC_PROCEDURE',
    'MEDICATION': 'THERAPEUTIC_PROCEDURE',
    'DOSE': 'DOSAGE',
    'OUTCOME': 'SOSY',
    'EXAMEN': 'DIAGNOSTIC_PROCEDURE',
    'PATHOLOGIE': 'DISEASE_DISORDER',
    'MODE': 'ADMINISTRATION',
}

# Typer application holding the loading commands declared below with `@app.command`;
# invoking it without arguments prints the help instead of failing.
app = typer.Typer(no_args_is_help=True)
[docs]
@app.command(name='document', help="Extract document database into a formatted tree.")
def load_document(
    file: Path = typer.Argument(..., exists=True, readable=True, help="The document file to read."),
    *,
    raw: bool = typer.Option(
        False, help="Enable row reading, skipping any transformation to convert it to the metamodel."
    ),
    root_name: str = typer.Option('ROOT', help="The root node name."),
    sample: int | None = typer.Option(None, help="Number of element to sample from the document.", min=1),
    output: Path | None = typer.Option(None, help="Path to save the result."),
    merge_existing: bool = typer.Option(False, help="Should we merge data if output file already exist"),
) -> None:
    """
    Read and parse a document file into a structured tree.

    The file is read with ``loader.read_document`` and the resulting trees are stored in a
    ``ZODBTreeBucket`` opened on ``output``. The schema derived from the stored forest
    (labelled nodes only) is then displayed.

    When ``output`` already exists and ``merge_existing`` is not set, the user is asked to
    confirm the merge; declining aborts the command.

    :raises typer.Abort: If the storage path exists and the user refuses to merge.
    """
    # Only prompt when something is already stored at the target and merging was not requested up front.
    needs_confirmation = output is not None and output.exists() and not merge_existing
    if needs_confirmation and not typer.confirm("The storage path already exists. Merge existing data?"):
        console.print("[red]Cannot store data due to conflict.[/]")
        raise typer.Abort()

    # The loader receives 0 when no sample size was requested.
    sample_size = sample or 0

    with ZODBTreeBucket(storage_path=output) as bucket:
        trees = loader.read_document(file, raw_read=raw, root_name=root_name, sample=sample_size)
        bucket.update(trees)

        show_schema(Schema.from_forest(bucket, keep_unlabelled=False))
[docs]
@app.command(name='sql', help="Extract a SQL compatible database into a formatted tree.")
def load_sql(
    uri: str = typer.Argument(..., help="Database connection string."),
    *,
    simplify_association: bool = typer.Option(True, help="Simplify association tables."),
    sample: int | None = typer.Option(None, help="Number of elements to sample from the database.", min=1),
    output: Path | None = typer.Option(None, help="Path to save the result."),
    merge_existing: bool = typer.Option(False, help="Should we merge data if output file already exist"),
) -> None:
    """
    Extract the database schema and relations to a tree format.

    A SQLAlchemy engine is created from ``uri``; the database is read with ``loader.read_sql``
    and the resulting trees are stored in a ``ZODBTreeBucket`` opened on ``output``. The schema
    derived from the stored forest (labelled nodes only) is then displayed.

    When ``output`` already exists and ``merge_existing`` is not set, the user is asked to
    confirm the merge; declining aborts the command.

    :raises typer.Abort: If the storage path exists and the user refuses to merge.
    """
    if (
        output is not None
        and output.exists()
        and not merge_existing
        and not typer.confirm("The storage path already exists. Merge existing data?")
    ):
        console.print("[red]Cannot store data due to conflict.[/]")
        raise typer.Abort()

    engine = create_engine(uri)
    try:
        with (
            engine.connect() as connection,
            ZODBTreeBucket(storage_path=output) as forest,
        ):
            # The loader receives 0 when no sample size was requested.
            forest.update(loader.read_sql(connection, simplify_association=simplify_association, sample=sample or 0))
            schema = Schema.from_forest(forest, keep_unlabelled=False)
            show_schema(schema)
    finally:
        # Closing the connection only hands it back to the pool; dispose of the engine so the
        # pooled database connections are released once the command is done.
        engine.dispose()
[docs]
@app.command(name='graph', help="Extract a cypher/bolt compatible database into a formatted tree.")
def load_graph(
    uri: str = typer.Argument(..., help="Database connection string."),
    *,
    username: str | None = typer.Option('neo4j', help="Username to use for authentication."),
    password: str | None = typer.Option(None, help="Password to use for authentication."),
    sample: int | None = typer.Option(None, help="Number of elements to sample from the database.", min=1),
    output: Path | None = typer.Option(None, help="Path to save the result."),
    merge_existing: bool = typer.Option(False, help="Should we merge data if output file already exist"),
) -> None:
    """
    Extract a graph database to a tree format.

    A driver session is opened on ``uri`` with ``GraphDatabase.driver``; the database is read
    with ``loader.read_cypher`` and the resulting trees are stored in a ``ZODBTreeBucket`` opened
    on ``output``. The schema derived from the stored forest (labelled nodes only) is then displayed.

    When ``output`` already exists and ``merge_existing`` is not set, the user is asked to
    confirm the merge; declining aborts the command.

    :raises typer.Abort: If the storage path exists and the user refuses to merge.
    """
    if (
        output is not None
        and output.exists()
        and not merge_existing
        and not typer.confirm("The storage path already exists. Merge existing data?")
    ):
        console.print("[red]Cannot store data due to conflict.[/]")
        raise typer.Abort()

    # Credentials are sent only when both parts are provided; otherwise the driver connects without auth.
    auth = (username, password) if username and password else None

    with (
        GraphDatabase.driver(uri, auth=auth) as driver,
        driver.session() as session,
        ZODBTreeBucket(storage_path=output) as forest,
    ):
        # The loader receives 0 when no sample size was requested.
        forest.update(loader.read_cypher(session, sample=sample or 0))
        schema = Schema.from_forest(forest, keep_unlabelled=False)
        show_schema(schema)
[docs]
@app.command(name='corpus', help="Extract a database schema form a corpus.", no_args_is_help=True)
def load_corpus(
    corpus_path: list[typer.FileBinaryRead] = typer.Argument(
        ..., exists=True, readable=True, help="Path to the input corpus."
    ),
    *,
    language: list[str] = typer.Option(['French'], help="Language of the input corpus."),
    corenlp_url: str = typer.Option('http://localhost:9000', help="URL of the CoreNLP server."),
    sample: int | None = typer.Option(None, help="Number of sentences to sample from the corpus.", min=1),
    resolver: str | None = typer.Option(
        None,
        help="The entity resolver to use when loading the corpus.",
        click_type=click.Choice(['umls', 'mesh', 'rxnorm', 'go', 'hpo'], case_sensitive=False),
    ),
    output: Path | None = typer.Option(None, help="Path to save the result."),
    merge_existing: bool = typer.Option(False, help="Should we merge data if output file already exist"),
    cache: bool = typer.Option(True, help="Enable caching of the analyzed corpus to prevent re-parsing."),
    log: bool = typer.Option(False, help="Enable logging to MLFlow."),
) -> None:
    """
    Load a corpus and print the database schema as a CFG.

    The corpus files are loaded with ``raw_load_corpus`` using a ``CoreNLPParser`` bound to
    ``corenlp_url`` together with the module-level entity/relation filters and entity mapping.
    The resulting trees are stored asynchronously in a ``ZODBTreeBucket`` opened on ``output``,
    and the schema derived from the stored forest (labelled nodes only) is then displayed.

    When ``output`` already exists and ``merge_existing`` is not set, the user is asked to
    confirm the merge; declining aborts the command.

    When ``log`` is set, the whole processing runs inside an MLflow run that is closed when
    the command ends, and marked as failed if an exception escapes.

    :raises typer.Abort: If the storage path exists and the user refuses to merge.
    """
    if (
        output is not None
        and output.exists()
        and not merge_existing
        and not typer.confirm("The storage path already exists. Merge existing data?")
    ):
        console.print("[red]Cannot store data due to conflict.[/]")
        raise typer.Abort()

    if log:
        console.print(f'[green]MLFlow logging enabled. Logs will be send to {mlflow.get_tracking_uri()}[/]')
        # Used as a context manager below so the run is always ended, with a FAILED status on error.
        run_context = mlflow.start_run(description='corpus_processing')
    else:
        run_context = contextlib.nullcontext()

    # The bucket is closed before the MLflow run ends (context managers exit in reverse order).
    with run_context, ZODBTreeBucket(storage_path=output) as bucket:
        asyncio.run(
            bucket.async_update(
                raw_load_corpus(
                    corpus_path,
                    language,
                    parser=CoreNLPParser(corenlp_url=corenlp_url),
                    resolver_name=resolver,
                    cache=cache,
                    entities_filter=ENTITIES_FILTER,
                    relations_filter=RELATIONS_FILTER,
                    entities_mapping=ENTITIES_MAPPING,
                    sample=sample,
                )
            )
        )
        schema = Schema.from_forest(bucket, keep_unlabelled=False)
        show_schema(schema)