from __future__ import annotations
from abc import ABC, abstractmethod
from collections.abc import AsyncIterable, Generator, Iterable, MutableSet
from contextlib import nullcontext
from typing import TYPE_CHECKING, overload
import more_itertools
from aiostream import stream
from typing_extensions import Self
from architxt.tree import Forest, Tree, TreeOID, TreePersistentRef
from architxt.utils import get_commit_batch_size
if TYPE_CHECKING:
from contextlib import AbstractContextManager
from types import TracebackType
__all__ = ['TreeBucket']
[docs]
class TreeBucket(ABC, MutableSet[Tree], Forest):
"""
Abstract base class for a scalable, persistent, transactional container of :py:class:`~architxt.tree.Tree` objects.
``TreeBucket`` behaves like a :py:class:`set` while providing durable storage and explicit transactional
semantics. It is designed for large-scale data and supports millions of trees with bounded
memory usage through batched commits.
**Transaction Management**
- Adding or removing trees requires an active transaction.
- Modifying a tree that is already in a bucket is not possible without a transaction.
The modifications are automatically persisted when the transaction is committed.
- Exceptions inside a ``with bucket.transaction():`` block automatically roll back the transaction.
**Available Implementations**
.. inheritance-diagram:: architxt.bucket.TreeBucket
:include-subclasses:
:parts: 1
"""
def _update(self, trees: Iterable[Tree], commit: bool) -> None:
with self.transaction() if commit else nullcontext():
for tree in trees:
self.add(tree)
[docs]
def update(self, trees: Iterable[Tree], *, commit: bool | int = False) -> None:
"""
Add multiple :py:class:`~architxt.tree.Tree` to the bucket.
:param trees: Trees to add to the bucket.
:param commit: Commit automatically. If already in a transaction, no commit is applied.
- If False (default), no commits are made, it relies on the current transaction.
- If True, commits in batch.
- If an integer, commits every N tree.
To avoid memory issues, we recommend using incremental commit with large iterables.
"""
batch_size = get_commit_batch_size(commit)
for chunk in more_itertools.ichunked(trees, batch_size):
self._update(chunk, bool(commit))
[docs]
async def async_update(self, trees: Iterable[Tree] | AsyncIterable[Tree], *, commit: bool | int = False) -> None:
"""
Asynchronously add multiple :py:class:`~architxt.tree.Tree` to the bucket.
This method mirrors the behavior of :py:meth:`~TreeBucket.update` but supports asynchronous iteration.
:param trees: Trees to add to the bucket.
:param commit: Commit automatically. If already in a transaction, no commit is applied.
- If False (default), no commits are made, it relies on the current transaction.
- If True, commits in batch.
- If an integer, commits every N tree.
To avoid memory issues, we recommend using incremental commit with large iterables.
"""
batch_size = get_commit_batch_size(commit)
async for chunk in stream.chunks(stream.iterate(trees), batch_size):
self._update(chunk, bool(commit))
[docs]
@abstractmethod
def close(self) -> None:
"""Close the underlying storage and release any associated resources."""
[docs]
@abstractmethod
def transaction(self) -> AbstractContextManager[None]:
"""
Return a context manager for managing a transaction.
Upon exiting the context, the transaction is automatically committed.
If an exception occurs within the context, the transaction is rolled back.
Transactions are reentrant.
"""
[docs]
@abstractmethod
def sync(self) -> None:
"""
Synchronize the in-memory state of this bucket with its underlying storage.
Implementations typically flush, refresh and/or invalidate caches and reload metadata so that subsequent
operations reflect external changes. This operation may be expensive, so it should be called sparingly,
but it is often required in concurrent environments (e.g., when using threads or subprocesses).
"""
[docs]
@abstractmethod
def oids(self) -> Generator[TreeOID, None, None]:
"""Yield the object IDs (OIDs) of all trees stored in the bucket."""
[docs]
@abstractmethod
def get_persistent_ref(self, tree: Tree) -> TreePersistentRef:
"""
Get a persistent reference for a given tree.
:param tree: The tree to get the persistent reference for.
:return: The persistent reference of the tree for this bucket.
:raises KeyError: If the tree is not stored in the bucket.
"""
raise NotImplementedError
[docs]
@abstractmethod
def resolve_ref(self, ref: TreePersistentRef) -> Tree:
"""
Resolve a persistent_ref back to a live Tree instance.
:param ref: The value returned by :py:meth:`Tree.persistent_ref`.
:return: The tree corresponding to the given persistent reference.
:raises KeyError: If the tree is not found in the bucket.
"""
raise NotImplementedError
@overload
def __getitem__(self, key: TreeOID) -> Tree: ...
@overload
def __getitem__(self, key: Iterable[TreeOID]) -> Iterable[Tree]: ...
@abstractmethod
def __getitem__(self, key: TreeOID | Iterable[TreeOID]) -> Tree | Iterable[Tree]:
"""
Retrieve one or more :py:class:`~architxt.tree.Tree` by their OID(s).
:param key: A single object ID or a collection of object IDs to retrieve.
:return: A single :py:class:`~architxt.tree.Tree` or a collection of :py:class:`~architxt.tree.Tree` objects.
- bucket[oid] -> tree
- bucket[[oid1, oid2, ...]] -> [tree1, tree2, ...]
"""
def __enter__(self) -> Self:
return self
def __exit__(
self,
exc_type: type[BaseException] | None,
exc_value: BaseException | None,
traceback: TracebackType | None,
) -> None:
self.close()