import logging
from abc import ABC, abstractmethod
import inspect
import types
# Registry of tasks keyed by the name they were registered under, populated by register_task.
TASKS = {}
# Registry of Resource instances keyed by name, populated by register_resource.
RESOURCES = {}
# Logger used by everything in this module.
LOG = logging.getLogger('approxeng.task')
def task(_func=None, *, name=None):
    """
    Decorator marking a function as a simple task and registering it with register_task.

    Usable bare (``@task``), in which case the function's own name is the registration key, or called with a
    keyword (``@task(name='foo')``), in which case the supplied name is used, falling back to the function's
    name when ``name`` is None.

    :param _func:
        The decorated function when the decorator is used bare, leave unset otherwise.
    :param name:
        Optional explicit name under which to register the task. Only used by the called form.
    :return:
        The unchanged function when used bare, otherwise a decorator which registers and returns its argument.
    """

    def register_as(func, task_name):
        register_task(name=task_name, value=func)
        return func

    if _func is None:
        # Called form, hand back the real decorator which picks the name once it sees the function
        return lambda func: register_as(func, func.__name__ if name is None else name)
    # Bare form, the function's own name is the key
    return register_as(_func, _func.__name__)
def resource(_func=None, *, name=None):
    """
    Decorator to indicate that a function produces a resource. If the resource is a static value you should probably
    use register_resource instead.

    Usable bare (``@resource``) or called (``@resource(name='foo')``). In both forms the function's own name is
    used as the resource name unless an explicit name is supplied.

    :param _func:
        The decorated function when the decorator is used bare, leave unset otherwise.
    :param name:
        Optional explicit name under which to register the resource. Only used by the called form.
    :return:
        The unchanged function when used bare, otherwise a decorator which registers and returns its argument.
    """
    if _func is not None:
        # Called with no name argument
        register_resource(_func.__name__, _func)
        return _func

    def decorator(func):
        # Fall back to the function's name so that a bare call, @resource(), doesn't register under None
        resource_name = name if name is not None else func.__name__
        register_resource(resource_name, func)
        return func

    return decorator
class TaskException(Exception):
    """
    Exception type used by the task framework, raised for task level failures such as a missing resource, and
    used by run() to wrap exceptions raised from within the task loop.
    """
