Worker

class sosw.worker.Worker(*args, **kwargs)

We recommend that you inherit your core Processor from this class in Lambdas that are orchestrated by sosw.

The __call__ method accepts the event of the Lambda invocation. This is the dictionary with the payload that lambda_handler receives during invocation.

Worker has all the common methods of Processor and tries to mark the task as completed if it receives a task_id in the event. It builds a payload with the stats and the result, if they exist, and invokes the worker assistant Lambda.
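
The behaviour described above can be sketched without sosw installed. SketchWorker and its closed_tasks list are hypothetical stand-ins, not the real Worker class: the real class invokes the worker assistant Lambda where this sketch only records the task ID.

```python
class SketchWorker:
    """Hypothetical stand-in for sosw.worker.Worker; not the real class."""

    def __init__(self):
        # Records the task IDs that would be reported to the worker assistant.
        self.closed_tasks = []

    def __call__(self, event):
        # The event is the dictionary received by the lambda_handler.
        task_id = event.get('task_id')
        if task_id is not None:
            self.mark_task_as_completed(task_id)

    def mark_task_as_completed(self, task_id: str):
        # Assumption: the real Worker invokes the worker assistant Lambda here.
        self.closed_tasks.append(task_id)


worker = SketchWorker()
worker({'task_id': 'task-001', 'row': {'hash_col': 'a'}})  # orchestrated call
worker({'row': {'hash_col': 'b'}})  # no task_id, so there is nothing to close
print(worker.closed_tasks)
```

Only the first event carries a task_id, so only that task is recorded as closed.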

The Worker class can optionally record 'completed' and 'failed' events to the DynamoDB tasks metadata table. To enable this feature, provide 'meta_handler_config' in your custom_config. You also need to grant your Lambda write permissions for this table.

You can find more information about the configuration in the MetaHandler chapter.

mark_task_as_completed(task_id: str)

Call the worker assistant Lambda and tell it to close the task.

mark_task_as_failed(task_id: str)

Call the worker assistant Lambda and tell it to update the task info.
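
This page does not say whether Worker calls mark_task_as_failed on its own when your logic raises. The sketch below assumes you report the failure yourself. RecordingWorker and run_task are hypothetical names for illustration; only the two method names come from the documentation above.

```python
class RecordingWorker:
    """Hypothetical stand-in exposing the two documented method names."""

    def __init__(self):
        self.calls = []

    def __call__(self, event):
        if event.get('row') is None:
            raise ValueError('event has no row')
        self.mark_task_as_completed(event['task_id'])

    def mark_task_as_completed(self, task_id: str):
        # The real method calls the worker assistant Lambda; here we only record.
        self.calls.append(('completed', task_id))

    def mark_task_as_failed(self, task_id: str):
        self.calls.append(('failed', task_id))


def run_task(worker, event):
    """Hypothetical helper: run the worker and report a failure if it raises."""
    try:
        worker(event)
    except Exception:
        task_id = event.get('task_id')
        if task_id is not None:
            worker.mark_task_as_failed(task_id)
        raise  # let the Lambda runtime see the original error


demo = RecordingWorker()
run_task(demo, {'task_id': 't1', 'row': {'hash_col': 'a'}})
try:
    run_task(demo, {'task_id': 't2'})  # missing row triggers the failure path
except ValueError:
    pass
print(demo.calls)
```

Re-raising after marking the task keeps the original error visible to the caller instead of silently swallowing it.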

Example

The following is an elementary example of a Worker Lambda.

import logging
from sosw import Worker
from sosw.app import LambdaGlobals, get_lambda_handler

logger = logging.getLogger()
logger.setLevel(logging.INFO)

class Processor(Worker):

    DEFAULT_CONFIG = {
        'init_clients':     ['dynamo_db'],
        'dynamo_db_config': {
            'row_mapper':      {
                'hash_col':  'S',  # String
                'range_col': 'N',  # Number
            },
            'required_fields': ['hash_col', 'range_col'],
            'table_name':      'autotest_dynamo_db',  # If a table is not specified, this table will be used.
        }
    }

    dynamo_db_client = None


    def __call__(self, event):

        # Example of your Worker logic
        row = event.get('row')
        self.put_to_db(row)

        # Do some basic cleaning and marking `sosw` task as completed.
        super().__call__(event)


    def put_to_db(self, row):

        self.dynamo_db_client.put(row)


# Setting the entry point of the lambda.
# This must be at module level: Processor is not yet defined inside its own class body.
global_vars = LambdaGlobals()
lambda_handler = get_lambda_handler(Processor, global_vars)

If you inherit from Worker, you do not have to implement anything custom for the function to be properly orchestrated. Just remember to call super().__call__(event) at the end of your execution. It automatically collects and updates the stats and calls the WorkerAssistant Lambda function to close the task. By default, self.result is reset after each call. If you want to use self.result after the function call, call super().__call__(event, reset_result=False) instead.
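
The effect of reset_result can be illustrated with a minimal sketch. StandInWorker is a hypothetical stand-in for Worker that models only the reset, and resetting to None is an assumption: the value the real class resets self.result to is not stated here.

```python
class StandInWorker:
    """Hypothetical stand-in for Worker; it only models the result reset."""

    def __init__(self):
        self.result = None

    def __call__(self, event, reset_result=True):
        # The real Worker also collects stats and closes the task at this point.
        if reset_result:
            self.result = None  # assumption: the actual reset value is not documented


class Processor(StandInWorker):

    def __call__(self, event, reset_result=True):
        # Example logic: store something in self.result, then hand over to the parent.
        self.result = {'rows_written': len(event.get('rows', []))}
        super().__call__(event, reset_result=reset_result)


processor = Processor()

processor({'rows': [1, 2, 3]})
after_default = processor.result  # reset by the parent call

processor({'rows': [1, 2, 3]}, reset_result=False)
after_keep = processor.result  # still available to the caller
```

With the default call the result is gone once __call__ returns, while reset_result=False leaves it readable afterwards.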