"""Importing utilities."""
import importlib
import os
import sys
import warnings
from contextlib import contextmanager, suppress
from types import ModuleType
from typing import (
Any,
Callable,
Generator,
Generic,
Iterable,
Iterator,
Mapping,
MutableMapping,
NamedTuple,
Set,
Type,
TypeVar,
Union,
cast,
)
from .collections import FastUserDict
from .objects import cached_property
from .text import didyoumean
try:
    from yarl import URL
except ImportError:  # pragma: no cover
    class URL:
        """Minimal fallback used when :pypi:`yarl` is not installed.

        Only the ``scheme`` attribute is provided, which is all this
        module reads from a URL.
        """

        def __init__(self, url: Union[str, 'URL']) -> None:
            if isinstance(url, URL):
                # Callers such as ``FactoryMapping.by_url`` are annotated
                # to accept an already-built URL object as well as a
                # string, so copy the scheme instead of failing on the
                # ``'://' in url`` containment test below.
                self.scheme = url.scheme
                return
            # Kept as an assertion so the exception type raised for a
            # string without a scheme separator stays the same.
            assert '://' in url
            self.scheme = url.split('://')[0]
# - these are taken from kombu.utils.imports
# Names exported as the public API of this module.
__all__ = [
'FactoryMapping',
'SymbolArg',
'symbol_by_name',
'load_extension_class_names',
'load_extension_classes',
'cwd_in_path',
'import_from_cwd',
'smart_import',
]
# Type of the symbols a FactoryMapping resolves to (its Generic parameter).
_T = TypeVar('_T')
# Contravariant type variable used for the argument of FactoryMapping.by_name.
_T_contra = TypeVar('_T_contra', contravariant=True)
# A "symbol argument": either the symbol object itself, or a string
# naming it (symbol_by_name returns non-string arguments unchanged).
SymbolArg = Union[_T, str]
class FactoryMapping(FastUserDict, Generic[_T]):
    """Class plugin system.

    This is a utility to maintain a mapping from name to fully
    qualified Python attribute path, and also supporting the use
    of these in URLs.

    Aliases advertised by the entrypoint namespaces listed in
    :attr:`namespaces` are merged into :attr:`aliases` lazily, the first
    time the mapping is queried through one of its lookup methods.

    Example:
        >>> # Specifying the type enables mypy to know that
        >>> # this factory returns Driver subclasses.
        >>> drivers: FactoryMapping[Type[Driver]]
        >>> drivers = FactoryMapping({
        ...     'rabbitmq': 'my.drivers.rabbitmq:Driver',
        ...     'kafka': 'my.drivers.kafka:Driver',
        ...     'redis': 'my.drivers.redis:Driver',
        ... })

        >>> drivers.by_url('rabbitmq://localhost:9090')
        <class 'my.drivers.rabbitmq.Driver'>

        >>> drivers.by_name('redis')
        <class 'my.drivers.redis.Driver'>
    """

    # Alias -> qualified symbol path (e.g. ``'my.drivers.redis:Driver'``).
    aliases: MutableMapping[str, str]

    # Entrypoint namespaces whose extensions are merged into ``aliases``
    # on first use.
    namespaces: Set[str]

    # Flipped to True the first time a lookup triggers ``_finalize``.
    _finalized: bool = False

    def __init__(self, *args: Mapping, **kwargs: str) -> None:
        """Build the alias table from ``dict``-style arguments."""
        self.aliases = dict(*args, **kwargs)  # type: ignore
        self.namespaces = set()

    def iterate(self) -> Iterator[_T]:
        """Yield the resolved symbol for every registered alias."""
        self._maybe_finalize()
        for name in self.aliases:
            yield self.by_name(name)

    def by_url(self, url: Union[str, URL]) -> _T:
        """Get class associated with URL (scheme is used as alias key)."""
        return self.by_name(URL(url).scheme)

    def by_name(self, name: SymbolArg[_T_contra]) -> _T:
        """Resolve `name` (an alias, a qualified path, or the symbol itself).

        Raises:
            ModuleNotFoundError: if the module cannot be imported.  When
                `name` contains no dot it is assumed to be a mistyped
                alias and the error is re-raised with a hint built by
                ``didyoumean`` (presumably close matches, falling back
                to the list of available aliases).
        """
        self._maybe_finalize()
        try:
            return symbol_by_name(name, aliases=self.aliases)
        except ModuleNotFoundError as exc:
            # symbol_by_name returns non-strings untouched, so only a
            # string name can reach this handler.
            name_ = cast(str, name)
            if '.' in name_:
                # Looks like a real dotted path: keep the original error.
                raise
            alt = didyoumean(
                self.aliases, name_,
                fmt_none=f'Available choices: {", ".join(self.aliases)}')
            raise ModuleNotFoundError(
                f'{name!r} is not a valid name. {alt}') from exc

    def get_alias(self, name: str) -> str:
        """Return the qualified path registered for alias `name`.

        Raises:
            KeyError: if `name` is not a registered alias.
        """
        self._maybe_finalize()
        return self.aliases[name]

    def _maybe_finalize(self) -> None:
        """Run :meth:`_finalize` the first time this is called."""
        if not self._finalized:
            # The flag is set before loading, so finalization is
            # attempted at most once even if loading raises.
            self._finalized = True
            self._finalize()

    def _finalize(self) -> None:
        """Merge the extensions of every namespace into the alias table."""
        for namespace in self.namespaces:
            # Collect the namespace completely before updating, so a
            # failure while enumerating leaves ``aliases`` untouched.
            self.aliases.update({
                name: cls_name
                for name, cls_name in load_extension_class_names(namespace)
            })

    @cached_property
    def data(self) -> MutableMapping:  # type: ignore
        """Return the alias table as the mapping's ``data`` attribute."""
        return self.aliases