class Task(ABC):
    """
    Abstract base class for tasks, things which are called repeatedly to perform some higher function.
    """
    # Tick counter shared by all tasks, incremented once per call to do_tick on any task
    global_count = 0

    class World:
        """
        The world that the task tick sees, consists of all resources the task said it needed. Values can be read
        as attributes (world.foo), by key (world['foo']) or several at once (world['foo', 'bar']).
        """

        def __init__(self, resources, global_count, task_count):
            """
            Evaluate each named resource and capture the results.

            :param resources:
                Iterable of resource names, ordered such that each resource appears after any resources it
                depends on, the values of which are passed as keyword arguments to its value(..) method
            :param global_count:
                Stored under 'global_count'
            :param task_count:
                Stored under 'task_count'
            """
            self.dict = {}
            for resource_name in resources:
                res = RESOURCES[resource_name]
                dependency_values = {name: self.dict[name] for name in res.dependencies}
                self.dict[resource_name] = res.value(**dependency_values)
            self.dict['global_count'] = global_count
            self.dict['task_count'] = task_count

        def __getitem__(self, item):
            """
            Look up one value by name, or a list of values if given a tuple of names. Raises AttributeError, not
            KeyError, for unknown names as this delegates to __getattr__.
            """
            if isinstance(item, tuple):
                return [self.__getattr__(single_item) for single_item in item]
            return self.__getattr__(item)

        def __getattr__(self, item: str):
            """
            Only called when normal attribute lookup fails, resolves the name against the captured values.
            """
            # Go via __dict__ rather than self.dict, if 'dict' isn't set yet (instance built without __init__,
            # e.g. when copying) then self.dict would re-enter this method and recurse without bound.
            values = self.__dict__.get('dict', {})
            if item in values:
                return values[item]
            raise AttributeError('World has no resource or attribute "{}"'.format(item))

        def __contains__(self, item):
            return item in self.dict

    def __init__(self, name, resources=None):
        """
        Create a new task

        :param name:
            Name used internally by this task
        :param resources:
            A string, or list of strings, containing names of resources which must be available for this task to
            function. Any such resources will be initialised and shutdown alongside the task itself. If this is
            set to None then all resources registered will be available, otherwise only those explicitly named here
            will be accessible from the task logic.
        """
        self._resources = resources
        if resources is not None and not isinstance(resources, list):
            self._resources = [resources]
        self.active = False
        self.name = name
        self.task_count = 0
        self.ordered_resources = None

    @property
    def resources(self):
        """
        A list of names of resources needed by this task, if this was originally set to None then this returns a list
        containing all the registered resource names.
        """
        if self._resources is None:
            # Snapshot as a real list, as documented, rather than handing out a live view of the registry
            return list(RESOURCES)
        return self._resources

    def do_startup(self):
        """
        If this task is not currently active, call startup on any required resources, then call startup on the task
        implementation. No need to call this explicitly as it's called if the task isn't active on the first tick.

        :raises TaskException:
            If a name in the resolved resource order is not present in the resource registry
        """
        self.ordered_resources = get_resource_total_order(self.resources)
        if self.active:
            LOG.warning('Task "%s" startup called but task already active', self.name)
        else:
            LOG.info('Task "%s" starting', self.name)
            for task_resource in self.ordered_resources:
                if task_resource not in RESOURCES:
                    raise TaskException('Required resource "{}" not defined'.format(task_resource))
                RESOURCES[task_resource].startup()
            self.startup()
            self.active = True

    def do_shutdown(self):
        """
        If this task is active, shut it down, then call shutdown on all resources in the reverse of the order in
        which they were started. The resources are shut down, and the task marked inactive, even if the task's own
        shutdown raises, in which case the exception still propagates to the caller.
        """
        if not self.active:
            return
        LOG.info('Task "%s" shutting down', self.name)
        try:
            self.shutdown()
        finally:
            try:
                for task_resource in reversed(self.ordered_resources):
                    RESOURCES[task_resource].shutdown()
            finally:
                # Always mark inactive so a later tick starts cleanly rather than skipping startup
                self.active = False

    def do_tick(self):
        """
        Start up the task, if needed, then call the tick method, passing in a freshly built world. Both the task
        and global tick counts are incremented after the tick returns.

        :return:
            Whatever the tick implementation returned
        """
        if not self.active:
            self.do_startup()
        LOG.debug('Task "%s", task_tick %i, global_tick %i', self.name, self.task_count, Task.global_count)
        return_value = self.tick(
            world=Task.World(resources=self.ordered_resources,
                             task_count=self.task_count,
                             global_count=Task.global_count))
        Task.global_count = Task.global_count + 1
        self.task_count = self.task_count + 1
        return return_value

    @abstractmethod
    def startup(self):
        """
        Implement to provide startup logic for the task.
        """
        pass

    @abstractmethod
    def shutdown(self):
        """
        Implement to provide shutdown logic for the task.
        """
        pass

    @abstractmethod
    def tick(self, world):
        """
        Called every tick, do your stuff here!

        :param world:
            A world object, access any required resources through properties on this object.
        :return:
            Used to signal what to do at the end of the tick:
            None - continue, call the same task again
            TaskStop - shut this task down and exit from the task loop, the wrapped value is returned from run().
            Probably don't do this directly, instead...
            ...String or Task - shut this task down and set the specified task (name or direct reference) to be the
            active one. If you want to exit the best way to do so is to delegate to the exit task, or to a custom task
            which gracefully shuts down your hardware before exiting from the loop.
            Any other value is ignored by the task loop and the same task is called again.
        """
        pass
