Worker

View Licence Agreement

class sosw.worker.Worker(custom_config=None, **kwargs)[source]

We recommend that you inherit your core Processor from this class in Lambdas that are orchestrated by sosw.

The __call__ method is supposed to accept the event of the Lambda invocation. This is a dictionary with the payload received in the lambda_handler during invocation.

Worker has all the common methods of Processor and tries to mark task as completed if received task_id in the event. Worker create a payload with stats and result if exist and invoke worker assistant lambda.

mark_task_as_completed(task_id: str)[source]

Call worker assistant lambda and tell it to close task

mark_task_as_failed(task_id: str)[source]

Call worker assistant lambda and tell it to update task info

Example

Please find the following elementary example of Worker Lambda.

import logging
from sosw import Worker
from sosw.app import LambdaGlobals, get_lambda_handler

logger = logging.getLogger()
logger.setLevel(logging.INFO)

class Processor(Worker):

    DEFAULT_CONFIG = {
        'init_clients':     ['dynamo_db'],
        'dynamo_db_config': {
            'row_mapper':      {
                'hash_col':  'S',  # Number
                'range_col': 'N',  # String
            },
            'required_fields': ['hash_col', 'range_col'],
            'table_name':      'autotest_dynamo_db',  # If a table is not specified, this table will be used.
        }
    }

    dynamo_db_client = None


    def __call__(self, event):

        # Example of your Worker logic
        row = event.get('row')
        self.put_to_db(row)

        # Do some basic cleaning and marking `sosw` task as completed.
        super().__call__(event)


    def put_to_db(self, row):

        self.dynamo_db_client.put(row)


    # Setting the entry point of the lambda.
    global_vars = LambdaGlobals()
    lambda_handler = get_lambda_handler(Processor, global_vars)

In case you inherit from the Worker you do not have to implement anything custom for the function to be properly orchestrated. Just do not forget to call the super().__call__(event) at the end of your execution. It will automatically collect and update the stats as well as call the WorkerAssistant Lambda function to close the task.