Source code for architxt.cli

import shutil
import subprocess
from pathlib import Path

import mlflow
import more_itertools
import typer
from mlflow.data.code_dataset_source import CodeDatasetSource
from mlflow.data.meta_dataset import MetaDataset
from platformdirs import user_cache_path
from rich.columns import Columns
from rich.panel import Panel
from rich.table import Table
from typer.main import get_command

from architxt.bucket.zodb import ZODBTreeBucket
from architxt.generator import gen_instance
from architxt.inspector import ForestInspector
from architxt.schema import Group, Relation, Schema
from architxt.simplification.tree_rewriting import rewrite

from .export import app as export_app
from .loader import app as loader_app
from .utils import console, load_forest, show_metrics, show_schema

app = typer.Typer(
    help="ArchiTXT is a tool for structuring textual data into a valid database model. "
    "It is guided by a meta-grammar and uses an iterative process of tree rewriting.",
    no_args_is_help=True,
)

app.add_typer(loader_app, name="load")
app.add_typer(export_app, name="export")


@app.callback()
def mlflow_setup() -> None:
    """Configure the MLflow experiment shared by all commands."""
    mlflow.set_experiment('ArchiTXT')
[docs] @app.command( help="Launch the web-based UI.", context_settings={"allow_extra_args": True, "ignore_unknown_options": True}, ) def ui(ctx: typer.Context) -> None: """Launch the web-based UI using Streamlit.""" try: from architxt import ui subprocess.run(['streamlit', 'run', ui.__file__, *ctx.args], check=True) except FileNotFoundError as error: console.print( "[red]Streamlit is not installed or not found. Please install it with `pip install architxt[ui]` to use the UI.[/]" ) raise typer.Exit(code=1) from error
[docs] @app.command(help="Simplify a bunch of databased together.") def simplify( files: list[Path] = typer.Argument(..., exists=True, readable=True, help="Path of the data files to load."), *, tau: float = typer.Option(0.7, help="The similarity threshold.", min=0, max=1), epoch: int = typer.Option(100, help="Number of iteration for tree rewriting.", min=1), min_support: int = typer.Option(20, help="Minimum support for tree patterns.", min=1), workers: int | None = typer.Option( None, help="Number of parallel worker processes to use. Defaults to the number of available CPU cores.", min=1 ), output: Path | None = typer.Option(None, help="Path to save the result."), debug: bool = typer.Option(False, help="Enable debug mode for more verbose output."), metrics: bool = typer.Option(False, help="Show metrics of the simplification."), log: bool = typer.Option(False, help="Enable logging to MLFlow."), ) -> None: if log: console.print(f'[green]MLFlow logging enabled. Logs will be send to {mlflow.get_tracking_uri()}[/]') mlflow.start_run(description='simplification') for file in files: mlflow.log_input(MetaDataset(CodeDatasetSource({}), name=file.name)) with ZODBTreeBucket(storage_path=output) as forest: forest.update(load_forest(files)) console.print( f'[blue]Rewriting {len(forest)} trees with tau={tau}, epoch={epoch}, min_support={min_support}[/]' ) result_metrics = rewrite( forest, tau=tau, epoch=epoch, min_support=min_support, debug=debug, max_workers=workers ) # Generate schema schema = Schema.from_forest(forest, keep_unlabelled=False) show_schema(schema) if metrics: show_metrics(result_metrics)
[docs] @app.command(help="Display statistics of a dataset.") def inspect( files: list[Path] = typer.Argument(..., exists=True, readable=True, help="Path of the data files to load."), ) -> None: """Display overall statistics.""" inspector = ForestInspector() forest = load_forest(files) forest = inspector(forest) # Display the schema schema = Schema.from_forest(forest, keep_unlabelled=False) show_schema(schema) # Display the largest tree console.print(Panel(str(inspector.largest_tree), title="Largest Tree")) # Entity Count tables = [] for chunk in more_itertools.chunked_even(inspector.entity_count.most_common(), 10): entity_table = Table() entity_table.add_column("Entity", style="cyan", no_wrap=True) entity_table.add_column("Count", style="magenta") for entity, count in chunk: entity_table.add_row(entity, str(count)) tables.append(entity_table) # Display statistics stats_table = Table() stats_table.add_column("Metric", style="cyan", no_wrap=True) stats_table.add_column("Value", style="magenta") stats_table.add_row("Total Trees", str(inspector.total_trees)) stats_table.add_row("Total Entities", str(inspector.total_entities)) stats_table.add_row("Average Tree Height", f"{inspector.avg_height:.3f}") stats_table.add_row("Maximum Tree Height", str(inspector.max_height)) stats_table.add_row("Average Tree size", f"{inspector.avg_size:.3f}") stats_table.add_row("Maximum Tree size", str(inspector.max_size)) stats_table.add_row("Average Branching", f"{inspector.avg_branching:.3f}") stats_table.add_row("Maximum Branching", str(inspector.max_children)) console.print(Columns([*tables, stats_table], equal=True))
@app.command(name='generate', help="Generate a synthetic instance.")
def instance_generator(
    *,
    sample: int = typer.Option(100, help="Number of sentences to sample from the corpus.", min=1),
    output: Path | None = typer.Option(None, help="Path to save the result."),
) -> None:
    """Generate synthetic database instances."""
    schema = Schema.from_description(
        groups={
            Group(name='SOSY', entities={'SOSY', 'ANATOMIE', 'SUBSTANCE'}),
            Group(name='TREATMENT', entities={'SUBSTANCE', 'DOSAGE', 'ADMINISTRATION', 'FREQUENCY'}),
            Group(name='EXAM', entities={'DIAGNOSTIC_PROCEDURE', 'ANATOMIE'}),
        },
        relations={
            Relation(name='PRESCRIPTION', left='SOSY', right='TREATMENT'),
            Relation(name='EXAM_RESULT', left='EXAM', right='SOSY'),
        },
    )
    show_schema(schema)

    with ZODBTreeBucket(storage_path=output) as forest:
        with console.status("[cyan]Generating synthetic instances..."):
            forest.update(gen_instance(schema, size=sample, generate_collections=False))

        console.print(f'[green]Generated {len(forest)} synthetic instances.[/]')
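# Usage sketch (hedged; the `architxt` console-script name is an assumption). `--sample`
# is forwarded to `gen_instance` as `size`, and `--output` sets the ZODB storage path:
#
#   architxt generate --sample 500 --output synthetic.db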
@app.command(name='cache-clear', help='Clear the ArchiTXT cache.')
def clear_cache(
    *,
    force: bool = typer.Option(False, help="Force the deletion of the cache without asking."),
) -> None:
    cache_path = user_cache_path('architxt')

    if not cache_path.exists():
        console.print("[yellow]Cache is already empty or does not exist. Doing nothing.[/]")
        return

    if not force and not typer.confirm('All cached data will be deleted. Are you sure?'):
        raise typer.Abort()

    shutil.rmtree(cache_path)
    console.print("[green]Cache cleared.[/]")
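# Usage sketch (hedged; the `architxt` console-script name is an assumption):
#
#   architxt cache-clear --force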
# Click command used for Sphinx documentation
_click_command = get_command(app)
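# Programmatic usage sketch (hedged): the Typer app can also be driven from Python,
# e.g. in tests, through Typer's CliRunner. The file name below is hypothetical.
#
#   from typer.testing import CliRunner
#
#   runner = CliRunner()
#   result = runner.invoke(app, ["inspect", "corpus_a.db"])
#   print(result.exit_code, result.output)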