class SimpleTask(Task):
    """
    Task implementation wrapping a single function, along with a dict of state which starts out empty and which
    the function can freely read and write while the task is running.

    Parameters declared by the wrapped function are matched by name against the world:

    task_state : the state dict, persists across ticks, cleared when the task starts up or shuts down.
    task_count : the tick count of this task.
    global_count : the tick count shared across all tasks.

    Any other parameter name is treated as the name of a resource required by the task.
    """

    def __init__(self, task_function, name):
        """
        Wrap a function as a task, generally called from within the library rather than directly.

        :param task_function:
            Function called on every tick while this task is active. Its parameter names, other than the three
            reserved names described on the class, are taken to be the names of the resources it needs.
        :param name:
            Task name, only really used for logging, the canonical name is the key the task is registered under.
        """
        reserved = ('task_state', 'task_count', 'global_count')
        self.all_args = list(inspect.signature(task_function).parameters)
        needed = [arg for arg in self.all_args if arg not in reserved]
        super().__init__(name=name, resources=needed)
        self.task_function = task_function
        self.state = {}

    def startup(self):
        """
        Start from an empty state dict, it should already be empty but there's no harm in making sure.
        """
        self.state.clear()

    def shutdown(self):
        """
        Discard anything the function stored in the state dict.
        """
        self.state.clear()

    def tick(self, world):
        """
        Run one tick by calling the wrapped function with whichever of the world's values it declared.

        :param world:
            World for this tick, the state dict is added to it as 'task_state' before the call.
        :return:
            Whatever the wrapped function returned, interpreted as for any other task:
            None - keep this task running and call it again
            TaskStop - exit from the task processing loop
            Task or String - shut this task down and make the given, or named, task the current one
        """
        world.dict['task_state'] = self.state
        arguments = {key: value for key, value in world.dict.items() if key in self.all_args}
        return self.task_function(**arguments)
def register_task(name, value):
    """
    Explicitly register a task, either from a function or from an instance of Task

    :param name:
        Name used to reference the task
    :param value:
        Either a task function (any callable other than a class, so bound methods and functools.partial objects
        work too), in which case this behaves as if the function were annotated with @task, or a Task
        object. You may want to use the latter, more verbose, form if extensive setup or custom state handling is
        needed by your task, although in general most of such handling should be done with resources and tasks
        themselves should remain largely state free.
    :raises TypeError:
        If the value is neither a Task instance nor a callable, previously such values were silently ignored which
        only showed up later as a failed lookup when the task was first used.
    """
    if isinstance(value, Task):
        TASKS[name] = value
        LOG.info('Registered task class "%s", required resources: %s', name, value.resources)
    elif callable(value) and not inspect.isclass(value):
        TASKS[name] = SimpleTask(name=name, task_function=value)
        LOG.info('Registered task function "%s", required resources: %s', name, TASKS[name].resources)
    else:
        raise TypeError(
            'Cannot register task "{}", expected a function or Task instance but got {!r}'.format(name, value))
class Resource(ABC):
    """
    Abstract base class for resources, things with an optional lifecycle which tasks make use of. Resources needed
    by a task are started when the task starts, have their value method called on every tick to populate the world
    handed to the task, and are shut down when the task shuts down.
    """

    def __init__(self, name, dependencies=None):
        """
        :param name:
            Name by which this resource is referenced
        :param dependencies:
            Optional list of names of other resources this one depends on. Dependencies are started before, and shut
            down after, the resources which depend on them, and their values are supplied as keyword arguments to
            the value(..) method.
        """
        self.name = name
        self._dependencies = dependencies

    @property
    def dependencies(self):
        """
        The dependencies supplied at construction, or an empty list if none were given.
        """
        if self._dependencies is None:
            return []
        return self._dependencies

    @abstractmethod
    def startup(self):
        """
        Called before any task needing this resource uses it. This shouldn't be called while the resource is already
        active, but implementations should check rather than rely on that.
        """

    @abstractmethod
    def shutdown(self):
        """
        Called once a task using this resource has finished, when another resource is registered under the same
        name, and when the task loop exits. Implementations must cope with being called more than once, and with
        being called when the resource was never started.
        """

    @abstractmethod
    def value(self, **kwargs):
        """
        Produce the current value of the resource, this is what tasks see through the world object or receive as
        a named parameter. The values of any declared dependencies arrive as keyword arguments.
        """
