"""
.. hidden-code-block:: text
:label: View Licence Agreement <br>
sosw - Serverless Orchestrator of Serverless Workers
The MIT License (MIT)
Copyright (C) 2024 sosw core contributors <info@sosw.app>
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
Static helper methods which you can use in any Lambdas.
Must be completely independent with no specific requirements.
"""
# Public API of this module: the names exported by a star-import.
# Every entry is expected to be defined at module level in this file.
__all__ = ['validate_account_to_dashed',
'validate_account_to_int',
'validate_list_of_numbers_from_csv',
'camel_case_to_underscore',
'chunks',
'validate_uuid4',
'rstrip_all',
'get_one_or_none_from_dict',
'get_one_from_dict',
'get_list_of_multiple_or_one_or_empty_from_dict',
'validate_date_list_from_event_or_days_back',
'validate_date_from_something',
'validate_datetime_from_something',
'validate_string_matches_datetime_format',
'is_valid_date',
'recursive_matches_soft',
'recursive_matches_strict',
'recursive_matches_extract',
'dunder_to_dict',
'nested_dict_from_keys',
'convert_string_to_words',
'construct_dates_from_event',
'validate_list_of_words_from_csv_or_list',
'first_or_none',
'recursive_update',
'trim_arn_to_name',
'trim_arn_to_account',
'make_hash',
'to_bool',
'get_message_dict_from_sns_event',
'is_event_from_sns',
'unwrap_event_recursively',
'is_event_from_sqs',
'small_int_from_string',
]
import datetime
import hashlib
import json
import re
import uuid
from collections import abc, defaultdict
from copy import deepcopy
from datetime import timezone
from typing import Iterable, Callable, Dict, Mapping, List, Optional
from sosw.components.exceptions import EventNotFromSourceException
[docs]
def validate_account_to_dashed(account):
    """
    Validate that the provided value is a valid AdWords account and convert it to dashed format.

    Accepted inputs (after ``str()`` and whitespace stripping) are exactly ``XXX-XXX-XXXX``
    or ten plain digits. Anything else, including a valid prefix followed by extra characters,
    is rejected.

    :param (str, int) account: AdWords Account
    :rtype: str
    :return: Dashed format ``XXX-XXX-XXXX``
    :raises ValueError: If the value is in neither supported format.
    """
    account = str(account).strip()

    # fullmatch() anchors both ends, so trailing garbage such as '123-456-78901' is rejected.
    if re.fullmatch("[0-9]{3}-[0-9]{3}-[0-9]{4}", account):
        return account

    if re.fullmatch("[0-9]{10}", account):
        return '-'.join([account[0:3], account[3:6], account[6:]])

    raise ValueError("Invalid account format provided: {}".format(account))
[docs]
def validate_account_to_int(account):
    """
    Convert an AdWords account (dashed or plain) to its integer form.

    The value is stringified, stripped of surrounding whitespace and of all dashes;
    the remainder must be exactly ten digits.

    :param (str, int) account: AdWords Account
    :return: Account ID as integer
    :raises ValueError: If the cleaned value is not ten digits.
    """
    digits = str(account).strip().replace('-', '')

    if not re.match("^[0-9]{10}$", digits):
        raise ValueError("Invalid account format provided: {}".format(digits))

    return int(digits)
[docs]
def validate_list_of_numbers_from_csv(data):
    """
    Converts a comma separated string (or an iterable) of numeric values to a list of integers.

    The values that do not match are skipped. The input order is preserved and duplicates are kept.

    * ``str`` input is split by commas; each stripped part made of decimal digits is converted.
    * A single ``int`` / ``float`` is returned wrapped in a list as is.
    * For other iterables, ``int`` / ``float`` elements are truncated with ``int()``,
      string elements are handled like CSV parts, everything else is skipped.
    * Non-iterable input (e.g. ``None``) results in an empty list.

    :param (str, iterable) data:     - str | iterable
    :return:                         - list(int)
    """

    def parse_text(text):
        # str.isdecimal() matches exactly what int() can parse. str.isnumeric() also accepts
        # characters like superscripts or fractions, which would crash int() instead of being skipped.
        text = text.strip()
        return int(text) if text.isdecimal() else None

    if isinstance(data, str):
        parsed = (parse_text(x) for x in data.split(','))
        return [x for x in parsed if x is not None]

    if isinstance(data, (int, float)):
        return [data]

    result = []
    try:
        for x in data:
            if isinstance(x, (int, float)):
                result.append(int(x))
            elif isinstance(x, str):
                number = parse_text(x)
                if number is not None:
                    result.append(number)
    except TypeError:
        # `data` is not iterable: nothing to extract.
        pass

    return result
