Source code for architxt.bucket.zodb

from __future__ import annotations

import contextlib
import os
import shutil
import tempfile
import uuid
import weakref
from pathlib import Path
from typing import TYPE_CHECKING, overload

import transaction
import ZODB.config
from BTrees.OOBTree import OOBTree
from transaction.interfaces import AlreadyInTransaction, NoTransaction
from ZODB.Connection import resetCaches
from zodburi import resolve_uri

from architxt.tree import Tree, TreeOID, TreePersistentRef
from architxt.utils import update_url_queries

from . import TreeBucket

if TYPE_CHECKING:
    from collections.abc import Generator, Iterable, Iterator

    from ZODB.Connection import Connection

__all__ = ['ZODBTreeBucket']


class ZODBTreeBucket(TreeBucket):
    """
    A persistent, scalable container for :py:class:`~architxt.tree.Tree` objects backed by ZODB and RelStorage using SQLite.

    This container uses `ZODB <https://zodb.org/en/latest/>`_'s :py:class:`~BTrees.OOBTree.OOBTree` internally
    with Tree OIDs (UUIDs) as keys.
    The OIDs are stored as raw bytes to optimize storage space.
    This also enables fast key comparisons, as UUID objects do not need to be created during lookups.

    .. note::
        UUIDs are stored as bytes rather than integers, because ZODB only supports integers up to 64 bits,
        while UUIDs require 128 bits.

    If no storage is specified, the bucket uses a temporary database that is automatically deleted upon closing.

    The bucket is serializable, so it can be passed to a subprocess.
    However, when a temporary database is used, the original bucket remains responsible for cleanup.
    This means the original bucket must stay open for the subprocess to access the database safely.

    >>> from architxt.bucket.zodb import ZODBTreeBucket
    >>> from architxt.tree import Tree
    >>> bucket = ZODBTreeBucket()
    >>> tree = Tree.fromstring('(S (NP Alice) (VP (VB like) (NNS apples)))')
    >>> tree.label
    'S'
    >>> with bucket.transaction():
    ...     bucket.add(tree)  # Add the tree to the bucket
    >>> len(bucket)
    1
    >>> tree in bucket
    True
    >>> with bucket.transaction():
    ...     tree.label = 'ROOT'  # Modify the tree within a transaction
    >>> tree.label
    'ROOT'
    >>> bucket[tree.oid].label
    'ROOT'
    >>> with bucket.transaction():
    ...     tree.label = 'S'
    ...     raise ValueError("rollback")  # Transactions are rolled back on exception
    Traceback (most recent call last):
    ...
    ValueError: rollback
    >>> tree.label
    'ROOT'
    >>> with bucket.transaction():
    ...     bucket.discard(tree)
    >>> len(bucket)
    0
    >>> tree in bucket
    False
    >>> bucket.close()
    """

    _db: ZODB.DB
    _connection: Connection
    _cleanup: bool = False

    def __init__(
        self,
        storage_path: Path | None = None,
        uri: str | None = None,
        bucket_name: str = 'architxt',
        read_only: bool = False,
    ) -> None:
        """
        Initialize the bucket and connect to the underlying ZODB storage.

        Either a `uri` or a `storage_path` may be provided.
        If both are given, the `uri` takes precedence.
        If neither is provided, a temporary local database is created.

        Local databases (temporary or using `storage_path`) use a SQLite backend to support multiprocess concurrency.
        File-based backends are technically possible but not recommended, as they can break parallel execution.

        For large datasets or in a multi-host setup, prefer a relational backend via a ZODB URI.
        For example, to use PostgreSQL, specify a URI such as: ``postgresql://user:password@localhost/dbname``.

        :param storage_path: Path to the storage directory for local storage.
        :param uri: ZODB URI string defining the storage backend.
        :param bucket_name: Name of the root key under which the internal OOBTree is stored.
        :param read_only: Whether to open the database in read-only mode.
        """
        self._storage_path = storage_path
        self._uri = uri
        self._bucket_name = bucket_name
        self._read_only = read_only

        self._db = self._get_db()
        self._connection = self._get_connection()

        # Check that the bucket exists, or create it
        root = self._connection.root()
        if self._bucket_name not in root:
            if self._read_only:
                msg = f"Bucket '{self._bucket_name}' does not exist."
                raise KeyError(msg)

            with self.transaction():
                root[self._bucket_name] = OOBTree()

        # Add support for fork on supported systems.
        # When the process is forked, the child process should recreate the connection.
        if hasattr(os, "register_at_fork"):
            weak_self = weakref.ref(self)
            os.register_at_fork(after_in_child=lambda: (bucket := weak_self()) and bucket._after_fork())

    def _get_db(self) -> ZODB.DB:
        """
        Create and configure the ZODB database.

        URI-based storage takes precedence over the local storage path.
        For local storage, if no path is provided, a temporary directory is created.
        We use RelStorage with SQLite as the backend for local storage to allow multi-process access.

        :return: A configured ZODB database instance.
        """
        if self._uri:
            uri = update_url_queries(self._uri, read_only='true') if self._read_only else self._uri
            storage_factory, db_options = resolve_uri(uri)
            storage = storage_factory()
            return ZODB.DB(storage, **db_options)

        if self._storage_path is None:
            if self._read_only:
                msg = "Cannot open a read-only bucket with no storage path specified."
                raise ValueError(msg)

            self._storage_path = Path(tempfile.mkdtemp(prefix='architxt'))
            self._cleanup = True

        return ZODB.config.databaseFromString(f"""
        %import relstorage
        <zodb main>
            <relstorage>
                keep-history false
                pack-gc true
                read-only {'true' if self._read_only else 'false'}
                <sqlite3>
                    data-dir {self._storage_path}
                    <pragmas>
                        synchronous normal
                        foreign_keys off
                        defer_foreign_keys on
                        temp_store memory
                        journal_mode wal
                        busy_timeout 10000
                    </pragmas>
                </sqlite3>
            </relstorage>
        </zodb>
        """)

    def _get_connection(self) -> Connection:
        transaction_manager = transaction.TransactionManager(explicit=True)
        return self._db.open(transaction_manager=transaction_manager)

    @property
    def _data(self) -> OOBTree:
        return self._connection.root()[self._bucket_name]

    def __reduce__(self) -> tuple[type, tuple[Path | None, str | None, str, bool]]:
        return self.__class__, (self._storage_path, self._uri, self._bucket_name, self._read_only)

    def _savepoint(self) -> None:
        self._connection.savepoint()

    def _after_fork(self) -> None:
        # We disable database cleanup as it is the responsibility of the parent
        self._cleanup = False

        # We also recreate the connection, as it inherits the one from the parent process
        self.sync()
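The constructor supports the three storage modes described in its docstring. Below is a minimal usage sketch of how each mode might be selected; the local directory and the PostgreSQL credentials are placeholders (the latter mirrors the docstring example), not values provided by the library.

from pathlib import Path

from architxt.bucket.zodb import ZODBTreeBucket

# Temporary SQLite-backed database, deleted automatically on close().
scratch = ZODBTreeBucket()
scratch.close()

# Persistent local database stored under a chosen directory (placeholder path).
local = ZODBTreeBucket(storage_path=Path('/tmp/architxt-bucket'))
local.close()

# Relational backend selected through a ZODB URI; the URI takes precedence
# over storage_path (placeholder credentials).
remote = ZODBTreeBucket(uri='postgresql://user:password@localhost/dbname')
remote.close()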
    def close(self) -> None:
        """
        Close the database connection and release associated resources.

        This will:

        - Abort any uncommitted transaction.
        - Close the active database connection.
        - Clean up temporary storage if one was created.
        """
        with contextlib.suppress(NoTransaction):
            self._connection.abort(None)

        self._connection.close()
        self._db.close()

        if self._cleanup:
            # If a temporary directory was used, clean it up
            shutil.rmtree(self._storage_path)
    @contextlib.contextmanager
    def transaction(self) -> Generator[None, None, None]:
        try:
            with self._connection.transaction_manager:
                yield

        except AlreadyInTransaction:
            yield
    def sync(self) -> None:
        """
        Synchronize the in-memory state of this bucket with its underlying storage.

        It clears the local cache and refreshes the connection.
        This can be used to avoid connection timeouts in long-running processes.
        """
        resetCaches()

        # We need to refresh the connection to apply the cache reset
        self._connection.close()
        self._db.pool.clear()  # By default, connections are reused; this ensures we create a fresh connection
        self._connection = self._get_connection()
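As an illustration of why sync() exists, here is a sketch of a long-running ingestion loop that commits one batch per transaction and refreshes the connection between batches; the batches themselves are made up for the example.

from architxt.bucket.zodb import ZODBTreeBucket
from architxt.tree import Tree

# Hypothetical workload: two batches of bracketed sentences.
batches = [
    ['(S (NP Alice) (VP (VB likes) (NNS apples)))'],
    ['(S (NP Bob) (VP (VB likes) (NNS pears)))'],
]

bucket = ZODBTreeBucket()  # temporary local database, enough for the sketch

for batch in batches:
    with bucket.transaction():
        for sentence in batch:
            bucket.add(Tree.fromstring(sentence))

    # Drop local caches and reopen a fresh connection between batches,
    # e.g. to avoid connection timeouts against a remote backend.
    bucket.sync()

bucket.close()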
    def _update(self, trees: Iterable[Tree], commit: bool) -> None:
        if commit:
            with self.transaction():
                self._data.update({tree.oid.bytes: tree for tree in trees})

        else:
            self._data.update({tree.oid.bytes: tree for tree in trees})
            self._savepoint()
    def add(self, tree: Tree) -> None:
        """Add a single :py:class:`~architxt.tree.Tree` to the bucket."""
        self._data[tree.oid.bytes] = tree
    def discard(self, tree: Tree) -> None:
        """Remove a :py:class:`~architxt.tree.Tree` from the bucket if it exists."""
        self._data.pop(tree.oid.bytes, None)
    def clear(self) -> None:
        """Remove all :py:class:`~architxt.tree.Tree` objects from the bucket."""
        self._data.clear()
        self._savepoint()
    def oids(self) -> Generator[TreeOID, None, None]:
        for key in self._data:
            yield uuid.UUID(bytes=key)
    def get_persistent_ref(self, tree: Tree) -> TreePersistentRef:
        if (
            hasattr(tree, '_p_oid')
            and (ref := getattr(tree, '_p_oid')) is not None
            and self.resolve_ref(ref) is not None
        ):
            return ref

        msg = "The given tree is not stored in the bucket."
        raise KeyError(msg)
    def resolve_ref(self, ref: TreePersistentRef) -> Tree:
        msg = "The given tree is not stored in the bucket."

        try:
            tree = self._connection.get(ref)

        except Exception as error:
            raise KeyError(msg) from error

        if tree is None or tree.root not in self:
            raise KeyError(msg)

        return tree
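A sketch of the persistent-reference round trip provided by the two methods above, assuming the tree's persistent reference is assigned once the enclosing transaction commits.

from architxt.bucket.zodb import ZODBTreeBucket
from architxt.tree import Tree

bucket = ZODBTreeBucket()
tree = Tree.fromstring('(S (NP Alice) (VP (VB likes) (NNS apples)))')

with bucket.transaction():
    bucket.add(tree)

# The reference is only available for trees stored in the bucket;
# otherwise both methods raise KeyError.
ref = bucket.get_persistent_ref(tree)

# Resolving the reference yields the stored tree again, e.g. in a subprocess
# that received the (picklable) bucket together with the reference.
same_tree = bucket.resolve_ref(ref)
assert same_tree.oid == tree.oid

bucket.close()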
    @overload
    def __getitem__(self, key: TreeOID) -> Tree: ...

    @overload
    def __getitem__(self, key: Iterable[TreeOID]) -> Iterable[Tree]: ...

    def __getitem__(self, key: TreeOID | Iterable[TreeOID]) -> Tree | Iterable[Tree]:
        if isinstance(key, uuid.UUID):
            return self._data[key.bytes]

        return (self._data[oid.bytes] for oid in key)

    def __contains__(self, item: object) -> bool:
        if isinstance(item, Tree):
            return item.oid.bytes in self._data

        if isinstance(item, uuid.UUID):
            return item.bytes in self._data

        return False

    def __len__(self) -> int:
        return len(self._data)

    def __iter__(self) -> Iterator[Tree]:
        return iter(self._data.values())
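Finally, a short sketch of the mapping-style access implemented by the dunder methods above, reusing the bracketed-sentence style of the class doctest.

from architxt.bucket.zodb import ZODBTreeBucket
from architxt.tree import Tree

bucket = ZODBTreeBucket()
trees = [
    Tree.fromstring('(S (NP Alice) (VP (VB likes) (NNS apples)))'),
    Tree.fromstring('(S (NP Bob) (VP (VB likes) (NNS pears)))'),
]

with bucket.transaction():
    for tree in trees:
        bucket.add(tree)

first = trees[0]
assert first in bucket and first.oid in bucket  # membership by Tree or by OID
assert bucket[first.oid].oid == first.oid       # single-OID lookup returns a Tree

# Passing an iterable of OIDs returns a lazy generator of trees.
labels = [t.label for t in bucket[[t.oid for t in trees]]]
assert labels == ['S', 'S']

assert len(bucket) == 2
assert set(bucket.oids()) == {t.oid for t in trees}

bucket.close()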