You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

523 lines
21 KiB

"""This module implements decorators for implementing other decorators
as well as some commonly used decorators.
"""
import sys
from functools import partial
from inspect import isclass, signature
from threading import Lock, RLock
from .__wrapt__ import BoundFunctionWrapper, CallableObjectProxy, FunctionWrapper
from .arguments import formatargspec
# Adapter wrapper for the wrapped function which will overlay certain
# properties from the adapter function onto the wrapped function so that
# functions such as inspect.getfullargspec(), inspect.signature() and
# inspect.getsource() return the correct results one would expect.
class _AdapterFunctionCode(CallableObjectProxy):
def __init__(self, wrapped_code, adapter_code):
super(_AdapterFunctionCode, self).__init__(wrapped_code)
self._self_adapter_code = adapter_code
@property
def co_argcount(self):
return self._self_adapter_code.co_argcount
@property
def co_code(self):
return self._self_adapter_code.co_code
@property
def co_flags(self):
return self._self_adapter_code.co_flags
@property
def co_kwonlyargcount(self):
return self._self_adapter_code.co_kwonlyargcount
@property
def co_varnames(self):
return self._self_adapter_code.co_varnames
class _AdapterFunctionSurrogate(CallableObjectProxy):
def __init__(self, wrapped, adapter):
super(_AdapterFunctionSurrogate, self).__init__(wrapped)
self._self_adapter = adapter
@property
def __code__(self):
return _AdapterFunctionCode(
self.__wrapped__.__code__, self._self_adapter.__code__
)
@property
def __defaults__(self):
return self._self_adapter.__defaults__
@property
def __kwdefaults__(self):
return self._self_adapter.__kwdefaults__
@property
def __signature__(self):
if "signature" not in globals():
return self._self_adapter.__signature__
else:
return signature(self._self_adapter)
class _BoundAdapterWrapper(BoundFunctionWrapper):
@property
def __func__(self):
return _AdapterFunctionSurrogate(
self.__wrapped__.__func__, self._self_parent._self_adapter
)
@property
def __signature__(self):
if "signature" not in globals():
return self.__wrapped__.__signature__
else:
return signature(self._self_parent._self_adapter)
class AdapterWrapper(FunctionWrapper):
__bound_function_wrapper__ = _BoundAdapterWrapper
def __init__(self, *args, **kwargs):
adapter = kwargs.pop("adapter")
super(AdapterWrapper, self).__init__(*args, **kwargs)
self._self_surrogate = _AdapterFunctionSurrogate(self.__wrapped__, adapter)
self._self_adapter = adapter
@property
def __code__(self):
return self._self_surrogate.__code__
@property
def __defaults__(self):
return self._self_surrogate.__defaults__
@property
def __kwdefaults__(self):
return self._self_surrogate.__kwdefaults__
@property
def __signature__(self):
return self._self_surrogate.__signature__
class AdapterFactory:
def __call__(self, wrapped):
raise NotImplementedError()
class DelegatedAdapterFactory(AdapterFactory):
def __init__(self, factory):
super(DelegatedAdapterFactory, self).__init__()
self.factory = factory
def __call__(self, wrapped):
return self.factory(wrapped)
adapter_factory = DelegatedAdapterFactory
# Decorator for creating other decorators. This decorator and the
# wrappers which they use are designed to properly preserve any name
# attributes, function signatures etc, in addition to the wrappers
# themselves acting like a transparent proxy for the original wrapped
# function so the wrapper is effectively indistinguishable from the
# original wrapped function.
def decorator(wrapper=None, /, *, enabled=None, adapter=None, proxy=FunctionWrapper):
"""
The decorator should be supplied with a single positional argument
which is the `wrapper` function to be used to implement the
decorator. This may be preceded by a step whereby the keyword
arguments are supplied to customise the behaviour of the
decorator. The `adapter` argument is used to optionally denote a
separate function which is notionally used by an adapter
decorator. In that case parts of the function `__code__` and
`__defaults__` attributes are used from the adapter function
rather than those of the wrapped function. This allows for the
argument specification from `inspect.getfullargspec()` and similar
functions to be overridden with a prototype for a different
function than what was wrapped. The `enabled` argument provides a
way to enable/disable the use of the decorator. If the type of
`enabled` is a boolean, then it is evaluated immediately and the
wrapper not even applied if it is `False`. If not a boolean, it will
be evaluated when the wrapper is called for an unbound wrapper,
and when binding occurs for a bound wrapper. When being evaluated,
if `enabled` is callable it will be called to obtain the value to
be checked. If `False`, the wrapper will not be called and instead
the original wrapped function will be called directly instead.
The `proxy` argument provides a way of passing a custom version of
the `FunctionWrapper` class used in decorating the function.
"""
if wrapper is not None:
# Helper function for creating wrapper of the appropriate
# time when we need it down below.
def _build(wrapped, wrapper, enabled=None, adapter=None):
if adapter:
if isinstance(adapter, AdapterFactory):
adapter = adapter(wrapped)
if not callable(adapter):
ns = {}
# Check if the signature argument specification has
# annotations. If it does then we need to remember
# it but also drop it when attempting to manufacture
# a standin adapter function. This is necessary else
# it will try and look up any types referenced in
# the annotations in the empty namespace we use,
# which will fail.
annotations = {}
if not isinstance(adapter, str):
if len(adapter) == 7:
annotations = adapter[-1]
adapter = adapter[:-1]
adapter = formatargspec(*adapter)
exec(f"def adapter{adapter}: pass", ns, ns)
adapter = ns["adapter"]
# Override the annotations for the manufactured
# adapter function so they match the original
# adapter signature argument specification.
if annotations:
adapter.__annotations__ = annotations
return AdapterWrapper(
wrapped=wrapped, wrapper=wrapper, enabled=enabled, adapter=adapter
)
return proxy(wrapped=wrapped, wrapper=wrapper, enabled=enabled)
# The wrapper has been provided so return the final decorator.
# The decorator is itself one of our function wrappers so we
# can determine when it is applied to functions, instance methods
# or class methods. This allows us to bind the instance or class
# method so the appropriate self or cls attribute is supplied
# when it is finally called.
def _wrapper(wrapped, instance, args, kwargs):
# We first check for the case where the decorator was applied
# to a class type.
#
# @decorator
# class mydecoratorclass:
# def __init__(self, arg=None):
# self.arg = arg
# def __call__(self, wrapped, instance, args, kwargs):
# return wrapped(*args, **kwargs)
#
# @mydecoratorclass(arg=1)
# def function():
# pass
#
# In this case an instance of the class is to be used as the
# decorator wrapper function. If args was empty at this point,
# then it means that there were optional keyword arguments
# supplied to be used when creating an instance of the class
# to be used as the wrapper function.
if instance is None and isclass(wrapped) and not args:
# We still need to be passed the target function to be
# wrapped as yet, so we need to return a further function
# to be able to capture it.
def _capture(target_wrapped):
# Now have the target function to be wrapped and need
# to create an instance of the class which is to act
# as the decorator wrapper function. Before we do that,
# we need to first check that use of the decorator
# hadn't been disabled by a simple boolean. If it was,
# the target function to be wrapped is returned instead.
_enabled = enabled
if type(_enabled) is bool:
if not _enabled:
return target_wrapped
_enabled = None
# Now create an instance of the class which is to act
# as the decorator wrapper function. Any arguments had
# to be supplied as keyword only arguments so that is
# all we pass when creating it.
target_wrapper = wrapped(**kwargs)
# Finally build the wrapper itself and return it.
return _build(target_wrapped, target_wrapper, _enabled, adapter)
return _capture
# We should always have the target function to be wrapped at
# this point as the first (and only) value in args.
target_wrapped = args[0]
# Need to now check that use of the decorator hadn't been
# disabled by a simple boolean. If it was, then target
# function to be wrapped is returned instead.
_enabled = enabled
if type(_enabled) is bool:
if not _enabled:
return target_wrapped
_enabled = None
# We now need to build the wrapper, but there are a couple of
# different cases we need to consider.
if instance is None:
if isclass(wrapped):
# In this case the decorator was applied to a class
# type but optional keyword arguments were not supplied
# for initialising an instance of the class to be used
# as the decorator wrapper function.
#
# @decorator
# class mydecoratorclass:
# def __init__(self, arg=None):
# self.arg = arg
# def __call__(self, wrapped, instance,
# args, kwargs):
# return wrapped(*args, **kwargs)
#
# @mydecoratorclass
# def function():
# pass
#
# We still need to create an instance of the class to
# be used as the decorator wrapper function, but no
# arguments are pass.
target_wrapper = wrapped()
else:
# In this case the decorator was applied to a normal
# function, or possibly a static method of a class.
#
# @decorator
# def mydecoratorfuntion(wrapped, instance,
# args, kwargs):
# return wrapped(*args, **kwargs)
#
# @mydecoratorfunction
# def function():
# pass
#
# That normal function becomes the decorator wrapper
# function.
target_wrapper = wrapper
else:
if isclass(instance):
# In this case the decorator was applied to a class
# method.
#
# class myclass:
# @decorator
# @classmethod
# def decoratorclassmethod(cls, wrapped,
# instance, args, kwargs):
# return wrapped(*args, **kwargs)
#
# instance = myclass()
#
# @instance.decoratorclassmethod
# def function():
# pass
#
# This one is a bit strange because binding was actually
# performed on the wrapper created by our decorator
# factory. We need to apply that binding to the decorator
# wrapper function that the decorator factory
# was applied to.
target_wrapper = wrapper.__get__(None, instance)
else:
# In this case the decorator was applied to an instance
# method.
#
# class myclass:
# @decorator
# def decoratorclassmethod(self, wrapped,
# instance, args, kwargs):
# return wrapped(*args, **kwargs)
#
# instance = myclass()
#
# @instance.decoratorclassmethod
# def function():
# pass
#
# This one is a bit strange because binding was actually
# performed on the wrapper created by our decorator
# factory. We need to apply that binding to the decorator
# wrapper function that the decorator factory
# was applied to.
target_wrapper = wrapper.__get__(instance, type(instance))
# Finally build the wrapper itself and return it.
return _build(target_wrapped, target_wrapper, _enabled, adapter)
# We first return our magic function wrapper here so we can
# determine in what context the decorator factory was used. In
# other words, it is itself a universal decorator. The decorator
# function is used as the adapter so that linters see a signature
# corresponding to the decorator and not the wrapper it is being
# applied to.
return _build(wrapper, _wrapper, adapter=decorator)
else:
# The wrapper still has not been provided, so we are just
# collecting the optional keyword arguments. Return the
# decorator again wrapped in a partial using the collected
# arguments.
return partial(decorator, enabled=enabled, adapter=adapter, proxy=proxy)
# Decorator for implementing thread synchronization. It can be used as a
# decorator, in which case the synchronization context is determined by
# what type of function is wrapped, or it can also be used as a context
# manager, where the user needs to supply the correct synchronization
# context. It is also possible to supply an object which appears to be a
# synchronization primitive of some sort, by virtue of having release()
# and acquire() methods. In that case that will be used directly as the
# synchronization primitive without creating a separate lock against the
# derived or supplied context.
def synchronized(wrapped):
"""Depending on the nature of the `wrapped` object, will either return a
decorator which can be used to wrap a function or method, or a context
manager, both of which will act accordingly depending on how used, to
synchronize access to calling of the wrapped function, or the block of
code within the context manager. If it is an object which is a
synchronization primitive, such as a threading Lock, RLock, Semaphore,
Condition, or Event, then it is assumed that the object is to be used
directly as the synchronization primitive, otherwise a lock is created
automatically and attached to the wrapped object and used as the
synchronization primitive.
"""
# Determine if being passed an object which is a synchronization
# primitive. We can't check by type for Lock, RLock, Semaphore etc,
# as the means of creating them isn't the type. Therefore use the
# existence of acquire() and release() methods. This is more
# extensible anyway as it allows custom synchronization mechanisms.
if hasattr(wrapped, "acquire") and hasattr(wrapped, "release"):
# We remember what the original lock is and then return a new
# decorator which accesses and locks it. When returning the new
# decorator we wrap it with an object proxy so we can override
# the context manager methods in case it is being used to wrap
# synchronized statements with a 'with' statement.
lock = wrapped
@decorator
def _synchronized(wrapped, instance, args, kwargs):
# Execute the wrapped function while the original supplied
# lock is held.
with lock:
return wrapped(*args, **kwargs)
class _PartialDecorator(CallableObjectProxy):
def __enter__(self):
lock.acquire()
return lock
def __exit__(self, *args):
lock.release()
return _PartialDecorator(wrapped=_synchronized)
# Following only apply when the lock is being created automatically
# based on the context of what was supplied. In this case we supply
# a final decorator, but need to use FunctionWrapper directly as we
# want to derive from it to add context manager methods in case it is
# being used to wrap synchronized statements with a 'with' statement.
def _synchronized_lock(context):
# Attempt to retrieve the lock for the specific context.
lock = vars(context).get("_synchronized_lock", None)
if lock is None:
# There is no existing lock defined for the context we
# are dealing with so we need to create one. This needs
# to be done in a way to guarantee there is only one
# created, even if multiple threads try and create it at
# the same time. We can't always use the setdefault()
# method on the __dict__ for the context. This is the
# case where the context is a class, as __dict__ is
# actually a dictproxy. What we therefore do is use a
# meta lock on this wrapper itself, to control the
# creation and assignment of the lock attribute against
# the context.
with synchronized._synchronized_meta_lock:
# We need to check again for whether the lock we want
# exists in case two threads were trying to create it
# at the same time and were competing to create the
# meta lock.
lock = vars(context).get("_synchronized_lock", None)
if lock is None:
lock = RLock()
setattr(context, "_synchronized_lock", lock)
return lock
def _synchronized_wrapper(wrapped, instance, args, kwargs):
# Execute the wrapped function while the lock for the
# desired context is held. If instance is None then the
# wrapped function is used as the context.
with _synchronized_lock(instance if instance is not None else wrapped):
return wrapped(*args, **kwargs)
class _FinalDecorator(FunctionWrapper):
def __enter__(self):
self._self_lock = _synchronized_lock(self.__wrapped__)
self._self_lock.acquire()
return self._self_lock
def __exit__(self, *args):
self._self_lock.release()
return _FinalDecorator(wrapped=wrapped, wrapper=_synchronized_wrapper)
synchronized._synchronized_meta_lock = Lock() # type: ignore[attr-defined]