def _ensure_identifier(path: str, full: str) -> None:
    """Check that every dot-separated component of `path` is an identifier.

    Arguments:
        path: Dotted path to validate (e.g. ``'os.path'``).
        full: The complete name being resolved; only used in the
            error message.

    Raises:
        ValueError: if any component is not a valid Python identifier.
    """
    for part in path.split('.'):
        if not part.isidentifier():
            raise ValueError(
                f'Component {part!r} of {full!r} is not a valid identifier')


def symbol_by_name(
        name: SymbolArg,
        aliases: Mapping[str, str] = None,
        imp: Any = None,
        package: str = None,
        sep: str = '.',
        default: Any = None,
        **kwargs: Any) -> Any:
    """Get symbol by qualified name.

    The name should be the full dot-separated path to the class::

        modulename.ClassName

    Example::

        mazecache.backends.redis.RedisBackend
                                ^- class name

    or using ':' to separate module and symbol::

        mazecache.backends.redis:RedisBackend

    If `aliases` is provided, a dict containing short name/long name
    mappings, the name is looked up in the aliases first.

    Examples:
        >>> symbol_by_name('mazecache.backends.redis:RedisBackend')
        <class 'mazecache.backends.redis.RedisBackend'>

        >>> symbol_by_name('default', {
        ...     'default': 'mazecache.backends.redis:RedisBackend'})
        <class 'mazecache.backends.redis.RedisBackend'>

        # Does not try to look up non-string names.
        >>> from mazecache.backends.redis import RedisBackend
        >>> symbol_by_name(RedisBackend) is RedisBackend
        True

    Arguments:
        name: Qualified name to resolve.  Anything that is not a string
            is returned unchanged.
        aliases: Optional short name -> qualified name mapping consulted
            before parsing `name`.
        imp: Import function, called as
            ``imp(module_name, package=package, **kwargs)``.  Defaults
            to :func:`importlib.import_module`.
        package: Forwarded to `imp`, which allows a relative module path
            such as ``'.redis:RedisBackend'``.  When `name` has no module
            part at all, `package` itself is imported and returned.
        sep: Separator between module and attribute; ``':'`` takes
            precedence whenever it occurs in the name.
        default: Returned instead of raising when the import or the
            attribute lookup fails.  ``None`` means "re-raise".
        **kwargs: Extra keyword arguments forwarded to `imp`.

    Raises:
        ValueError: if a component of the name is not a valid identifier,
            or if `imp` itself raises :exc:`ValueError`.
        ImportError: if the module cannot be imported (and no `default`).
        AttributeError: if the attribute is missing (and no `default`).
    """
    # This code was copied from kombu.utils.symbol_by_name
    imp = importlib.import_module if imp is None else imp
    if not isinstance(name, str):
        return name  # already a class

    name = (aliases or {}).get(name) or name
    sep = ':' if ':' in name else sep
    module_name, _, attr = name.rpartition(sep)
    if not module_name:
        # No separator found: the whole name is a module (or, when
        # `package` is set, the package itself is the module).
        attr, module_name = None, package if package else attr

    if attr:
        _ensure_identifier(attr, full=name)
    if module_name:  # pragma: no cover
        # A relative module path starts with dots, which are not
        # identifiers; they are only meaningful (and only tolerated
        # here) when `package` is given to anchor them.
        checked = module_name.lstrip('.') if package else module_name
        _ensure_identifier(checked, full=name)

    try:
        try:
            module = imp(module_name, package=package, **kwargs)
        except ValueError as exc:
            # Add the requested name to the message and keep the
            # importer's error attached as the explicit cause.
            raise ValueError(f'Cannot import {name!r}: {exc}') from exc
        return getattr(module, attr) if attr else module
    except (ImportError, AttributeError):
        if default is None:
            raise
    return default
class EntrypointExtension(NamedTuple):
    """A loaded entrypoint extension: its name and the imported object."""

    # Name the extension is registered under in the entrypoint namespace.
    name: str
    # The object the entrypoint target resolved to.
    type: Type
