TaskManager

class sosw.managers.task.TaskManager(custom_config=None, **kwargs)[source]

TaskManager is the core class used by most sosw Lambdas. It handles all operations on tasks, so configuring this Manager correctly is essential to your sosw implementation.

The default version of TaskManager works with DynamoDB tables to store and analyze the state of Tasks. This could be upgraded in future versions to work with other persistent storage or DBs.

A very important concept to understand about the Task workflow is greenfield.

construct_payload_for_task(**kwargs) → str[source]

Combines the remaining kwargs into a single JSON payload.
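As a rough illustration of the idea (not the library's actual code), the sketch below serialises leftover keyword arguments into one JSON string. The function name `construct_payload_sketch` and the example keys are hypothetical; the real method may treat specific keys differently.

```python
import json


def construct_payload_sketch(**kwargs) -> str:
    """Hypothetical stand-in: serialise all keyword arguments as one JSON string."""
    return json.dumps(kwargs)


# Example keys are made up for illustration only.
payload = construct_payload_sketch(customer_id=42, sections=["a", "b"])
```

The result is a plain string, so it can be stored in a single task field and decoded by the Worker with `json.loads`.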

create_task(labourer: sosw.labourer.Labourer, strict: bool = True, **kwargs)[source]

Schedule a new task.

Parameters:
  • labourer – Labourer object of Lambda to execute the task.
  • strict (bool) – If True (default), the task (kwargs) may not specify fields that are supposed to be autogenerated, unless the supplied values match the autogenerated ones. Set strict = False to override this and pass custom task properties.

get_completed_tasks_for_labourer(labourer: sosw.labourer.Labourer) → List[Dict][source]

Return a list of tasks of the Labourer marked as completed. Scavenger is supposed to archive them all so no special filtering is required here.

To reuse the existing index_greenfield, we select only tasks in the invoked stage (greenfield > now()). The number of such tasks is expected to be small, so the additional filtering by an un-indexed field will be fast.
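The two-step selection described above can be pictured with an in-memory list, as a minimal sketch. The helper `split_by_greenfield` and the field names `task_id` and `completed` are assumptions made for illustration; the real method queries DynamoDB rather than a Python list.

```python
import time
from typing import Dict, List, Optional, Tuple


def split_by_greenfield(tasks: List[Dict], now: Optional[int] = None) -> Tuple[List[Dict], List[Dict]]:
    """Return (queued, invoked): greenfield <= now is queued, greenfield > now is invoked."""
    now = int(time.time()) if now is None else now
    queued = [t for t in tasks if t["greenfield"] <= now]
    invoked = [t for t in tasks if t["greenfield"] > now]
    return queued, invoked


now = 1_000_000
tasks = [
    {"task_id": "a", "greenfield": now - 50},                      # still in the queue
    {"task_id": "b", "greenfield": now + 900, "completed": True},  # invoked and completed
    {"task_id": "c", "greenfield": now + 900},                     # invoked, assumed running
]
queued, invoked = split_by_greenfield(tasks, now=now)

# Second step: filter the (small) invoked set by a field that has no index.
completed = [t for t in invoked if t.get("completed")]
```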

get_count_of_running_tasks_for_labourer(labourer: sosw.labourer.Labourer) → int[source]

Returns the number of tasks we assume are still running. In theory, some of them may have died with an Exception but not yet expired.

get_db_field_name(key: str) → str[source]

Returns the database field name for the given key. This is useful if you override field names with your own (e.g. for tests).

get_expired_tasks_for_labourer(labourer: sosw.labourer.Labourer) → List[Dict][source]

Return a list of the Labourer's tasks that were previously invoked and have expired without being closed.

get_invoked_tasks_for_labourer(labourer: sosw.labourer.Labourer, completed: Optional[bool] = None) → List[Dict][source]

Return a list of tasks of current Labourer invoked during the current run of the Orchestrator.

If completed is provided:
  • True – filter completed ones
  • False – filter NOT completed ones
  • None (default) – do not care about completed status

get_labourers() → List[sosw.labourer.Labourer][source]

Return the configured Labourers. The TaskManager config expects ‘labourers’ to be a dict of the form ‘name_of_lambda’: {‘some_setting’: ‘value1’}.
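A config fragment following that shape might look like the sketch below. The second entry and its value are made up for illustration, and `some_setting` is the placeholder key from the description, not a real option.

```python
# Shape described above: 'labourers' maps each Lambda name to its settings.
custom_config = {
    "labourers": {
        "name_of_lambda": {"some_setting": "value1"},
        "another_lambda": {"some_setting": "value2"},  # hypothetical second Labourer
    }
}

# The dictionary keys are the Lambda names that Labourers would be built from.
labourer_names = sorted(custom_config["labourers"])
```

A dictionary like this would be passed as the `custom_config` argument shown in the class signature.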

get_length_of_queue_for_labourer(labourer: sosw.labourer.Labourer) → int[source]

Approximate count of tasks still in the queue for the Labourer, i.e. tasks with greenfield <= now().

Parameters:
  • labourer – Labourer whose queue is counted.
Returns: Approximate number of queued tasks.

get_newest_greenfield_for_labourer(labourer: sosw.labourer.Labourer) → int[source]

Return the value of the newest greenfield in the queue. This corresponds to the end of the queue, i.e. the most recently added task.

get_next_for_labourer(labourer: sosw.labourer.Labourer, cnt: int = 1, only_ids: bool = False) → List[Union[str, Dict]][source]

Fetch the next task(s) from the queue for the Labourer.

Parameters:
  • labourer – Labourer to get next tasks for.
  • cnt – Optional number of Tasks to fetch.
  • only_ids – If explicitly set to True, returns only the IDs of the tasks. This can reduce the amount of data transferred when you send big batches of tasks between Lambdas.

get_oldest_greenfield_for_labourer(labourer: sosw.labourer.Labourer, reverse: bool = False) → int[source]

Return the value of the oldest greenfield in the queue. This corresponds to the beginning of the queue if you need FIFO behaviour.
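To make the "beginning" and "end" of the queue concrete, here is a small in-memory sketch, assuming queued tasks are plain dictionaries with a `greenfield` value. The helper `order_queue_fifo` and the `task_id` field are hypothetical; the real methods read these values from the DynamoDB table.

```python
from typing import Dict, List


def order_queue_fifo(queued: List[Dict]) -> List[Dict]:
    """Order queued tasks by greenfield, oldest first (FIFO)."""
    return sorted(queued, key=lambda task: task["greenfield"])


queued = [
    {"task_id": "b", "greenfield": 1_200},
    {"task_id": "a", "greenfield": 1_000},
    {"task_id": "c", "greenfield": 1_500},
]
ordered = order_queue_fifo(queued)

oldest_greenfield = ordered[0]["greenfield"]   # beginning of the queue
newest_greenfield = ordered[-1]["greenfield"]  # end of the queue, latest added
```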

get_running_tasks_for_labourer(labourer: sosw.labourer.Labourer, count: bool = False) → Union[List[Dict], int][source]

Return a list of the Labourer's tasks that were previously invoked but are not yet closed or expired. We assume they are still running.

If count is set to True, returns just the number of tasks rather than the items themselves, which is much cheaper.

get_task_by_id(task_id: str) → Dict[source]

Fetches the full data of the Task.

invoke_task(labourer: sosw.labourer.Labourer, task_id: Optional[str] = None, task: Optional[Dict] = None)[source]

Invoke the Lambda Function execution for the task. Providing the ID is more expensive, but it is safer against “task injection” attacks, because this method prefetches the Task from the table before trying to invoke it.

Tasks that are already running are skipped without raising an exception, so concurrent Orchestrators (or any other callers) should not duplicate invocations.

is_valid_task(task: Dict) → bool[source]

Simple validation for required fields.

mark_task_invoked(labourer: sosw.labourer.Labourer, task: Dict, check_running: Optional[bool] = True)[source]

Update the greenfield with the latest invocation timestamp + invocation_delta.

By default, the update uses a conditional expression that fails if the current greenfield is already in the invoked state. If this check fails, the function raises a RuntimeError that should be handled by the Orchestrator. This is very important to help prevent duplicate invocations of the Worker by simultaneously running Orchestrators.

Parameters:
  • labourer – Labourer for the task
  • task – Task dictionary
  • check_running – If True (default) updates with conditional expression.

Raises:
  • RuntimeError – If check_running is True and the task is already in the invoked state.
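The check described above can be sketched in memory as follows. This is only an illustration: the real method relies on a DynamoDB conditional expression, which is evaluated atomically by the database, whereas the plain `if` below is not safe against concurrent writers. The function name, the `now` parameter and the `task_id` field are hypothetical.

```python
import time
from typing import Dict, Optional


def mark_task_invoked_sketch(task: Dict, invocation_delta: int,
                             check_running: bool = True,
                             now: Optional[int] = None) -> Dict:
    """Set greenfield to now + invocation_delta unless the task already looks invoked."""
    now = int(time.time()) if now is None else now
    # A greenfield in the future means the task is already in the invoked state.
    if check_running and task["greenfield"] > now:
        raise RuntimeError(f"Task {task['task_id']} is already invoked")
    task["greenfield"] = now + invocation_delta
    return task


task = {"task_id": "t1", "greenfield": 900}
mark_task_invoked_sketch(task, invocation_delta=300, now=1_000)  # first Orchestrator wins

try:
    mark_task_invoked_sketch(task, invocation_delta=300, now=1_000)  # second one is rejected
    duplicate_rejected = False
except RuntimeError:
    duplicate_rejected = True
```

The caller (the Orchestrator in the text above) would catch the RuntimeError and skip the task rather than invoke the Worker a second time.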

move_task_to_retry_table(task: Dict, wanted_delay: int)[source]

Put the task into the DynamoDB table sosw_retry_tasks with the wanted delay (labourer.max_runtime * attempts), then delete it from the sosw_tasks table.
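The move and the delay arithmetic can be sketched with two dictionaries standing in for the two tables. The field names `task_id`, `attempts` and `desired_launch_time`, the explicit `now` argument and the helper itself are assumptions for illustration, not the library's schema.

```python
from typing import Dict


def move_task_to_retry_sketch(task: Dict, wanted_delay: int,
                              tasks_table: Dict, retry_table: Dict, now: int) -> None:
    """Copy the task to the retry table with a launch time, then drop it from the tasks table."""
    # 'desired_launch_time' is a hypothetical field holding when the retry may start.
    retry_table[task["task_id"]] = {**task, "desired_launch_time": now + wanted_delay}
    tasks_table.pop(task["task_id"], None)


max_runtime = 900  # seconds, stands in for labourer.max_runtime
attempts = 2
task = {"task_id": "t1", "attempts": attempts}

sosw_tasks = {"t1": task}  # in-memory stand-ins for the two tables
sosw_retry_tasks = {}

move_task_to_retry_sketch(task, wanted_delay=max_runtime * attempts,
                          tasks_table=sosw_tasks, retry_table=sosw_retry_tasks, now=1_000)
```

Because the delay is multiplied by the number of attempts, each further retry of the same task waits longer than the previous one.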

register_labourers() → List[sosw.labourer.Labourer][source]

Sets timestamps, health status and other custom attributes on Labourer objects passed for registration.

We also pass a reference to the TaskManager (i.e. self) to the Ecology Manager. The latter will have to make some queries, and we don’t want it to initialise another TaskManager for itself.

retry_tasks(labourer: sosw.labourer.Labourer, tasks: List[Dict])[source]

Move tasks back to the tasks table, at the beginning of the queue (with the greenfield of the task that will be invoked next). All tasks must belong to the same labourer.
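One way to picture this, as a sketch under stated assumptions: retried tasks receive the smallest greenfield currently in the queue, so they sit at its head. The helper name, the `labourer_id` and `task_id` fields, the ValueError for mixed labourers and the fallback to `now` for an empty queue are all hypothetical choices, not confirmed behaviour of the library.

```python
from typing import Dict, List


def retry_tasks_sketch(labourer_id: str, queue: List[Dict],
                       tasks: List[Dict], now: int) -> List[Dict]:
    """Return the queue with retried tasks placed at the greenfield of the next task."""
    if any(t["labourer_id"] != labourer_id for t in tasks):
        raise ValueError("All tasks must belong to the same labourer")
    # Greenfield of the task that would be invoked next; falls back to now if the queue is empty.
    next_greenfield = min((t["greenfield"] for t in queue), default=now)
    moved = [{**t, "greenfield": next_greenfield} for t in tasks]
    return sorted(queue + moved, key=lambda t: t["greenfield"])


queue = [
    {"task_id": "q1", "labourer_id": "lambda_a", "greenfield": 500},
    {"task_id": "q2", "labourer_id": "lambda_a", "greenfield": 700},
]
to_retry = [{"task_id": "r1", "labourer_id": "lambda_a", "greenfield": 9_999}]

new_queue = retry_tasks_sketch("lambda_a", queue, to_retry, now=1_000)
retried = next(t for t in new_queue if t["task_id"] == "r1")
```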