[docs]
def validate_uuid4(uuid_string):
    """
    Validate that a UUID string is in fact a valid uuid4.

    The string is parsed WITHOUT the ``version`` kwarg on purpose: passing ``version=4``
    to ``uuid.UUID()`` overwrites the version/variant bits, so any 32-character hex string
    would be silently converted to a valid uuid4. Instead, the version encoded in the
    string itself is checked.

    :param str uuid_string: Candidate UUID, with or without dashes.
    :rtype: bool
    :return: True if the string is a well-formed version 4 UUID, False otherwise
             (including non-string input).
    """
    if not isinstance(uuid_string, str):
        return False

    try:
        parsed = uuid.UUID(uuid_string)
    except ValueError:
        # Not a valid hex code for a UUID.
        return False

    # `version` is None unless the variant is RFC 4122, so this checks both.
    return parsed.version == 4
[docs]
def camel_case_to_underscore(name):
    """
    Stringify `name` and convert it from CamelCase to lower-case underscore notation.

    :param name:    - str - CamelCase string (or anything with a meaningful ``__str__()``).
    :return:        - str - underscore_formatted_value
    """
    text = str(name)

    # First pass separates capitalized words, second pass splits a lower-case letter or digit
    # from an upper-case letter that follows it (e.g. the tail of an acronym).
    for pattern in ('(.)([A-Z][a-z]+)', '([a-z0-9])([A-Z])'):
        text = re.sub(pattern, r'\1_\2', text)

    return text.lower()
[docs]
def chunks(l, n):
    """
    Lazily split the sequence `l` into consecutive slices of at most `n` elements.

    :param l: Sliceable sequence supporting ``len()``.
    :param int n: Size of a chunk.
    :return: Generator of slices of `l`; the last one may be shorter than `n`.
    """
    yield from (l[start:start + n] for start in range(0, len(l), n))
[docs]
def rstrip_all(input, patterns):
    """
    Strips all of the patterns from the right of the input. Order and spaces do not matter.

    Surrounding whitespace is stripped before every pass, and passes are repeated until
    the string no longer ends with any of the patterns.

    :param input:       - str - String to modify
    :param patterns:    - list|set|tuple|str - Pattern[-s] to remove.
    :return:            - str
    :raises ValueError: If `patterns` is neither a string nor a list/set/tuple of strings.
    """
    if isinstance(patterns, str):
        patterns = [patterns]

    # Validate BEFORE building the regex, otherwise re.escape() fails with a TypeError
    # on non-string patterns and this explicit error is never reached.
    if not isinstance(patterns, (list, set, tuple)) or not all(isinstance(x, str) for x in patterns):
        raise ValueError("Patterns for rstrip_all() are supposed to be string or iterable of strings")

    regex = re.compile("({}$)".format("$|".join(map(re.escape, patterns))))

    # Iterate instead of recursing: long removable tails must not hit the recursion limit.
    current = input.strip()
    while True:
        stripped = regex.sub('', current)
        if stripped == current:
            # Nothing left to change.
            return current
        current = stripped.strip()
[docs]
def get_one_or_none_from_dict(input, name, vtype=None):
    """
    Extracts object by 'name' from the 'input'.
    Tries also plural name (``name + 's'``) in case nothing truthy is found by single 'name'.
    In case found an iterable by plural name, validates that it has one or zero values in it.
    If vtype is specified, tries to convert result to it.

    :param dict input:  Input dictionary. Event of Lambda for example.
    :param str name:    Name of attribute (in singular form).
    :param type vtype:  Type to be converted to. Must be callable. Tested types: str, int, float
    :return:            - instance of vtype | something else | None
    :raises ValueError: If arguments have wrong types, if the plural value holds more than one
                        element, or if the plural value is not a list, tuple or set.
    """
    if not isinstance(input, dict):
        raise ValueError("'input' attribute must be a dict. Received: {}".format(type(input)))

    if not isinstance(name, str):
        raise ValueError("'name' attribute must be a str. Received: {}".format(type(name)))

    def convert(obj):
        return vtype(obj) if vtype else obj

    # Best case scenario. :)
    result = input.get(name)
    if result:
        return convert(result)

    # Nothing truthy by singular name: try the plural one.
    results = input.get(name + 's')
    if not results:
        return None

    if not isinstance(results, (list, tuple, set)):
        raise ValueError("Some not-iterable '{}s' found in input: {}".format(name, str(type(results))))

    if len(results) > 1:
        raise ValueError("More than one {}s found in input.".format(name))

    # next(iter(...)) instead of [0]: sets are accepted above but are not subscriptable.
    return convert(next(iter(results)))
[docs]
def get_one_from_dict(input, name, vtype=None):
    """
    Strict flavour of get_one_or_none_from_dict(): a truthy value must be found.

    Looks the value up by 'name' (and by its plural form) via get_one_or_none_from_dict(),
    optionally converting it with `vtype`, and fails if the outcome is falsy.

    :param input:   - dict - Input dictionary. Event of Lambda for example.
    :param name:    - str - Name of attribute (in singular form).
    :param vtype:   - type - Type to be converted to. Must be callable. Tested types: str, int, float
    :return:        - instance of vtype | something else
    :raises ValueError: - If nothing truthy was found, or the lookup itself failed.
    """
    found = get_one_or_none_from_dict(input, name, vtype)

    if not found:
        raise ValueError("Did not find any value {} in the input {}".format(name, input))

    return found