def get_resource_total_order(resources=None):
    """
    Resolve a set of resources into an ordering which respects any dependencies, this will also extend the supplied list
    to include any transitive dependencies if required. The result is deterministic: resources which don't constrain
    each other keep the order in which they were supplied (or registered, when resources is None).

    :param resources:
        An iterable of resource names, or None to use all names. Names which are not registered are dropped.
    :return:
        A list of resource names such that no resource depends on a resource later in the list
    :raises KeyError:
        If a resource declares a dependency on a name which is not registered
    :raises ValueError:
        If the dependencies between the requested resources contain a cycle
    """
    if resources is None:
        requested = list(RESOURCES)
    else:
        requested = [res for res in resources if res in RESOURCES]
    # dict.fromkeys drops duplicate names while keeping first-seen order
    all_resources = list(dict.fromkeys(requested))
    known = set(all_resources)
    # The list grows while we iterate over it, this is intentional as it means newly added dependencies are
    # themselves visited and have their own dependencies pulled in.
    for name in all_resources:
        for dep_name in RESOURCES[name].dependencies or ():
            if dep_name in known:
                continue
            if dep_name not in RESOURCES:
                raise KeyError('Resource "{}" depends on unregistered resource "{}"'.format(name, dep_name))
            LOG.info('Adding transitive dependency %s for %s', dep_name, name)
            all_resources.append(dep_name)
            known.add(dep_name)
    pending = all_resources
    resolved_names = []
    resolved = set()
    while pending:
        deferred = []
        for name in pending:
            if all(dep_name in resolved for dep_name in RESOURCES[name].dependencies or ()):
                LOG.info('Resolved ordering for %s', name)
                resolved_names.append(name)
                resolved.add(name)
            else:
                deferred.append(name)
        if len(deferred) == len(pending):
            # A full pass resolved nothing, so everything left is waiting on something else that's left
            message = 'Cyclic dependencies in requested resources {}'.format(resources)
            LOG.error(message)
            raise ValueError(message)
        pending = deferred
    return resolved_names
class SimpleResource(Resource):
    """
    Resource built from a function producing the value, plus optional functions to run on startup and shutdown.
    The parameter names of the value function are used as the names of the resources this one depends on.
    """

    def __init__(self, name, value_func, startup_func=None, shutdown_func=None):
        """
        :param name:
            Name by which this resource is referenced
        :param value_func:
            Function called to produce the value, its parameter names become this resource's dependencies
        :param startup_func:
            Optional function, called with no arguments on startup
        :param shutdown_func:
            Optional function, called with no arguments on shutdown
        """
        parameter_names = inspect.signature(value_func).parameters.keys()
        super().__init__(name=name, dependencies=parameter_names)
        self.value_func = value_func
        self.startup_func = startup_func
        self.shutdown_func = shutdown_func

    def startup(self):
        """
        Run the startup function, if there is one.
        """
        if self.startup_func is None:
            return
        self.startup_func()

    def shutdown(self):
        """
        Run the shutdown function, if there is one.
        """
        if self.shutdown_func is None:
            return
        self.shutdown_func()

    def value(self, **kwargs):
        """
        Call the value function with the supplied dependency values and return its result.
        """
        return self.value_func(**kwargs)
def register_resource(name, value):
    """
    Register something as a resource under the given name, replacing (after shutting down) any resource already
    registered under that name. What gets stored depends on the kind of value:

    Resource instance - registered as is.
    Function - becomes the value function of a SimpleResource.
    Anything else - treated as a static value, wrapped in a no-argument function returning it, then handled as
    for a function.

    :param name:
        Name under which to register the resource
    :param value:
        Resource instance, function, or static value
    """
    if name in RESOURCES:
        # About to be replaced, so give the old resource the chance to tidy up first
        RESOURCES[name].shutdown()
    if isinstance(value, Resource):
        RESOURCES[name] = value
        LOG.info('Registered resource class "%s"', name)
        return
    if isinstance(value, types.FunctionType):
        RESOURCES[name] = SimpleResource(name=name, value_func=value)
        LOG.info('Registered resource function "%s"', name)
        return
    RESOURCES[name] = SimpleResource(name=name, value_func=lambda: value)
    LOG.info('Registered resource value "%s"', name)
@task(name='exit')
def exit_task(error=None):
    """
    Task registered as 'exit' which stops the task loop.

    :param error:
        Value of the 'error' resource if there is one, None otherwise. The task loop registers the exception
        under that name before handing control to its error task.
    :return:
        A TaskStop wrapping the error, so wrapping None if there wasn't one.
    """
    stop_signal = TaskStop(error)
    return stop_signal
