Source code for architxt.database.loader.documents

import json
from collections.abc import Callable, Generator, Iterable, Sequence
from io import BytesIO
from pathlib import Path
from typing import Any, BinaryIO

import pandas as pd
import toml
import xmltodict
from ruamel import yaml

from architxt.tree import NodeLabel, NodeType, Tree, has_type

__all__ = ['read_document']

FILE_PARSERS: Sequence[Callable[[BytesIO | BinaryIO], dict[str, Any] | list[Any]]] = (
    json.load,  # JSON
    lambda x: toml.loads(x.read().decode()),  # TOML
    lambda x: list(yaml.YAML().load_all(x)),  # YAML
    lambda x: xmltodict.parse(x.read()),  # XML
    lambda x: pd.read_csv(x).to_dict(orient='records'),  # CSV
    lambda x: {k: df.to_dict(orient='records') for k, df in pd.read_excel(x, sheet_name=None).items()},  # EXCEL
)


[docs] def read_document( file: str | Path | BytesIO | BinaryIO, *, raw_read: bool = False, root_name: str = 'ROOT', ) -> Generator[Tree, None, None]: """ Read the file as a data tree. XML are parsed according to https://www.xml.com/pub/a/2006/05/31/converting-between-xml-and-json.html :param file: The document file to read. :param raw_read: If enabled, the tree corresponds to the document without any transformation applied. :param root_name: The root node name. :return: A list of trees representing the database. """ raw_data = read_document_file(file) document_tree = read_tree(raw_data, root_name=root_name) if raw_read: yield document_tree return yield from parse_document_tree(document_tree)
[docs] def read_document_file(file: str | Path | BytesIO | BinaryIO) -> dict[str, Any] | list[Any]: """ Read and parse a document file like XML, JSON, or CSV. :param file: The document database file to read. :return: The parsed contents of the file. :raises FileNotFoundError: If the file does not exist. :raises OSError: If the file cannot be read. :raises ValueError: If the file cannot be read or is empty. """ should_close = False document_db: BytesIO | BinaryIO if isinstance(file, str | Path): document_db = Path(file).open('rb') # noqa: SIM115 should_close = True else: document_db = file try: data = parse_file(document_db) finally: if should_close: document_db.close() if not data: msg = 'Empty document' raise ValueError(msg) return data
[docs] def parse_file(file: BytesIO | BinaryIO) -> dict[str, Any] | list[Any]: """ Parse a document database file like XML, JSON, or CSV. :param file: A file-like object opened for reading. :return: The parsed content of the file as a Python nested object. :raises: ValueError if none of the available parsers are able to process the input file. """ cursor = file.tell() for parser in FILE_PARSERS: try: return parser(file) except Exception: # noqa: PERF203 file.seek(cursor) continue msg = 'Unsupported file type' raise ValueError(msg)
[docs] def read_tree(data: dict[str, Any] | list[Any], *, root_name: str = 'ROOT') -> Tree: """ Recursively converts a document nested structure into a tree. - Dictionaries are treated as groups. - Lists are treated as collections. - Leaf elements are treated as entities. If a list contains only a single collection, the function flattens the output by returning that collection directly instead of nesting it under another collection node. :param data: The input data structure to be converted into a Tree. :param root_name: The label for the current node. :return: A nested tree structure corresponding to the input data. """ root_name = root_name.replace(' ', '_').lower() sub_elements: Iterable[tuple[str, Any]] = ( data.items() if isinstance(data, dict) else ((root_name, item) for item in data) ) children = [] for name, sub_element in sub_elements: if isinstance(sub_element, dict | list): # Recursively process nested structures children.append(read_tree(sub_element, root_name=name)) else: # Leaf node becomes an entity ent_label = NodeLabel(NodeType.ENT, str(name).replace(' ', '_').lower()) children.append(Tree(ent_label, [str(sub_element)])) # Flatten if the result is a single collection node if len(children) == 1 and has_type(children[0], NodeType.COLL): return children[0] label = NodeLabel(NodeType.COLL, root_name) if isinstance(data, list) else root_name return Tree(label, children)
[docs] def parse_document_tree(tree: Tree) -> Generator[Tree, None, None]: """ Parse a document tree and yields processed subtrees based on collection grouping. - If the root node is **not** a collection, the entire tree is processed and a single result is yielded. - If the root node **is** a collection, each child subtree is individually processed and yielded. TODO: Enhance tree decomposition for nested collections. If no collection exists at the root level, consider splitting at the closest collection and duplicating the path to the root for each collection element. :param tree: The nested tree to be parsed. :yield: Trees representing the database. """ trees = tree if has_type(tree, NodeType.COLL) else [tree] for tree in trees: parsed_tree = traverse_tree(tree)[1] if len(parsed_tree): yield parsed_tree
[docs] def traverse_tree(tree: Tree) -> tuple[Tree, Tree]: """ Recursively traverses and transforms a nested tree into a valid metamodel structure. The function extracts entity nodes and groups them under a single group node. It then establishes relations between this group and any nested subgroups. :param tree: The tree to traverse and transform. :returns: A tuple containing: - The group to anchor too for parent relationship. - The transformed tree converting subgroup to relations. """ if has_type(tree, NodeType.ENT): # Encapsulate entities into a group group_label = NodeLabel(NodeType.GROUP, tree.label().name) group_node = Tree(group_label, [Tree.convert(tree)]) return group_node, group_node if has_type(tree, NodeType.COLL): updated_children = [traverse_tree(child)[0] for child in tree] updated_tree = Tree(tree.label(), updated_children) return updated_tree, updated_tree # Separate entities and non-entities entities = [Tree.convert(subtree) for subtree in tree if has_type(subtree, NodeType.ENT)] non_entities = [subtree for subtree in tree if not has_type(subtree, NodeType.ENT)] # Group node for entities group_label = NodeLabel(NodeType.GROUP, tree.label()) group_node = Tree(group_label, entities) relationship_nodes: list[Tree] = [] for child in non_entities: child_group, child_tree = traverse_tree(child) if child_tree.label() == 'ROOT': # extend relations recursively relationship_nodes.extend(Tree.convert(grandchild) for grandchild in child_tree) if has_type(child_group, NodeType.COLL): # Create relationships with each element in the collection for element in child_group: rel_label = NodeLabel(NodeType.REL, f'{group_node.label().name}<->{element.label().name}') relationship_nodes.append(Tree(rel_label, [group_node.copy(deep=True), Tree.convert(element)])) else: rel_label = NodeLabel(NodeType.REL, f'{group_node.label().name}<->{child_group.label().name}') relationship_nodes.append(Tree(rel_label, [group_node.copy(deep=True), Tree.convert(child_group)])) # Return the group node and either a tree of relations or just the group if there are no relations return group_node, Tree('ROOT', relationship_nodes) if relationship_nodes else group_node