[docs]
def get_list_of_multiple_or_one_or_empty_from_dict(input, name, vtype=None):
    """
    Fetch the value(s) stored under 'name' in 'input' and always return them as a list.

    The lookup first uses 'name' as given; if that yields nothing truthy, it retries with
    all trailing ``'s'`` characters removed from 'name'. A non-collection value is wrapped
    into a single-element list. If vtype is specified, each element is converted with it.

    :param input:   - dict - Input dictionary. Event of Lambda for example.
    :param name:    - str - Name of attribute (in plural form).
    :param vtype:   - type - Type to be converted to. Must be callable. Tested types: str, int, float
    :return:        - list - List of vtypes, or list of whatever was in input, or empty list.
    :raises ValueError: If 'input' is not a dict or 'name' is not a str.
    """
    if not isinstance(input, dict):
        raise ValueError("'input' attribute must be a dict. Received: {}".format(type(input)))

    if not isinstance(name, str):
        raise ValueError("'name' attribute must be a str. Received: {}".format(type(name)))

    found = input.get(name) or input.get(name.rstrip('s'))
    if not found:
        return []

    # Normalise to a list: collections are copied, scalars are wrapped.
    items = list(found) if isinstance(found, (list, tuple, set)) else [found]

    if not vtype:
        return items

    return [vtype(item) for item in items]
[docs]
def validate_date_list_from_event_or_days_back(input, days_back=0, key_name='date_list'):
    """
    Extract a list of dates from `input[key_name]` and convert its items to datetime.date.

    The value may be a list / set / tuple of strings or a comma-separated string.
    If the key is missing or its value is falsy, a single date is returned:
    today minus `days_back` days.

    * Format: ``YYYY-MM-DD``
    * Examples:

    .. code-block:: python

       ['2018-01-01', '2018-02-01']
       '2018-01-01, 2018-02-01'

    :param dict input:      This is supposed to be your whole Lambda event.
    :param int days_back:   Optional Number of days to take back from today.
                            Ex: days_back=1 is yesterday. Default: today.
    :param str key_name:    Optional custom name of key to extract from 'input'.
    :return:                list(datetime.date)
    """
    raw_dates = input.get(key_name, '')

    if not raw_dates:
        return [datetime.date.today() - datetime.timedelta(days=days_back)]

    # Anything that is not already a collection is treated as a CSV string.
    if isinstance(raw_dates, (list, set, tuple)):
        items = raw_dates
    else:
        items = str(raw_dates).split(',')

    parsed = []
    for item in items:
        parsed.append(datetime.datetime.strptime(item.strip(), '%Y-%m-%d').date())
    return parsed
[docs]
def validate_datetime_from_something(d):
    """
    Converts the input `d` to datetime.datetime.

    :param d: Some input. Supported types:

        * datetime.datetime - returned as is
        * datetime.date - combined with midnight
        * int / float - Epoch seconds; values not below the timestamp of
          ``datetime.MAXYEAR``-12-31 are treated as Epoch milliseconds
        * str consisting of numeric characters (dots ignored) - Epoch seconds
        * str of length 10 (YYYY-MM-DD)
        * any other str - first 19 characters parsed as YYYY-MM-DD HH:MM:SS

    :return:        Transformed `d`
    :rtype:         datetime.datetime
    :raises:        ValueError
    """
    # NOTE: datetime.datetime is a subclass of datetime.date, so it must be checked first.
    if isinstance(d, datetime.datetime):
        return d

    if isinstance(d, datetime.date):
        return datetime.datetime.combine(d, datetime.datetime.min.time())

    if isinstance(d, (int, float)):
        seconds_limit = datetime.datetime(datetime.MAXYEAR, 12, 31).timestamp()
        if d < seconds_limit:
            return datetime.datetime.fromtimestamp(d)
        return datetime.datetime.fromtimestamp(d / 1000)

    if isinstance(d, str):
        if d.replace('.', '').isnumeric():
            return datetime.datetime.fromtimestamp(float(d))
        if len(d) == 10:
            return datetime.datetime.strptime(d, '%Y-%m-%d')
        return datetime.datetime.strptime(d[:19], '%Y-%m-%d %H:%M:%S')

    raise ValueError("Some unconvertable type for datetime validation: {}".format(d))
