Source code for aiocontext.context

"""Context information storage for asyncio."""

import asyncio
from collections import ChainMap
from collections.abc import MutableMapping
from contextlib import suppress
from uuid import uuid4

from .__about__ import __title__
from .errors import EventLoopError, TaskContextError, TaskFactoryError
from .task_factory import get_task_factory_attr


class Context(MutableMapping):
    """Create an empty, asynchronous execution context.

    The context must be attached to an event loop. It behaves like a
    dictionary::

        >>> context = Context()
        >>> context.attach(loop)
        >>> context['key1'] = 'value'
        >>> context['key1']
        'value'
        >>> context.get('key2', 'default')
        'default'

    Upon initialization, an optional argument *copy_func* can be passed to
    specify how context data are copied between tasks. Relevant values are:

    * :class:`dict` or :func:`copy.copy` to create a shallow copy.
    * :func:`copy.deepcopy` to create a deep copy. All values should support
      the deepcopy protocol.
    * :func:`chainmap_copy` to use a :class:`collections.ChainMap`, with child
      task data stored in the front map and parent data stored in nested maps.
    """

    __slots__ = (
        '_copy_func',
        '_data_attr',
    )

    def __init__(self, copy_func=dict):
        self._copy_func = copy_func
        # Per-instance unique name: used both as the attribute under which
        # the data is looked up on tasks and as the key identifying this
        # context in the loop's task factory registry.
        self._data_attr = '_{prefix}_{suffix}'.format(
            prefix=__title__,
            suffix=uuid4().hex,
        )

    @property
    def copy_func(self):
        """Copy function, called when a new task is spawned."""
        return self._copy_func

    def get_data(self, task=None):
        """Return the :class:`dict` of *task* data.

        If *task* is omitted or ``None``, return data of the task being
        currently executed. If no task is running, an :exc:`EventLoopError`
        is raised.

        This method raises :exc:`TaskContextError` if no context data is
        stored in *task*. This usually indicates that the context task
        factory was not set in the event loop.

        ::

            >>> context['key'] = 'value'
            >>> context.get_data()
            {'key': 'value'}
        """
        if task is None:
            # ``asyncio.Task.current_task()`` was deprecated in Python 3.7
            # and removed in 3.9; prefer the module-level function and only
            # fall back to the classmethod on older interpreters.
            current_task = getattr(asyncio, 'current_task', None)
            if current_task is None:
                current_task = asyncio.Task.current_task
            try:
                task = current_task()
            except RuntimeError:
                # ``asyncio.current_task()`` raises when no loop is running.
                task = None
            if task is None:
                raise EventLoopError("No event loop found")
        data = getattr(task, self._data_attr, None)
        if data is None:
            raise TaskContextError("No task context found")
        return data

    # Mapping protocol: every operation is delegated to the data of the
    # task being currently executed.

    def __getitem__(self, key):
        data = self.get_data()
        return data[key]

    def __setitem__(self, key, value):
        data = self.get_data()
        data[key] = value

    def __delitem__(self, key):
        data = self.get_data()
        del data[key]

    def __iter__(self):
        data = self.get_data()
        return iter(data)

    def __len__(self):
        data = self.get_data()
        return len(data)

    def attach(self, loop):
        """Attach the execution context to *loop*.

        When new tasks are spawned by the loop, they will inherit context
        data from the parent task.

        The loop must use a context-aware task factory; if not, a
        :exc:`TaskFactoryError` is raised.

        This method has no effect if the context is already attached to
        *loop*.
        """
        get_task_factory_attr(loop)[self._data_attr] = self

    def detach(self, loop):
        """Detach the execution context from *loop*.

        This method has no effect if the context is not attached to *loop*.
        """
        # Best effort by design: a missing registration (KeyError) or a loop
        # without a context-aware task factory (TaskFactoryError) both mean
        # there is nothing to detach.
        with suppress(KeyError, TaskFactoryError):
            del get_task_factory_attr(loop)[self._data_attr]
def chainmap_copy(data):
    """Context copy function based on :class:`collections.ChainMap`.

    Intended to be passed as the *copy_func* of a context::

        context = Context(copy_func=chainmap_copy)

    The returned :class:`collections.ChainMap` has a fresh, empty front map;
    *data* itself is not copied. When *data* is already a
    :class:`collections.ChainMap`, the new map is pushed in front of its
    existing maps instead of wrapping the chain in another chain, so nested
    copies stay flat.
    """
    if not isinstance(data, ChainMap):
        # Plain mapping: start a new chain with *data* as the parent layer.
        return ChainMap({}, data)
    # Existing chain: add one more front layer to it.
    return data.new_child()
def get_loop_contexts(loop):
    """Return the list of contexts attached to *loop*.

    ::

        >>> context1 = Context()
        >>> context1.attach(loop)
        >>> context2 = Context()
        >>> context2.attach(loop)
        >>> get_loop_contexts(loop)
        [<Context object at 0x10>, <Context object at 0x20>]
        >>> context2.detach(loop)
        >>> get_loop_contexts(loop)
        [<Context object at 0x10>]

    Raises :exc:`TaskFactoryError` if the loop is not context-aware, i.e. the
    task factory was not set.
    """
    attached = get_task_factory_attr(loop)
    # Return a new list so callers do not hold a live view of the registry.
    return list(attached.values())