Siblings

Warning

This component has performance issues unless it runs with force=True. It can cause throttling of CloudWatch requests and requires some refactoring.

SiblingsManager gives Lambda executions the option of a “healthy” shutdown: an execution that is running out of time may pass its remaining payload to another execution automatically. See the example:

import logging
import time
from sosw import Processor as SoswProcessor
from sosw.app import LambdaGlobals, get_lambda_handler
from sosw.components.siblings import SiblingsManager

logger = logging.getLogger()
logger.setLevel(logging.INFO)

class Processor(SoswProcessor):

    DEFAULT_CONFIG = {
        'init_clients':    ['Siblings'],    # Automatically initialize Siblings Manager
        'shutdown_period': 10,  # Some time to shutdown in a healthy manner.
    }

    siblings_client: SiblingsManager = None


    def __call__(self, event):

        cursor = event.get('cursor', 0)

        while self.sufficient_execution_time_left:
            self.process_data(cursor)
            cursor += 1
            if cursor == 20:
                return "Reached the end of data"

        else:
            # The loop condition became false, so time is running out:
            # spawn another sibling to continue the processing.
            payload = {'cursor': cursor}

            self.siblings_client.spawn_sibling(global_vars.lambda_context, payload=payload, force=True)
            self.stats['siblings_spawned'] += 1


    def process_data(self, cursor):
        """ Your custom logic respecting current cursor. """
        logger.info(f"Processing data at cursor: {cursor}")
        time.sleep(1)


    @property
    def sufficient_execution_time_left(self) -> bool:
        """ Return whether there is a sufficient execution time for processing ('shutdown period' is in seconds). """
        return global_vars.lambda_context.get_remaining_time_in_millis() > self.config['shutdown_period'] * 1000


global_vars = LambdaGlobals()
lambda_handler = get_lambda_handler(Processor, global_vars)

One example use case: store the remaining payload somewhere such as S3 and call the sibling with a pointer to it.
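
The pointer idea can be sketched as follows. This is a minimal illustration, not part of sosw: the helper name build_pointer_payload, the 'pointer' key and the uploader callable are all hypothetical, and an in-memory dictionary stands in for S3 so that the snippet runs without AWS access.

```python
import json
from typing import Callable, Dict, List


def build_pointer_payload(rows: List[dict], bucket: str, key: str,
                          uploader: Callable[[str, str, bytes], None]) -> Dict[str, dict]:
    """Store the unprocessed rows and return a small payload that points at them."""
    body = json.dumps(rows).encode('utf-8')
    uploader(bucket, key, body)
    # Only the location travels in the event, not the rows themselves.
    return {'pointer': {'bucket': bucket, 'key': key, 'rows': len(rows)}}


# Hypothetical in-memory stand-in for S3.
fake_s3 = {}


def fake_uploader(bucket: str, key: str, body: bytes) -> None:
    fake_s3[(bucket, key)] = body


payload = build_pointer_payload(
    [{'id': 1}, {'id': 2}], 'my-bucket', 'remaining/batch-1.json', fake_uploader)
```

In a real Lambda the uploader would typically wrap the boto3 S3 client's put_object(Bucket=..., Key=..., Body=...), and the resulting payload would be passed as payload to spawn_sibling.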


class sosw.components.siblings.SiblingsManager(custom_config=None, **kwargs)

This set of helpers can be used by Lambdas that need to invoke siblings of themselves. It is especially useful for Lambdas that process queues and run out of time.

The Role of your Lambda must have the following extra permissions to run correctly. Please note that we hardcode the ARN in the policy to avoid a circular dependency when parsing YAML. This dependency is absolutely valid, but CloudFormation doesn’t know how to parse it.

Policies:
  - PolicyName: "YOUR_FUNCTION_NAME"
    PolicyDocument:
      Version: "2012-10-17"
      Statement:
        - Effect: "Allow"
          Action: "cloudwatch:GetMetricStatistics"
          Resource: "*"
        - Effect: "Allow"
          Action: "lambda:InvokeFunction"
          Resource: "arn:aws:lambda:us-west-2:737060422660:function:YOUR_FUNCTION_NAME"

any_events_rules_enabled(lambda_context)

Checks the Status of CloudWatch Events Rules. It is very important to use this checker before launching siblings. Otherwise, you can create an infinite autorespawning loop and waste A LOT of money.

Parameters: lambda_context – Context object from your lambda_handler.
Return type: bool
Raises: ResourceNotFoundException – If a Rule with the given name doesn’t exist.
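
A guard built on this checker might look like the sketch below. It assumes that a return value of True means the triggering rules are still enabled and spawning is therefore safe. FakeSiblings and spawn_if_rules_enabled are hypothetical names used only for illustration; the fake class mimics just the two SiblingsManager methods documented on this page.

```python
class FakeSiblings:
    """Hypothetical stand-in exposing two SiblingsManager method signatures."""

    def __init__(self, rules_enabled: bool):
        self.rules_enabled = rules_enabled
        self.spawned = []

    def any_events_rules_enabled(self, lambda_context) -> bool:
        return self.rules_enabled

    def spawn_sibling(self, lambda_context, payload=None, force=False):
        # Record the payload instead of invoking a real Lambda.
        self.spawned.append(payload)


def spawn_if_rules_enabled(siblings, lambda_context, payload: dict) -> bool:
    """Spawn a sibling only when at least one Events Rule is enabled."""
    if not siblings.any_events_rules_enabled(lambda_context):
        # Rules are disabled: somebody wants the function to stop, so do not respawn.
        return False
    siblings.spawn_sibling(lambda_context, payload=payload)
    return True


enabled = FakeSiblings(rules_enabled=True)
disabled = FakeSiblings(rules_enabled=False)
spawned_when_enabled = spawn_if_rules_enabled(enabled, None, {'cursor': 5})
spawned_when_disabled = spawn_if_rules_enabled(disabled, None, {'cursor': 5})
```

With the real SiblingsManager the same helper would receive self.siblings_client and the actual Lambda context object.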

get_approximate_concurrent_executions(minutes_back=5, name=None)

Get approximate concurrent executions from CloudWatch Metrics. The value is very approximate and is calculated as the count of invocations during minutes_back divided by the average duration in the same period. The return value is rounded up to an integer using ceil.

We assume that the Role has permissions to read CloudWatch.

Parameters:
  • minutes_back (int) – Aggregate statistics for this number of minutes.
  • name (str) – Name of the function to check. Default: currently running lambda.
Return type: int
Returns: Approximate number of concurrent executions.
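
One possible use of this value is to cap how many siblings run at once. The sketch below is an assumption about usage, not documented sosw behaviour: below_concurrency_cap and FakeMetricsSiblings are hypothetical names, and the fake returns a fixed number instead of reading CloudWatch.

```python
class FakeMetricsSiblings:
    """Hypothetical stand-in that returns a fixed concurrency estimate."""

    def __init__(self, running: int):
        self.running = running

    def get_approximate_concurrent_executions(self, minutes_back=5, name=None) -> int:
        return self.running


def below_concurrency_cap(siblings, max_concurrent: int, minutes_back: int = 5) -> bool:
    """Return True while the estimated number of running executions is under the cap."""
    running = siblings.get_approximate_concurrent_executions(minutes_back=minutes_back)
    return running < max_concurrent


quiet = below_concurrency_cap(FakeMetricsSiblings(running=2), max_concurrent=5)
busy = below_concurrency_cap(FakeMetricsSiblings(running=7), max_concurrent=5)
```

Since the real method reads CloudWatch Metrics, it is probably wise to call it sparingly, for example once per execution rather than once per processed item.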

spawn_sibling(lambda_context, payload=None, force=False)

Asynchronously invokes a copy of the same function to continue working. Should be called if there is still work left to do (e.g. messages in the queue).

Can optionally send a payload, for example the remaining unprocessed rows. The payload should be formatted as a dictionary.

Parameters:
  • lambda_context – Context object from your lambda_handler.
  • payload (dict) – The payload to be put to event.
  • force (bool) – If True, the check for enabled Events Rules is skipped.