[docs]
def validate_date_from_something(d):
    """
    Convert valid input to datetime.date.

    The conversion is delegated to validate_datetime_from_something(); the time part
    of its result is dropped.

    :param d: Some input. Supported types:

        * datetime.datetime
        * datetime.date
        * int - Epoch or Epoch milliseconds
        * float - Epoch or Epoch milliseconds
        * str (YYYY-MM-DD)
        * str (YYYY-MM-DD HH:MM:SS)

    :return:        Transformed `d`
    :rtype:         datetime.date
    :raises:        ValueError
    """
    as_datetime = validate_datetime_from_something(d)
    return as_datetime.date()
[docs]
def is_valid_date(date_str, date_formats):
    """
    Check whether the string matches at least one of the given datetime formats.

    Each format is checked with validate_string_matches_datetime_format(); a ValueError
    from it means "does not match this format".

    :param str date_str:        a date or time or both, Example: '2018/09/16'
    :param list date_formats:   List of datetime formats acceptable for datetime.strptime.
                                Example: '%Y/%m/%d'
    :rtype:                     bool
    :return:                    True if the date string is valid for any of the datetime formats,
                                False otherwise.
    """

    def matches(date_format):
        try:
            validate_string_matches_datetime_format(date_str, date_format)
        except ValueError:
            return False
        return True

    # any() stops at the first matching format.
    return any(matches(date_format) for date_format in date_formats)
[docs]
def recursive_matches_soft(src, key, val, **kwargs):
    """
    Searches the 'src' recursively for nested elements provided in 'key' with dot notation.
    In case some levels are iterable (list, tuple) it checks every element.
    In case the full path is inaccessible (missing key, or an intermediate value that cannot
    be subscripted, e.g. ``None`` or a number) returns False.
    If any of the elements addressed by 'key' matches the 'val' - Bingo! Return True.

    You might also be interested in recursive_matches_strict() helper.

    :param dict src:    Input dictionary. Can contain nested dictionaries and lists.
    :param str key:     Path to search with dot notation.
    :param any val:     Value to match in some elements specified by path.

    In order to check not just that some element exists, but to check for duplicates, you might want to use
    optional 'exclude' attributes. If attributes are specified and the last level element following the path
    (dot notation) will have a key-value, the check for the main key-value will be skipped.
    See unittests to understand the behaviour better.

    :param str exclude_key:  Key to check in last level element to exclude.
    :param str exclude_val:  Value to match in last level element to exclude.
    :rtype: bool
    :raises AttributeError: If only one of 'exclude_key' / 'exclude_val' is provided.
    """
    exclude_provided = [x in kwargs for x in ('exclude_key', 'exclude_val')]
    if any(exclude_provided) and not all(exclude_provided):
        raise AttributeError("If you use 'exclude' attributes you must specify both 'exclude_key' and 'exclude_val'")

    # If src is iterable: check every element with the same path.
    if isinstance(src, (list, tuple)):
        return any(recursive_matches_soft(element, key, val, **kwargs) for element in src)

    head, separator, rest = key.partition('.')

    # We should try to dig deeper.
    if separator:
        try:
            child = src[head]
        except (KeyError, TypeError, IndexError):
            # Path is inaccessible: missing key or `src` is not a container at all.
            return False
        return recursive_matches_soft(child, rest, val, **kwargs)

    # Last level of digging.
    try:
        if kwargs.get('exclude_key') and src[kwargs['exclude_key']] == kwargs['exclude_val']:
            # Skipping element because it matches exclude parameters.
            return False
    except (KeyError, TypeError):
        pass  # The exclude key is simply missing or `src` is not subscriptable. We ignore it then.

    try:
        return src[key] == val
    except (KeyError, TypeError):
        return False
[docs]
def recursive_matches_strict(src, key, val, **kwargs):
    """
    Strictly check whether any element of 'src' addressed by the dot-notated 'key' equals 'val'.

    Lists and tuples met on the way are traversed element by element.
    Unlike the soft flavour, an inaccessible path is not tolerated: the lookup errors
    (e.g. KeyError) propagate to the caller.

    :param dict src:    Input dictionary. Can contain nested dictionaries and lists.
    :param str key:     Path to search with dot notation.
    :param any val:     Value to match in some elements specified by path.
    :param str exclude_key:  Optional. Key to check in last level element to exclude.
    :param str exclude_val:  Optional. Value to match in last level element to exclude.
    :rtype: bool
    :raises AttributeError: If only one of 'exclude_key' / 'exclude_val' is provided.
    """
    provided = [name in kwargs for name in ['exclude_key', 'exclude_val']]
    if any(provided) and not all(provided):
        raise AttributeError("If you use 'exclude' attributes you must specify both 'exclude_key' and 'exclude_val'")

    # Sequences: the same path is applied to every element, first hit wins.
    if isinstance(src, (list, tuple)):
        for element in src:
            if recursive_matches_strict(element, key, val, **kwargs):
                return True
        return False

    head, separator, rest = key.partition('.')

    # More path left: descend one level (lookup errors are intentionally not caught).
    if separator:
        return recursive_matches_strict(src[head], rest, val, **kwargs)

    # Last level: first honour the optional exclusion, then compare.
    try:
        excluded = kwargs.get('exclude_key') and src[kwargs['exclude_key']] == kwargs['exclude_val']
    except KeyError:
        excluded = False  # The exclude key is simply missing in this element.

    if excluded:
        return False

    return src[key] == val
