"""Custom data structures."""
import abc
import collections.abc
import threading
from collections import OrderedDict, UserList
from typing import (
AbstractSet,
Any,
ContextManager,
Dict,
ItemsView,
Iterable,
Iterator,
KeysView,
Mapping,
MutableMapping,
MutableSet,
Set,
Tuple,
TypeVar,
Union,
ValuesView,
cast,
)
from .contexts import nullcontext
try:
from django.utils.functional import LazyObject, LazySettings
except ImportError:
class LazyObject: ... # noqa
class LazySettings: ... # noqa
__all__ = [
'FastUserDict',
'FastUserSet',
'FastUserList',
'LRUCache',
'ManagedUserDict',
'ManagedUserSet',
'AttributeDict',
'AttributeDictMixin',
'DictAttribute',
'force_mapping',
]
T = TypeVar('T')
KT = TypeVar('KT')
VT = TypeVar('VT')
_Setlike = Union[AbstractSet[T], Iterable[T]]
[docs]class FastUserDict(MutableMapping[KT, VT]):
"""Proxy to dict.
Like :class:`collection.UserDict` but reimplements some methods
for better performance when the underlying dictionary is a real dict.
"""
data: MutableMapping[KT, VT]
[docs] @classmethod
def fromkeys(cls,
iterable: Iterable[KT],
value: VT = None) -> 'FastUserSet':
d = cls()
d.update({k: value for k in iterable})
return d
def __getitem__(self, key: KT) -> VT:
if not hasattr(self, '__missing__'):
return self.data[key]
if key in self.data:
return self.data[key]
return self.__missing__(key) # type: ignore
def __setitem__(self, key: KT, value: VT) -> None:
self.data[key] = value
def __delitem__(self, key: KT) -> None:
del self.data[key]
def __len__(self) -> int:
return len(self.data)
def __iter__(self) -> Iterator[KT]:
return iter(self.data)
# Rest is fast versions of generic slow MutableMapping methods.
def __contains__(self, key: KT) -> bool:
return key in self.data
def __repr__(self) -> str:
return repr(self.data)
[docs] def copy(self) -> dict:
return self.data.copy()
[docs] def update(self, *args: Any, **kwargs: Any) -> None:
self.data.update(*args, **kwargs)
[docs] def clear(self) -> None:
self.data.clear()
[docs] def items(self) -> ItemsView:
return cast(ItemsView, self.data.items())
[docs] def keys(self) -> KeysView:
return cast(KeysView, self.data.keys())
[docs] def values(self) -> ValuesView:
return self.data.values()
[docs]class FastUserSet(MutableSet[T]):
"""Proxy to set."""
data: MutableSet[T]
# -- Immutable Methods --
def __and__(self, other: Set) -> Set:
return self.data.__and__(other)
def __contains__(self, key: Any) -> bool:
return self.data.__contains__(key)
def __ge__(self, other: AbstractSet[T]) -> bool:
return self.data.__ge__(other)
def __iter__(self) -> Iterator[T]:
return iter(self.data)
def __le__(self, other: AbstractSet[T]) -> bool:
return self.data.__le__(other)
def __len__(self) -> int:
return len(self.data)
def __or__(self, other: AbstractSet[T]) -> Set[T]:
return self.data.__or__(other)
def __rand__(self, other: AbstractSet[T]) -> Set[T]:
return self.__and__(other)
def __reduce__(self) -> tuple:
return self.data.__reduce__()
def __reduce_ex__(self, protocol: Any) -> tuple:
return self.data.__reduce_ex__(protocol)
def __repr__(self) -> str:
return repr(self.data)
def __ror__(self, other: AbstractSet[T]) -> Set[T]:
return self.data.__ror__(other)
def __rsub__(self, other: AbstractSet[T]) -> Set[T]:
return other.__rsub__(self.data)
def __rxor__(self, other: AbstractSet[T]) -> Set[T]:
return other.__rxor__(self.data)
def __sizeof__(self) -> int:
return self.data.__sizeof__()
def __str__(self) -> str:
return str(self.data)
def __sub__(self, other: AbstractSet[T]) -> Set[T]:
return self.data.__sub__(other)
def __xor__(self, other: AbstractSet[T]) -> Set[T]:
return self.data.__xor__(other)
[docs] def copy(self) -> Set[T]:
return self.data.copy()
[docs] def difference(self, other: _Setlike[T]) -> Set[T]:
return self.data.difference(other)
[docs] def intersection(self, other: _Setlike[T]) -> Set[T]:
return self.data.intersection(other)
[docs] def isdisjoint(self, other: AbstractSet[T]) -> bool:
return self.data.isdisjoint(other)
[docs] def issubset(self, other: AbstractSet[T]) -> bool:
return self.data.issubset(other)
[docs] def issuperset(self, other: AbstractSet[T]) -> bool:
return self.data.issuperset(other)
[docs] def symmetric_difference(self, other: _Setlike[T]) -> Set[T]:
return self.data.symmetric_difference(other)
[docs] def union(self, other: _Setlike[T]) -> Set[T]:
return self.data.union(other)
# -- Mutable Methods --
def __iand__(self, other: AbstractSet[T]) -> 'FastUserSet':
self.data.__iand__(other)
return self
def __ior__(self, other: AbstractSet[T]) -> 'FastUserSet':
self.data.__ior__(other)
return self
def __isub__(self, other: AbstractSet[T]) -> 'FastUserSet':
self.data.__isub__(other)
return self
def __ixor__(self, other: AbstractSet[T]) -> 'FastUserSet':
self.data.__ixor__(other)
return self
[docs] def add(self, element: T) -> None:
self.data.add(element)
[docs] def clear(self) -> None:
self.data.clear()
[docs] def difference_update(self, other: _Setlike[T]) -> None:
self.data.difference_update(other)
[docs] def discard(self, element: T) -> None:
self.data.discard(element)
[docs] def intersection_update(self, other: _Setlike[T]) -> None:
self.data.intersection_update(other)
[docs] def pop(self) -> T:
return self.data.pop()
[docs] def remove(self, element: T) -> None:
self.data.remove(element)
[docs] def symmetric_difference_update(self, other: _Setlike[T]) -> None:
self.data.symmetric_difference_update(other)
[docs] def update(self, other: _Setlike[T]) -> None:
self.data.update(other)
[docs]class FastUserList(UserList):
"""Proxy to list."""
class MappingViewProxy(abc.ABC):
@abc.abstractmethod
def _keys(self) -> Iterable[KT]:
...
@abc.abstractmethod
def _values(self) -> Iterable[VT]:
...
@abc.abstractmethod
def _items(self) -> Iterable[Tuple[KT, VT]]:
...
class ProxyKeysView(KeysView):
def __init__(self, mapping: MappingViewProxy) -> None:
self._mapping = mapping
def __iter__(self) -> Iterator[KT]:
return self._mapping._keys()
class ProxyValuesView(ValuesView):
def __init__(self, mapping: MappingViewProxy) -> None:
self._mapping = mapping
def __iter__(self) -> Iterator[VT]:
yield from self._mapping._values()
class ProxyItemsView(ItemsView):
def __init__(self, mapping: MappingViewProxy) -> None:
self._mapping = mapping
def __iter__(self) -> Iterator[Tuple[KT, VT]]:
yield from self._mapping._items()
[docs]class LRUCache(FastUserDict, MutableMapping[KT, VT], MappingViewProxy):
"""LRU Cache implementation using a doubly linked list to track access.
Arguments:
limit (int): The maximum number of keys to keep in the cache.
When a new key is inserted and the limit has been exceeded,
the *Least Recently Used* key will be discarded from the
cache.
thread_safety (bool): Enable if multiple OS threads are going
to access/mutate the cache.
"""
limit: int
thread_safety: bool
_mutex: ContextManager
data: OrderedDict
def __init__(self, limit: int = None,
*,
thread_safety: bool = False) -> None:
self.limit = limit
self.thread_safety = thread_safety
self._mutex = self._new_lock()
self.data: OrderedDict = OrderedDict()
def __getitem__(self, key: KT) -> VT:
with self._mutex:
value = self[key] = self.data.pop(key)
return value
[docs] def update(self, *args: Any, **kwargs: Any) -> None:
with self._mutex:
data, limit = self.data, self.limit
data.update(*args, **kwargs)
if limit and len(data) > limit:
# pop additional items in case limit exceeded
for _ in range(len(data) - limit):
data.popitem(last=False)
[docs] def popitem(self, *, last: bool = True) -> Tuple[KT, VT]:
with self._mutex:
return self.data.popitem(last)
def __setitem__(self, key: KT, value: VT) -> None:
# remove least recently used key.
with self._mutex:
if self.limit and len(self.data) >= self.limit:
self.data.pop(next(iter(self.data)))
self.data[key] = value
def __iter__(self) -> Iterator:
return iter(self.data)
[docs] def keys(self) -> KeysView[KT]:
return ProxyKeysView(self)
def _keys(self) -> Iterator[KT]:
# userdict.keys in py3k calls __getitem__
with self._mutex:
yield from self.data.keys()
[docs] def values(self) -> ValuesView[VT]:
return ProxyValuesView(self)
def _values(self) -> Iterator[VT]:
with self._mutex:
for k in self:
try:
yield self.data[k]
except KeyError: # pragma: no cover
pass
[docs] def items(self) -> ItemsView[KT, VT]:
return ProxyItemsView(self)
def _items(self) -> Iterator[Tuple[KT, VT]]:
with self._mutex:
for k in self:
try:
yield (k, self.data[k])
except KeyError: # pragma: no cover
pass
[docs] def incr(self, key: KT, delta: int = 1) -> int:
with self._mutex:
# this acts as memcached does- store as a string, but return a
# integer as long as it exists and we can cast it
newval = int(self.data.pop(key)) + delta
self[key] = cast(VT, str(newval))
return newval
def _new_lock(self) -> ContextManager:
if self.thread_safety:
return cast(ContextManager, threading.RLock())
return nullcontext()
def __getstate__(self) -> Mapping[str, Any]:
d = dict(vars(self))
d.pop('_mutex')
return d
def __setstate__(self, state: Dict[str, Any]) -> None:
self.__dict__ = state
self._mutex = self._new_lock()
[docs]class ManagedUserSet(FastUserSet[T]):
"""A MutableSet that adds callbacks for when keys are get/set/deleted."""
[docs] def on_add(self, value: T) -> None:
...
[docs] def on_discard(self, value: T) -> None:
...
[docs] def on_clear(self) -> None:
...
[docs] def on_change(self, added: Set[T], removed: Set[T]) -> None:
...
[docs] def add(self, element: T) -> None:
if element not in self.data:
self.on_add(element)
self.data.add(element)
[docs] def clear(self) -> None:
self.on_clear()
self.data.clear()
[docs] def discard(self, element: T) -> None:
if element in self.data:
self.on_discard(element)
self.data.discard(element)
[docs] def pop(self) -> T:
element = self.data.pop()
self.on_discard(element)
return element
[docs] def raw_update(self, *args: Any, **kwargs: Any) -> None:
self.data.update(*args, **kwargs)
def __iand__(self, other: AbstractSet[T]) -> 'FastUserSet':
self.on_change(added=set(), removed=self.difference(other))
self.data.__iand__(other)
return self
def __ior__(self, other: AbstractSet[T]) -> 'FastUserSet':
self.on_change(added=other.difference(self), removed=set())
self.data.__ior__(other)
return self
def __isub__(self, other: AbstractSet[T]) -> 'FastUserSet':
self.on_change(added=set(), removed=self.data.intersection(other))
self.data.__isub__(other)
return self
def __ixor__(self, other: AbstractSet[T]) -> 'FastUserSet':
self.on_change(
added=other.difference(self.data),
removed=self.data.intersection(other),
)
self.data.__ixor__(other)
return self
[docs] def difference_update(self, other: _Setlike[T]) -> None:
self.on_change(added=set(), removed=self.data.intersection(other))
self.data.difference_update(other)
[docs] def intersection_update(self, other: _Setlike[T]) -> None:
self.on_change(added=set(), removed=self.difference(other))
self.data.intersection_update(other)
[docs] def symmetric_difference_update(self, other: _Setlike[T]) -> None:
self.on_change(
added=other.difference(self.data),
removed=self.data.intersection(other),
)
self.data.symmetric_difference_update(other)
[docs] def update(self, other: _Setlike[T]) -> None:
# union update
self.on_change(added=other.difference(self), removed=set())
self.data.update(other)
[docs]class ManagedUserDict(FastUserDict[KT, VT]):
"""A UserDict that adds callbacks for when keys are get/set/deleted."""
[docs] def on_key_get(self, key: KT) -> None:
"""Handle that key is being retrieved."""
...
[docs] def on_key_set(self, key: KT, value: VT) -> None:
"""Handle that value for a key is being set."""
...
[docs] def on_key_del(self, key: KT) -> None:
"""Handle that a key is deleted."""
...
[docs] def on_clear(self) -> None:
"""Handle that the mapping is being cleared."""
...
def __getitem__(self, key: KT) -> Any:
self.on_key_get(key)
return super().__getitem__(key)
def __setitem__(self, key: KT, value: VT) -> None:
self.on_key_set(key, value)
self.data[key] = value
def __delitem__(self, key: KT) -> None:
self.on_key_del(key)
del self.data[key]
[docs] def update(self, *args: Any, **kwargs: Any) -> None:
for d in args:
for key, value in d.items():
self.on_key_set(key, value)
for key, value in kwargs.items():
self.on_key_set(key, value)
self.data.update(*args, **kwargs)
[docs] def clear(self) -> None:
self.on_clear()
self.data.clear()
[docs] def raw_update(self, *args: Any, **kwargs: Any) -> None:
self.data.update(*args, **kwargs)
[docs]class AttributeDictMixin:
"""Mixin for Mapping interface that adds attribute access.
I.e., `d.key -> d[key]`).
"""
def __getattr__(self, k: str) -> Any:
"""`d.key -> d[key]`."""
try:
return self[k]
except KeyError:
raise AttributeError(
f'{type(self).__name__!r} object has no attribute {k!r}')
def __setattr__(self, key: str, value: Any) -> None:
"""`d[key] = value -> d.key = value`."""
self[key] = value
[docs]class AttributeDict(dict, AttributeDictMixin):
"""Dict subclass with attribute access."""
[docs]class DictAttribute(MutableMapping[KT, VT], MappingViewProxy):
"""Dict interface to attributes.
`obj[k] -> obj.k`
`obj[k] = val -> obj.k = val`
"""
obj: Any = None
def __init__(self, obj: Any) -> None:
object.__setattr__(self, 'obj', obj)
def __getattr__(self, key: Any) -> Any:
return getattr(self.obj, key)
def __setattr__(self, key: Any, value: Any) -> None:
return setattr(self.obj, key, value)
[docs] def get(self, key: Any, default: Any = None) -> Any:
try:
return self[key]
except KeyError:
return default
[docs] def setdefault(self, key: Any, default: Any = None) -> Any:
if key in self:
return self[key]
self[key] = default
return default
def __getitem__(self, key: Any) -> Any:
try:
return getattr(self.obj, key)
except AttributeError:
raise KeyError(key)
def __setitem__(self, key: Any, value: Any) -> None:
setattr(self.obj, key, value)
def __delitem__(self, key: Any) -> None:
raise NotImplementedError()
def __len__(self) -> int:
return len(self.obj.__dict__)
def __contains__(self, key: Any) -> bool:
return hasattr(self.obj, key)
def __iter__(self) -> Iterable[KT]:
return self._keys()
def _keys(self) -> Iterator[KT]:
for key in dir(self.obj):
yield key
def _values(self) -> Iterator[VT]:
obj = self.obj
for key in self:
yield getattr(obj, key)
def _items(self) -> Iterator[Tuple[KT, VT]]:
obj = self.obj
for key in self:
yield key, getattr(obj, key)
collections.abc.MutableMapping.register(DictAttribute) # noqa: E305
[docs]def force_mapping(m: Any) -> Mapping:
"""Wrap object into supporting the mapping interface if necessary."""
if isinstance(m, (LazyObject, LazySettings)):
m = m._wrapped
return DictAttribute(m) if not isinstance(m, Mapping) else m