Worker
class sosw.worker.Worker(*args, **kwargs)
We recommend that you inherit your core Processor from this class in Lambdas that are orchestrated by sosw.
The __call__ method is supposed to accept the event of the Lambda invocation. This is a dictionary with the payload received in the lambda_handler during invocation.

Worker has all the common methods of Processor and tries to mark the task as completed if it receives a task_id in the event. Worker creates a payload with stats and result, if they exist, and invokes the worker assistant Lambda.

The Worker class can optionally record 'completed' and 'failed' events to the DynamoDB tasks meta data table. To enable this feature, you have to provide 'meta_handler_config' in your custom_config. You also need to grant write permissions for this table to your Lambda.

You can find more information about the configuration in the MetaHandler chapter.
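The completion flow described above can be sketched without any AWS access. This is only an illustration of the described behaviour, not sosw's actual implementation: SketchWorker, build_payload and the notify_assistant callable are hypothetical stand-ins, and the exact payload shape sent to the worker assistant Lambda is an assumption.

```python
from typing import Any, Callable, Dict


class SketchWorker:
    """Hypothetical stand-in that mimics the described Worker flow."""

    def __init__(self, notify_assistant: Callable[[Dict[str, Any]], None]):
        # In sosw this step is a Lambda invocation; here it is any callable.
        self.notify_assistant = notify_assistant
        self.stats: Dict[str, int] = {}
        self.result: Dict[str, Any] = {}

    def build_payload(self, task_id: str) -> Dict[str, Any]:
        # Assumed shape: task_id, plus stats and result only when non-empty.
        payload: Dict[str, Any] = {'task_id': task_id}
        if self.stats:
            payload['stats'] = dict(self.stats)
        if self.result:
            payload['result'] = dict(self.result)
        return payload

    def __call__(self, event: Dict[str, Any]) -> None:
        # Only events that carry a task_id are reported as completed.
        task_id = event.get('task_id')
        if task_id:
            self.notify_assistant(self.build_payload(task_id))


sent = []
worker = SketchWorker(notify_assistant=sent.append)
worker.stats['rows_written'] = 1
worker({'task_id': 'task-123', 'row': {'hash_col': 'a'}})
worker({'row': {'hash_col': 'b'}})  # No task_id, so nothing is reported.
print(sent)
```

In this sketch only the first call produces a payload, because the second event has no task_id.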
Example
The following is an elementary example of a Worker Lambda.
    import logging

    from sosw import Worker
    from sosw.app import LambdaGlobals, get_lambda_handler

    logger = logging.getLogger()
    logger.setLevel(logging.INFO)


    class Processor(Worker):

        DEFAULT_CONFIG = {
            'init_clients': ['dynamo_db'],
            'dynamo_db_config': {
                'row_mapper': {
                    'hash_col': 'S',  # String
                    'range_col': 'N',  # Number
                },
                'required_fields': ['hash_col', 'range_col'],
                'table_name': 'autotest_dynamo_db',  # If a table is not specified, this table will be used.
            }
        }

        dynamo_db_client = None

        def __call__(self, event):
            # Example of your Worker logic
            row = event.get('row')
            self.put_to_db(row)

            # Do some basic cleaning and marking `sosw` task as completed.
            super().__call__(event)

        def put_to_db(self, row):
            self.dynamo_db_client.put(row)


    # Setting the entry point of the lambda.
    global_vars = LambdaGlobals()
    lambda_handler = get_lambda_handler(Processor, global_vars)
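To make the expected input concrete, here is a hedged sketch of an event this example could receive, together with a local check of the row against the row_mapper and required_fields settings. The event values and the validate_row helper are hypothetical and not part of sosw; the only assumption carried over is that 'S' and 'N' are the DynamoDB type codes for String and Number.

```python
from numbers import Number
from typing import Any, Dict, List

# DynamoDB attribute type codes used in the example's row_mapper.
TYPE_CHECKS = {
    'S': lambda value: isinstance(value, str),
    'N': lambda value: isinstance(value, Number) and not isinstance(value, bool),
}

ROW_MAPPER = {'hash_col': 'S', 'range_col': 'N'}
REQUIRED_FIELDS = ['hash_col', 'range_col']


def validate_row(row: Dict[str, Any], row_mapper: Dict[str, str],
                 required_fields: List[str]) -> List[str]:
    """Hypothetical helper: return a list of problems found in the row."""
    problems = []
    for field in required_fields:
        if field not in row:
            problems.append(f"missing required field: {field}")
    for field, type_code in row_mapper.items():
        if field in row and not TYPE_CHECKS[type_code](row[field]):
            problems.append(f"{field} is not of DynamoDB type {type_code}")
    return problems


# Illustrative event: task_id is read by Worker, row by the example Processor.
event = {'task_id': 'task-123', 'row': {'hash_col': 'user-1', 'range_col': 42}}

good = validate_row(event['row'], ROW_MAPPER, REQUIRED_FIELDS)
bad = validate_row({'hash_col': 7}, ROW_MAPPER, REQUIRED_FIELDS)
print(good)
print(bad)
```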
If you inherit from Worker, you do not have to implement anything custom for the function to be properly orchestrated. Just do not forget to call super().__call__(event) at the end of your execution. It will automatically collect and update the stats, and call the WorkerAssistant Lambda function to close the task. If you want to use self.result after the function call, call super().__call__(event, reset_result=False). The default behaviour is to reset self.result after each call.
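The effect of reset_result can be illustrated with a small stand-alone sketch. SketchBase and SketchProcessor are hypothetical classes that only imitate the reset behaviour described above; they do not import sosw or contact AWS, and the real parent __call__ does more than this.

```python
from typing import Any, Dict


class SketchBase:
    """Hypothetical stand-in for the parent behaviour described above."""

    def __init__(self):
        self.result: Dict[str, Any] = {}

    def __call__(self, event: Dict[str, Any], reset_result: bool = True) -> None:
        # Reporting stats and closing the task would happen at this point.
        if reset_result:
            self.result = {}


class SketchProcessor(SketchBase):

    def __call__(self, event: Dict[str, Any], reset_result: bool = True) -> None:
        # Custom logic first, then hand over to the parent at the very end.
        self.result['echo'] = event.get('row')
        super().__call__(event, reset_result=reset_result)


processor = SketchProcessor()

processor({'row': 1})
default_result = dict(processor.result)  # Cleared by the default behaviour.

processor({'row': 2}, reset_result=False)
kept_result = dict(processor.result)  # Still available to the caller.

print(default_result, kept_result)
```

Keeping the result is mainly useful when the caller inspects the processor directly, for example in tests. With the default, a reused instance starts each call with an empty result.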