[docs]
def dunder_to_dict(data: dict, separator=None):
    """
    Unfold a flat dictionary, whose keys encode nesting with a separator, into a nested dictionary.

    E.g.:

    .. code-block:: python

       data = {'a': 'v1', 'b__c': 'v2', 'b__d__e': 'v3'}
       result = dunder_to_dict(data)

       # result:
       {
           'a': 'v1',
           'b': {
               'c': 'v2',
               'd': {'e': 'v3'}
           }
       }

    Keys without the separator are copied as is. For separated keys, dictionary values are
    themselves unfolded recursively before being embedded.

    :param data: A dictionary that is converted to Nested.
    :param str separator: Custom separator for recursive extraction. Default: ``'__'``
    :raises TypeError: If a truthy non-string separator is given.
    :raises ValueError: If some key starts or ends with the separator.
    """
    if separator and not isinstance(separator, str):
        raise TypeError("Separator must be a string.")
    separator = separator or '__'

    nested = defaultdict(dict)

    for key, value in data.items():
        # Plain key: nothing to unfold.
        if separator not in key:
            nested[key] = value
            continue

        if key.endswith(separator) or key.startswith(separator):
            raise ValueError(f"Your keys should not have {separator} on sides of keys. Only as separators: {key}")

        main_key, *nested_keys = key.split(separator)

        # Values of separated keys may use the separator notation as well.
        if isinstance(value, dict):
            value = dunder_to_dict(data=value, separator=separator)

        # Embed the value at the deepest level and merge the branch into what is collected so far.
        branch = nested_dict_from_keys(nested_keys, value=value)
        nested[main_key] = recursive_update(nested[main_key], branch)

    return dict(nested)
[docs]
def nested_dict_from_keys(keys: List, value: Optional = None) -> Dict:
    """
    Build a chain of single-key dictionaries, one level per element of `keys`.

    `value` ends up under the last key. With empty `keys` the `value` itself is returned.

    Examples:

    .. code-block:: python

       nested_dict_from_keys(['a', 'b', 'c']) == {'a': {'b': {'c': None}}}
       nested_dict_from_keys(['a', 'b', 'c'], value=42) == {'a': {'b': {'c': 42}}}

    :param keys:    List of keys to embed.
    :param value:   Optional value to set to lowest level
    """
    if len(keys) == 0:
        return value

    # Check the keys from the outermost to the innermost one, then build inside out.
    for key in keys:
        assert isinstance(key, abc.Hashable), f"Keys of dictionary must be hashable for nestify. Got: {type(key)}"

    nested = value
    for key in reversed(keys):
        nested = {key: nested}
    return nested
[docs]
def convert_string_to_words(string):
    """
    Lower-case the string, trim it and replace every run of whitespace with a single comma.

    :param str string:  String to convert into words.
    :rtype:             str
    :return:            Comma separated words.
    :raises TypeError:  If the input is not a string.
    """
    if isinstance(string, str):
        normalized = string.lower().strip()
        return re.sub(r'\s+', ',', normalized)

    raise TypeError(f"Input must be string, got {type(string)}")
[docs]
def construct_dates_from_event(event: dict) -> tuple:
    """
    Work out the start and end dates of a period described by the event.

    The end date of the period may be specified as `en_date` in the event. The default value is today.
    The start is defined by exactly one of `st_date` or a numeric `days_back`
    (the latter is subtracted from the end date).

    Both `st_date` and `en_date` are converted with validate_date_from_something(),
    so the time part of `datetime` values is ignored.

    :param dict event:  Lambda payload.
    :return:            start_date, end_date as datetime.date
    :raises AttributeError: If both or none of `st_date` / `days_back` are (truthy) in the event.
    """
    en_date = validate_date_from_something(event.get('en_date', datetime.date.today()))
    st_date, days_back = event.get('st_date'), event.get('days_back')

    if st_date and days_back:
        raise AttributeError("construct_dates_from_event() doesn't allow st_date and days_back simultaneously")

    if not (st_date or days_back):
        raise AttributeError("construct_dates_from_event() expects either st_date or days_back")

    st_date = (en_date - datetime.timedelta(days=int(days_back))
               if days_back
               else validate_date_from_something(st_date))

    assert st_date < en_date, "Start date must be earlier than end date."

    return st_date, en_date