class TaskStop:
    """
    Holder for a single value, None unless specified. Returning one of these from a task makes the task loop exit,
    with the held value becoming the return value of run(), so a task can stop the loop and hand something back
    to the caller in one go.
    """

    def __init__(self, return_value=None):
        """
        :param return_value:
            Value to hand back to the caller of run(), defaults to None
        """
        self.return_value = return_value
def run(root_task, error_task='exit', check_tasks=None, raise_exceptions=False):
    """
    Run the task loop!

    :param root_task:
        The first task to start with, either a name or a Task object
    :param error_task:
        The task to switch to if an exception occurs, defaults to the exit task to exit the loop. If you have hardware
        such as motors that you need to ensure is deactivated, the best approach is to have a task that only does
        hardware shutdown, handles any errors with that process internally, and then delegates to the exit task to
        stop the task loop.
    :param check_tasks:
        A sequence of functions which will be called before each tick of the selected task. If any of them return
        a value other than None then that value is used instead of calling and using the value of the task's tick
        (all the functions are called every tick, if more than one returns a value the last one wins). This can be
        done to handle cases like 'make the home button always jump back to the root task', or 'exit the task loop on
        low battery conditions' or similar. Don't put too much logic here, it'll get called every tick. Also good for
        cases where you absolutely want to bail if hardware isn't available (joystick out of range is a particular
        case)
    :param raise_exceptions:
        Defaults to False, if set to True then any exceptions raised by a task will be handled, then wrapped in a
        TaskException and raised from this call. If False then they will be handled, and control passed to the
        designated error task.
    :returns:
        If the loop exits as the result of a task returning a :class:`~approxeng.task.TaskStop` it will return the
        value wrapped by that instance, otherwise None.
    """

    def get_task(t):
        """
        Resolve a task instance

        :param t:
            Either a name, or a Task object
        :return:
            The task object, if supplied, or the result of a lookup in the TASKS global otherwise
        """
        if isinstance(t, Task):
            return t
        return TASKS[t]

    # Start with the root task as the active one
    active_task = get_task(root_task)
    # Loop until we're done
    finished = False
    return_value = None
    try:
        while not finished:
            try:
                response = None
                # If we have any pre-task checks to run do them now. If any of those functions return
                # non-None values we'll use those in place of the active task. Code these carefully!
                # Here's where you'd check for e.g. joystick not connected.
                if check_tasks is not None:
                    for check_task in check_tasks:
                        check_response = check_task()
                        if check_response is not None:
                            response = check_response
                # If no check_task functions returned anything, run the actual task tick
                if response is None:
                    response = active_task.do_tick()
                # If the tick function returned a value it means we need to switch control
                if response is not None:
                    if isinstance(response, (Task, str)):
                        # New task, either name or Task object. Shut down and switch to it for the next tick
                        active_task.do_shutdown()
                        active_task = get_task(response)
                    elif isinstance(response, TaskStop):
                        # TaskStop value returned
                        active_task.do_shutdown()
                        finished = True
                        return_value = response.return_value
            except Exception as e:
                # Anything throwing an exception ends up here. Log it first, then delegate to a handler task
                LOG.exception('Exception raised within task loop')
                # Shut the active task down, add the exception to the world as 'error' and launch the error task
                active_task.do_shutdown()
                if raise_exceptions:
                    raise TaskException from e
                register_resource('error', e)
                # NOTE(review): if the error task itself raises on every tick we end up back here each time and
                # the loop never exits - confirm whether that needs guarding against.
                active_task = get_task(error_task)
    except TaskException as te:
        # Catch and stash the exception in the return value
        return_value = te
    finally:
        # Finished, shut down all resources and exit. Each shutdown is guarded so that one misbehaving resource
        # can't prevent the rest from being shut down, or hide an exception which is already propagating.
        for res in reversed(get_resource_total_order()):
            try:
                RESOURCES[res].shutdown()
            except Exception:
                LOG.exception('Exception raised shutting down resource "%s"', res)
    # If we're raising exceptions, and there was an exception, raise it.
    if raise_exceptions and isinstance(return_value, Exception):
        raise return_value
    # Otherwise return the return value and exit.
    return return_value