# Source code for architxt.bucket

from abc import ABC, abstractmethod
from collections.abc import AsyncIterable, Generator, Iterable, MutableSet
from contextlib import AbstractContextManager
from types import TracebackType
from typing import overload

import anyio.to_thread
from aiostream import Stream, stream

from architxt.tree import Forest, Tree, TreeOID
from architxt.utils import BATCH_SIZE

__all__ = ['TreeBucket']

class TreeBucket(ABC, MutableSet[Tree], Forest):
"""
A scalable, persistent, set-like container for :py:class:`~architxt.tree.Tree`.
The :py:class:`TreeBucket` behaves like a mutable set and provides persistent storage.
It is designed to handle large collections of trees efficiently,
supporting standard set operations and transactional updates.
**Transaction Management**
- Bucket automatically handles transactions when adding or removing :py:class:`~architxt.tree.Tree` from the bucket.
- If a :py:class:`~architxt.tree.Tree` is modified after being added to the bucket, you must call :py:meth:`~TreeBucket.commit` to persist those changes.
**Available Implementations**
.. inheritance-diagram:: architxt.bucket.TreeBucket architxt.bucket.zodb.ZODBTreeBucket
:top-classes: architxt.bucket.TreeBucket
:parts: 1
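
    **Usage Example**

    A minimal sketch assuming ``bucket`` is an already-constructed concrete
    implementation (for example :py:class:`~architxt.bucket.zodb.ZODBTreeBucket`);
    constructor arguments are backend-specific.

    .. code-block:: python

        with bucket:                      # storage is closed on exit
            bucket.update(trees)          # chunked, transactional insert
            for oid in bucket.oids():
                tree = bucket[oid]        # retrieve a tree by its OID
            bucket.commit()               # persist in-place Tree modifications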
"""
@abstractmethod
def update(self, trees: Iterable[Tree], batch_size: int = BATCH_SIZE) -> None:
"""
Add multiple :py:class:`~architxt.tree.Tree` to the bucket, managing memory via chunked transactions.
:param trees: Trees to add to the bucket.
:param batch_size: The number of trees to be added at once.
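
        Example (sketch; ``load_trees`` is a hypothetical producer of trees):

        .. code-block:: python

            bucket.update(load_trees(), batch_size=1_000)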
"""
async def async_update(self, trees: AsyncIterable[Tree], batch_size: int = BATCH_SIZE) -> None:
"""
Asynchronously add multiple :py:class:`~architxt.tree.Tree` to the bucket.
This method mirrors the behavior of :py:meth:`~TreeBucket.update` but supports asynchronous iteration.
Internally, it delegates each chunk to a background thread.
:param trees: Trees to add to the bucket.
:param batch_size: The number of trees to be added at once.
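
        Example (sketch; ``produce_trees`` is a hypothetical asynchronous producer):

        .. code-block:: python

            async def produce_trees() -> AsyncIterable[Tree]:
                ...  # yield Tree instances from an asynchronous source

            await bucket.async_update(produce_trees())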
"""
chunk_stream: Stream[list[Tree]] = stream.chunks(trees, batch_size)
chunk: list[Tree]
async with chunk_stream.stream() as streamer:
async for chunk in streamer:
await anyio.to_thread.run_sync(self.update, chunk)
@abstractmethod
def close(self) -> None:
"""Close the underlying storage and release any associated resources."""
@abstractmethod
def transaction(self) -> AbstractContextManager[None]:
"""
        Return a context manager that wraps a transaction.

        Upon exiting the context, the transaction is automatically committed.
        If an exception occurs within the context, the transaction is rolled back.
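
        Example (sketch; in-place tree edits are committed when the block exits):

        .. code-block:: python

            with bucket.transaction():
                for tree in bucket:
                    ...  # modify trees in place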
"""
@abstractmethod
def commit(self) -> None:
"""Persist any in-memory changes to :py:class:`~architxt.tree.Tree` in the bucket."""
@abstractmethod
def oids(self) -> Generator[TreeOID, None, None]:
"""Yield the object IDs (OIDs) of all trees stored in the bucket."""
@overload
def __getitem__(self, key: TreeOID) -> Tree: ...
@overload
def __getitem__(self, key: Iterable[TreeOID]) -> Iterable[Tree]: ...
@abstractmethod
def __getitem__(self, key: TreeOID | Iterable[TreeOID]) -> Tree | Iterable[Tree]:
"""
Retrieve one or more :py:class:`~architxt.tree.Tree` by their OID(s).
:param key: A single object ID or a collection of object IDs to retrieve.
:return: A single :py:class:`~architxt.tree.Tree` or a collection of :py:class:`~architxt.tree.Tree` objects.
- bucket[oid] -> tree
- bucket[[oid1, oid2, ...]] -> [tree1, tree2, ...]
"""
def __enter__(self) -> 'TreeBucket':
return self
def __exit__(
self,
exc_type: type[BaseException] | None,
exc_value: BaseException | None,
traceback: TracebackType | None,
) -> None:
self.close()