[docs]
def validate_list_of_words_from_csv_or_list(data: (str, list)) -> list:
    """
    Turn a CSV string, or a collection of CSV strings, into a flat list of stripped words.

    Every resulting element must be a single word: if any of them still contains a space
    after stripping, the validation fails with `ValueError`.

    :param data:    CSV string or list / tuple / set of strings (possibly CSV themselves)
    :return:        List of stripped and split words
    :raises TypeError: If `data`, or an element of the collection, is not a string.
    :raises ValueError: If some element consists of multiple words.
    """
    # Treat a lone value as a collection of one row to share the code path.
    rows = data if isinstance(data, (list, tuple, set)) else [data]

    words = []
    for row in rows:
        if not isinstance(row, str):
            raise TypeError(f"Unsupported type of data for validate_list_of_words_from_csv_or_list(): {data}")
        words.extend(part.strip() for part in row.split(','))

    if any(' ' in word for word in words):
        raise ValueError(f"data for validate_list_of_words_from_csv_or_list() should be csv of WORDS or list: {data}")

    return words
[docs]
def first_or_none(items: Iterable, condition: Callable = None):
    """
    Return the first element of `items` for which `condition` is truthy, or None.

    :param items:       Iterable to scan.
    :param condition:   Optional callable taking one element. When omitted (or falsy),
                        every element qualifies, i.e. the very first element is returned.
    """
    accepts = condition if condition else (lambda *args, **kwargs: True)

    for candidate in items:
        if accepts(candidate):
            return candidate

    return None
[docs]
def recursive_update(d: Dict, u: Mapping) -> Dict:
    """
    Recursively updates the dictionary `d` with another one `u`.
    Values of `u` overwrite in case of type conflict.
    The input `d` is not modified: a deep copy is updated and returned.

    Examples (in comparison with dict.update([other])):

    .. code-block:: python

       d = {'a': 42, 'b': {'b1': 33, 'b2': 44}}
       u = {'a': 43, 'b': {'b1': 22, 'b3': 33}}

       recursive_update(d, u)
       # result:
       {'a': 43, 'b': {'b1': 22, 'b2': 44, 'b3': 33}}

       d.update(u)
       # result:
       {'a': 43, 'b': {'b1': 22, 'b3': 33}}

    List, set and tuple values of `d` and `u` are merged, preserving only unique values. Returned as List.
    The order of merged unique values is not guaranteed.

    :param d: Dictionary to take as a base.
    :param u: Mapping with the updates.
    :return:  New updated dictionary.
    """
    new = deepcopy(d)

    for k, v in u.items():
        current = d.get(k)

        if isinstance(v, abc.Mapping) and isinstance(current, (abc.Mapping, type(None))):
            # A key that is missing OR explicitly set to None is treated as an empty mapping.
            # (`d.get(k, {})` alone would pass the explicit None down and crash.)
            new[k] = recursive_update({} if current is None else current, v)

        elif isinstance(v, (set, list, tuple)) and isinstance(current, (set, list, tuple)):
            # Merge lists of uniques. I really want this helper to eat anything and return what it should. :)
            nv = list(current) + list(v)

            try:
                merged = list(set(nv))
            except TypeError:
                # The elements are unhashable, so it is not that easy to filter uniques.
                merged = None

            # If nothing was collected we try to deal with the elements as if they were Dictionaries:
            # filter unique ones by their JSON representation and then reconstruct Dicts back.
            if not merged:
                try:
                    jsons = set(json.dumps(sorted(x.items())) for x in nv)
                    merged = [dict(json.loads(x)) for x in jsons]
                except (TypeError, AttributeError):
                    # Not all elements are hashable and not all are Dictionaries: merge them as is.
                    merged = nv

            new[k] = merged

        else:
            new[k] = v

    return new
[docs]
def trim_arn_to_name(arn: str) -> str:
    """
    Reduce a full ARN to the bare resource name. Versions, aliases and raw names
    (not an ARN at all) are supported as input.

    More information about ARN Format:
    https://docs.aws.amazon.com/general/latest/gr/aws-arns-and-namespaces.html#genref-arns

    :param arn: Full ARN or just a name.
    :return:    Name of the resource.
    """
    # Seems a little messy, but passes more/less any test of different ARNs we tried.
    pattern = "(arn:aws:[0-9a-zA-Z-]{2,20}:[0-9a-zA-Z-]{0,12}:[0-9]{12}:[0-9a-zA-Z-]{2,20}[:/])?" \
              "(?P<name>[0-9a-zA-Z_=,.@-]*)(:)?([0-9a-zA-Z$]*)?"

    if '/' in arn or arn.count(':') >= 6:
        found = re.search(pattern, arn)
        return found.group('name')

    # Special handling for super global services (e.g. S3 buckets): the name is the last segment.
    return arn.split(':')[-1]