class RawEntrypointExtension(NamedTuple):
    """An entrypoint extension that has not been imported yet."""

    # Name the extension is registered under in the entrypoint namespace.
    name: str
    # Import path of the extension, e.g. ``'faust_msgpack:msgpack'``.
    target: str
def load_extension_classes(namespace: str) -> Iterable[EntrypointExtension]:
    """Yield extension classes for setuptools entrypoint namespace.

    If an entrypoint is defined in ``setup.py``::

        entry_points={
            'faust.codecs': [
                'msgpack = faust_msgpack:msgpack',
            ],
        }

    Iterating over the 'faust.codecs' namespace will yield
    ``(name, type)`` tuples, where ``type`` is the actual attribute
    specified in the path (``faust_msgpack:msgpack``)::

        >>> from faust_msgpack import msgpack
        >>> extensions = list(load_extension_classes('faust.codecs'))
        >>> assert ('msgpack', msgpack) in extensions

    An extension that cannot be loaded is skipped after emitting a
    warning, so one broken plugin does not hide the remaining ones.
    """
    for name, cls_name in load_extension_class_names(namespace):
        try:
            cls = symbol_by_name(cls_name)
        except (ImportError, AttributeError, SyntaxError) as exc:
            # AttributeError: the module imported but lacks the named
            # attribute.  ``from module import missing`` would report
            # that as ImportError; symbol_by_name uses getattr, so the
            # same failure surfaces as AttributeError instead.
            warnings.warn(
                f'Cannot load {namespace} extension {cls_name!r}: {exc!r}')
        else:
            yield EntrypointExtension(name, cls)
def load_extension_class_names(
        namespace: str) -> Iterable[RawEntrypointExtension]:
    """Get setuptools entrypoint extension class names.

    If the entrypoint is defined in ``setup.py`` as::

        entry_points={
            'faust.codecs': [
                'msgpack = faust_msgpack:msgpack',
            ],
        }

    Iterating over the 'faust.codecs' namespace will yield the name
    together with the import path of the extension::

        >>> [tuple(ext)
        ...  for ext in load_extension_class_names('faust.codecs')]
        [('msgpack', 'faust_msgpack:msgpack')]

    Nothing is yielded when ``pkg_resources`` cannot be imported.
    """
    try:
        from pkg_resources import iter_entry_points
    except ImportError:
        return

    for ep in iter_entry_points(namespace):
        if ep.attrs:
            # Only the first attribute is used for the target path.
            target = ':'.join([ep.module_name, ep.attrs[0]])
        else:
            # Entry point naming a module only (``name = package.module``):
            # there is no attribute to append, so the target is the
            # module path itself rather than an IndexError.
            target = ep.module_name
        yield RawEntrypointExtension(ep.name, target)
@contextmanager
def cwd_in_path() -> Generator[Union[str, None], None, None]:
    """Context adding the current working directory to sys.path.

    Yields:
        The working directory if it had to be inserted (at the front of
        ``sys.path``), or ``None`` if it was already listed and
        ``sys.path`` was left untouched.

    On exit the directory is removed again, but only when this context
    was the one that inserted it.
    """
    cwd = os.getcwd()
    if cwd in sys.path:
        yield
    else:
        sys.path.insert(0, cwd)
        try:
            yield cwd
        finally:
            # The body may already have removed the entry itself.
            with suppress(ValueError):
                sys.path.remove(cwd)
def import_from_cwd(module: str,
                    *,
                    imp: Callable = None,
                    package: str = None) -> ModuleType:
    """Import module, temporarily including modules in the current directory.

    For the duration of the import the current working directory is
    placed first on ``sys.path`` (unless it is already listed there),
    so modules located in the current directory take precedence over
    modules located elsewhere in ``sys.path``.

    Arguments:
        module: Name of the module to import.
        imp: Import function, called as ``imp(module, package=package)``.
            Defaults to :func:`importlib.import_module`.
        package: Forwarded to `imp`.

    Returns:
        Whatever `imp` returns (the imported module by default).
    """
    if imp is None:
        imp = importlib.import_module
    with cwd_in_path():
        return imp(module, package=package)
def smart_import(path: str, imp: Any = None) -> Any:
    """Import module if module, otherwise same as :func:`symbol_by_name`.

    Arguments:
        path: Either a module path (``os.path``), or a path to an
            attribute in dotted (``os.path.abspath``) or colon
            (``os.path:abspath``) form.
        imp: Import function; defaults to
            :func:`importlib.import_module`.

    Returns:
        The imported module, or the attribute the path points at.
    """
    imp = importlib.import_module if imp is None else imp
    if ':' in path:
        # Path includes attribute so can just jump
        # here (e.g., ``os.path:abspath``).
        return symbol_by_name(path, imp=imp)

    # Not sure if path is just a module name or if it includes an
    # attribute name (e.g., ``os.path``, vs, ``os.path.abspath``).
    try:
        return imp(path)
    except ImportError:
        # Not a module name, so try module + attribute.
        return symbol_by_name(path, imp=imp)