[docs]
def trim_arn_to_account(arn: str) -> str:
    """
    Pull the 12-digit ACCOUNT_ID out of a full ARN.

    More information about ARN Format:
    https://docs.aws.amazon.com/general/latest/gr/aws-arns-and-namespaces.html#genref-arns

    :param arn: Full ARN.
    :return:    The account ID as a string of 12 digits.
    :raises AttributeError: If no 12-digit account is found in the input
                            (the failed regex search result is ``None``).
    """
    # Seems a little messy, but passes more/less any test of different ARNs we tried.
    pattern = "(arn:aws:[0-9a-zA-Z-]{2,20}:[0-9a-zA-Z-]{0,12}:)?(?P<acc>[0-9]{12})(:[0-9a-zA-Z-]{2,20}[:/])?" \
              "(?P<name>[0-9a-zA-Z_=,.@-]*)(:)?([0-9a-zA-Z$]*)?"

    found = re.search(pattern, arn)
    return found.group('acc')
[docs]
def make_hash(o):
    """
    Build a hash-like value for arbitrarily nested dictionaries, lists, tuples and sets
    whose leaves are hashable.

    * tuple / list -> tuple of the hashes of the elements (order matters)
    * set -> sorted tuple of the hashes of the elements (order does not matter)
    * dict -> single integer, independent of the order of keys
    * anything else -> ``hash(o)``

    Original idea from this user:
    https://stackoverflow.com/users/660554/jomido

    Plus some upgrades to work with sets and dicts having different types of keys appropriately.
    See source unittests of this function for some more details.
    """
    if isinstance(o, (tuple, list)):
        return tuple(make_hash(element) for element in o)

    # Elements of a set are sorted by their hashes to neutralise the iteration order.
    if isinstance(o, set):
        return tuple(sorted(make_hash(element) for element in o))

    if isinstance(o, dict):
        # Hash both keys and values to make sure types and order don't affect the result.
        hashed = {make_hash(key): make_hash(value) for key, value in o.items()}
        return hash(tuple(frozenset(sorted(hashed.items()))))

    return hash(o)
def to_bool(val):
    """
    Interpret `val` as a boolean.

    * bool / int / float are converted with ``bool()``.
    * Strings 'true' / '1' and 'false' / '0' are recognised case-insensitively.

    :param val: Value to convert.
    :rtype: bool
    :raises Exception: For any other value or type.
    """
    if isinstance(val, (bool, int, float)):
        return bool(val)

    if isinstance(val, str):
        known = {'true': True, '1': True, 'false': False, '0': False}
        lowered = val.lower()
        if lowered in known:
            return known[lowered]

    raise Exception(f"Can't convert unexpected value to bool: {val}, type: {type(val)}")
def _unwrap_msg_dict_from_sns_event(event) -> Dict:
    """
    Load the JSON message of an SNS event as a dictionary.

    Two shapes of the event are supported: the Lambda envelope
    (``event['Records'][0]['Sns']['Message']``) and, as a fallback when the former
    can not be loaded, a flat ``event['Message']``.

    :param event: SNS event (payload).
    :return: The decoded message.
    :raises ValueError: If the event has more than one record.
    """
    if 'Records' in event and len(event['Records']) > 1:
        raise ValueError(f"SNS event is not expected to have more than one record. Event: {event}")

    try:
        return json.loads(event['Records'][0]['Sns']['Message'])
    except Exception:
        # Best-effort fallback to the flat shape. `Exception` (not a bare except) keeps
        # KeyboardInterrupt / SystemExit propagating.
        return json.loads(event['Message'])
[docs]
def get_message_dict_from_sns_event(event: Dict) -> Dict:
    """
    Return the message of an SNS event loaded as a dict.

    :param event:   Lambda SNS event (payload). Must be a JSON document.
    :return:        The SNS message, converted to dict
    :raises ValueError: If is_event_from_sns() does not recognise the event.
    """
    if not is_event_from_sns(event):
        raise ValueError("Event is not from SNS")

    return _unwrap_msg_dict_from_sns_event(event)
[docs]
def is_event_from_sns(event):
    """
    Check if the lambda invocation was by SNS.

    Two shapes are recognised:

    * the Lambda envelope: the result is the truthiness of
      ``event['Records'][0]['Sns']['Message']``;
    * a flat event having 'Message' and a 'TopicArn' that contains ``':sns:'``.

    Never raises for malformed input: anything unrecognised is simply not from SNS.

    :param dict event:  Lambda Event (payload)
    :rtype:     bool
    """
    # `except Exception` (not a bare except) so that KeyboardInterrupt / SystemExit are not swallowed.
    try:
        return bool(event['Records'][0]['Sns']['Message'])
    except Exception:
        pass  # Not the envelope shape, try the flat one.

    try:
        return bool('Message' in event and 'TopicArn' in event and ':sns:' in event['TopicArn'])
    except Exception:
        return False
def _unwrap_message_dicts_from_sqs_event(event) -> List[Dict]:
    """
    Decode the JSON 'body' of every record of an SQS event.

    :param event: SQS event (payload) with 'Records'.
    :return: List of decoded bodies, in the order of records.
    """
    return [json.loads(record['body']) for record in event['Records']]
def is_event_from_sqs(event) -> bool:
    """
    Check if the lambda invocation was by SQS.

    The event is recognised by the 'eventSource' of its first record being ``'aws:sqs'``.
    Never raises for malformed input: anything unrecognised is simply not from SQS.

    :param dict event:  Lambda Event (payload)
    :rtype:     bool
    """
    try:
        return event['Records'][0]['eventSource'] == 'aws:sqs'
    except Exception:
        # `Exception` (not a bare except) so that KeyboardInterrupt / SystemExit are not swallowed.
        return False
# Dispatch tables used by _unwrap_event_messages(), keyed by the name of the source.
# Both tables are looked up with the same key, so they have to be kept in sync.
unwrap_checker_methods = {
# Methods with input: event, output: bool
# They tell whether the event comes from the source.
'sns': is_event_from_sns,
'sqs': is_event_from_sqs,
}
unwrap_extractor_methods = {
# Methods with input: event, output: Dict / List[Dict]
# They extract the wrapped message(s) from an event of the source.
'sns': _unwrap_msg_dict_from_sns_event,
'sqs': _unwrap_message_dicts_from_sqs_event,
}
def _unwrap_event_messages(event: Dict, source: str) -> List[Dict]:
    """
    Unwrap a single layer of messages from the `event`, given that it comes from `source`.

    The checker and the extractor for the `source` are taken from ``unwrap_checker_methods``
    and ``unwrap_extractor_methods``. Only one layer is unwrapped (unlike
    ``unwrap_event_recursively`` - which is recursive). A single extracted message is
    wrapped into a list.

    .. code-block:: python

       unwrapped_sns_messages = _unwrap_event_messages(event, source='sns')

    :param event:   Event to unwrap.
    :param source:  'sns' or 'sqs'
    :return:        List of events/messages unwrapped from the *source*
    :raises:        EventNotFromSourceException if the event is not from this source.
    """
    is_from_source = unwrap_checker_methods[source]  # is_event_from_sns/sqs
    extract = unwrap_extractor_methods[source]  # _unwrap_msg_dicts_from_sns/sqs_event

    if not is_from_source(event):
        raise EventNotFromSourceException(f"Event is not from {source}, can't unwrap it")

    unwrapped = extract(event)
    if isinstance(unwrapped, list):
        return unwrapped
    return [unwrapped]
[docs]
def unwrap_event_recursively(event: Dict, sources: Optional[List[str]] = None) -> List[Dict]:
    """
    Peel SQS and/or SNS event skeletons off the lambda event, layer by layer.

    Supported sources: 'sqs', 'sns'.
    In every pass each message is unwrapped from each of the sources in turn (messages not
    coming from a source are kept as they are). The passes stop as soon as one of them
    changes nothing, or after 10 passes.

    .. code-block:: python

       unwrapped_messages = unwrap_event_recursively(event, sources=['sns', 'sqs'])

    :param event:       Lambda event
    :param sources:     List of strings describing what the event might be wrapped by (case-insensitive).
                        If empty, the event is unwrapped from all supported sources.
    :return:            List of dictionaries - unwrapped messages from the event
    """
    source_names = [name.lower() for name in (sources or ['sns', 'sqs'])]
    messages = [event]

    # Bounded number of passes, as a safety mechanism against infinite loop.
    remaining_passes = 10
    while remaining_passes > 0:
        remaining_passes -= 1
        before = deepcopy(messages)

        for source in source_names:
            next_round = []
            for message in messages:
                try:
                    next_round.extend(_unwrap_event_messages(message, source=source))
                except EventNotFromSourceException:
                    # Not wrapped by this source: keep the message untouched.
                    next_round.append(message)
            messages = next_round

        if messages == before:
            break

    return messages
[docs]
def small_int_from_string(input_string: str, num_digits: int = 2) -> int:
    """
    Derive a small, reproducible integer from the MD5 hash of the input string.

    The hex digest is read as an integer and reduced modulo ``10 ** num_digits``, so the
    result has at most `num_digits` decimal digits. Being reproducible, the value may serve
    e.g. for some kind of partitioning or unsorted batching that you want to be able
    to query by later on.

    Examples:

    .. code-block:: python

       small_int_from_string("hello world")
       91

       small_int_from_string("hello world", num_digits=3)
       291

    :param input_string: String to derive the number from.
    :param num_digits: Maximum number of decimal digits in the result.
    :return: The generated small integer.
    :raises: ValueError: If num_digits is not a positive integer.
    """
    if not isinstance(num_digits, int) or num_digits <= 0:
        raise ValueError("Number of digits must be a positive integer.")

    digest = hashlib.md5(input_string.encode()).hexdigest()
    modulus = 10 ** num_digits

    return int(digest, 16) % modulus