"""
.. hidden-code-block:: text
:label: View Licence Agreement <br>
sosw - Serverless Orchestrator of Serverless Workers
The MIT License (MIT)
Copyright (C) 2024 sosw core contributors <info@sosw.app>
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
Config manager component. Has methods for getting configuration for lambdas.
Use the `ConfigSource` class to get the correct methods we use for each action.
Using SSMConfig or DynamoConfig directly is discouraged.
Especially SSMConfig, because SSM throttles and has limits we reached in the past.
Using these methods requires the Role to have permissions to access SSM and/or Dynamo for requested resources.
"""
# Public API of the module: the strategy adapter class plus the three module-level shortcuts
# that are bound to a default adapter instance at the bottom of the file.
__all__ = [
    'ConfigSource',
    'get_config',
    'update_config',
    'get_credentials_by_prefix',
]

# Module metadata.
__author__ = 'Sophie Fogel, Nikolay Grishchenko'
__version__ = '1.7.1'
# `logging` is imported unconditionally so that the name is bound on both paths below.
# Previously it was only imported when aws_lambda_powertools was missing, which made any
# direct `logging.*` call in this module a NameError whenever powertools was installed.
import logging

try:
    # Preferred: structured Lambda logger (child of the application's Powertools logger).
    from aws_lambda_powertools import Logger

    logger = Logger(child=True)
except ImportError:
    # Fallback: the standard root logger at INFO level.
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)
import boto3
import json
import os
from sosw.components.helpers import chunks
from sosw.components.dynamo_db import DynamoDbClient
class SecretsManager:
    """
    Methods to access credentials stored in AWS SecretsManager.

    The boto3 client is created lazily on the first call that needs it.
    """

    # Lazily created boto3 client; populated by `_get_secretsmanager_client()`.
    secretsmanager_client = None

    def __init__(self, test=False, **kwargs):
        """
        :param bool test: Force test mode. When falsy, test mode is still switched on if the environment
                          has `STAGE=test` or `autotest=True`.
        :param kwargs:    Accepted for signature compatibility with the other config clients; not used.
        """
        self.test = test
        if not self.test:
            self.test = os.environ.get('STAGE') == 'test' or os.environ.get('autotest') == 'True'

    def _get_secretsmanager_client(self):
        """Return the boto3 SecretsManager client, creating it on first use."""
        if self.secretsmanager_client is None:
            self.secretsmanager_client = boto3.client('secretsmanager')
        return self.secretsmanager_client

    @staticmethod
    def _collect_pages(func, kwargs):
        """
        Call `func(**kwargs)` and keep calling it with the returned `NextToken` until none is returned.

        :param callable func: Client function to invoke.
        :param dict kwargs:   Keyword arguments for the first call. The dictionary is not modified.
        :rtype: list
        :return: Every response received, in call order.
        """
        params = dict(kwargs)
        response = func(**params)
        pages = [response]

        # `response` must be refreshed on every iteration: checking the first response forever
        # would never terminate once a `NextToken` has been seen.
        while response.get('NextToken'):
            params['NextToken'] = response['NextToken']
            response = func(**params)
            pages.append(response)

        return pages

    def call_boto_secrets_with_pagination(self, f, **kwargs):
        """
        Invoke SecretsManager functions with the ability to paginate results.

        :param str f:         SecretsManager function to invoke.
        :param object kwargs: Keyword arguments for the function to invoke.
        :rtype: iterable
        :return: If the client can natively paginate `f`, the (lazy) boto3 page iterator is returned.
                 Otherwise a list of responses collected by following `NextToken` manually.
        """
        secretsmanager_client = self._get_secretsmanager_client()
        func = getattr(secretsmanager_client, f)

        if secretsmanager_client.can_paginate(f):
            logger.debug("SecretsManager.%s can natively paginate", f)
            return secretsmanager_client.get_paginator(f).paginate(**kwargs)

        logger.debug("SecretsManager.%s can not natively paginate", f)
        return self._collect_pages(func, kwargs)

    def get_secrets_credentials(self, **kwargs):
        """
        Retrieve the credentials with given name or tag from AWS SecretsManager and return as a dictionary.

        :param kwargs: `type` - either `'tag'` or `'name'`; `value` - the tag value or secret name to filter by.
        :rtype: dict
        :return: Mapping of secret `Name` to its `SecretString`. Empty if nothing matched.
        :raises KeyError: If `type` is missing / not one of the supported ones, or `value` is empty.
        """
        filter_type, value = kwargs.get('type'), kwargs.get('value')

        if filter_type not in ('tag', 'name'):
            raise KeyError('Error no type Tag/Name provided')
        if not value:
            raise KeyError('Error no value provided')

        filter_key = 'name' if filter_type == 'name' else 'tag-value'
        filters = [{'Key': filter_key, 'Values': [value]}]

        secretsmanager_client = self._get_secretsmanager_client()
        pages = self.call_boto_secrets_with_pagination('list_secrets', Filters=filters)
        secrets = [secret for page in pages for secret in page['SecretList']]

        if not secrets:
            logger.warning('No credentials found in SecretsManager for %s with %s', filter_type, value)
            return {}

        secrets_dict = {}
        for secret in secrets:
            secret_value = secretsmanager_client.get_secret_value(SecretId=secret['ARN'])
            secrets_dict[secret['Name']] = secret_value['SecretString']

        return secrets_dict
class SSMConfig:
    """
    Methods to access some configurations and/or credentials stored in AWS SSM ParameterStore.

    Please note that SSM has a pretty low limit of concurrent calls and it THROTTLES.
    For high load Lambdas it is recommended to use DynamoConfig instead.
    """

    # Lazily created boto3 client; populated by `_get_ssm_client()`.
    ssm_client = None

    def __init__(self, test=False, **kwargs):
        """
        :param bool test: Force test mode. When falsy, test mode is still switched on if the environment
                          has `STAGE=test` or `autotest=True`.
        :param kwargs:    Accepted for signature compatibility with the other config clients; not used.
        """
        self.test = test
        if not self.test:
            self.test = os.environ.get('STAGE') == 'test' or os.environ.get('autotest') == 'True'

    def _get_ssm_client(self):
        """Return the boto3 SSM client, creating it on first use."""
        if self.ssm_client is None:
            self.ssm_client = boto3.client('ssm')
        return self.ssm_client

    @staticmethod
    def _collect_pages(func, kwargs):
        """
        Call `func(**kwargs)` and keep calling it with the returned `NextToken` until none is returned.

        :param callable func: Client function to invoke.
        :param dict kwargs:   Keyword arguments for the first call. The dictionary is not modified.
        :rtype: list
        :return: Every response received, in call order.
        """
        params = dict(kwargs)
        response = func(**params)
        pages = [response]

        # `response` must be refreshed on every iteration: checking the first response forever
        # would never terminate once a `NextToken` has been seen.
        while response.get('NextToken'):
            params['NextToken'] = response['NextToken']
            response = func(**params)
            pages.append(response)

        return pages

    @staticmethod
    def _strip_prefix(name, prefix):
        """Remove `prefix` from the beginning of `name` only; later occurrences are kept intact."""
        return name[len(prefix):] if name.startswith(prefix) else name

    def get_config(self, name):
        """
        Retrieve the Config from AWS SSM ParameterStore and return as a JSON parsed dictionary.

        :param str name: Name of config to extract
        :rtype: dict
        :return: Config of some Controller. Empty dict if the parameter is missing from the response.
        """
        ssm_client = self._get_ssm_client()

        try:
            response = ssm_client.get_parameters(Names=[name], WithDecryption=True)
        except Exception:
            # Deliberate best-effort: whatever went wrong with the decrypting call,
            # the request is retried once without decryption.
            response = ssm_client.get_parameters(Names=[name], WithDecryption=False)

        try:
            return json.loads(response['Parameters'][0]['Value'])
        except (KeyError, IndexError, TypeError):
            return {}

    def update_config(self, name, val, **kwargs):
        """
        Update a parameter in SSM ParameterStore with a new value.

        :param str name:   Parameter name to address.
        :param object val: Parameter value to update.
        :param kwargs:     Optional `description` (str, defaults to empty string) and
                           `param_type` (`String` | `StringList` | `SecureString`, defaults to `String`).
        """
        description = kwargs.get('description')
        if not isinstance(description, str):
            description = ''

        param_type = kwargs.get('param_type')
        if param_type not in ('String', 'StringList', 'SecureString'):
            param_type = 'String'

        self._get_ssm_client().put_parameter(
            Name=name,
            Description=description,
            Value=val,
            Type=param_type,
            Overwrite=True,
        )

    def call_boto_with_pagination(self, f, **kwargs):
        """
        Invoke SSM functions with the ability to paginate results.

        :param str f:         SSM function to invoke.
        :param object kwargs: Keyword arguments for the function to invoke.
        :rtype: list
        :return: List of paginated responses.
        """
        ssm_client = self._get_ssm_client()
        func = getattr(ssm_client, f)

        if ssm_client.can_paginate(f):
            logger.debug("'SSM.%s()' can natively paginate", f)
            return list(ssm_client.get_paginator(f).paginate(**kwargs))

        logger.debug("'SSM.%s()' can not natively paginate", f)
        return self._collect_pages(func, kwargs)

    def get_credentials_by_prefix(self, prefix):
        """
        Retrieve the credentials with given `prefix` from AWS SSM ParameterStore and return as a dictionary.

        In ParameterStore the values `Name` must begin with `prefix_` and they must have Tag:Environment
        `(production|dev)`. The type of elements is expected to be SecureString.
        Regular strings could work, but not guaranteed.

        :param str prefix: prefix of records to extract
        :rtype: dict
        :return: Some credentials. Keys are parameter names with the leading prefix removed.
                 The literal string value `'None'` is converted to `None`.
        """
        env_tag = 'dev' if self.test else 'production'
        prefix = prefix if prefix.endswith('_') else prefix + '_'

        describe_params_response = self.call_boto_with_pagination(
            'describe_parameters',
            ParameterFilters=[
                {"Key": "tag:Environment", "Values": [env_tag]},
                {'Key': 'Name', 'Option': 'BeginsWith', 'Values': [prefix]},
            ],
        )
        logger.debug("SSM.describe_parameters(prefix=%s) received response: %s", prefix, describe_params_response)

        params = [param for obj in describe_params_response for param in obj['Parameters']]
        names = [param['Name'] for param in params]
        if not names:
            logger.warning(
                "No credentials found in SSM ParameterStore with prefix %s for Environment: %s", prefix, env_tag)
            return dict()

        # This is supposed to work fine if you ask multiple keys even if some are not encrypted.
        # Anyway you should encrypt everything.
        decryption_required = any(param['Type'] == 'SecureString' for param in params)

        result = dict()
        # Values are requested in chunks of 10 names per `get_parameters` call.
        for chunk_of_names in chunks(names, 10):
            get_params_response = self.call_boto_with_pagination('get_parameters', Names=chunk_of_names,
                                                                 WithDecryption=decryption_required)
            logger.debug("SSM.get_parameters(names=%s) received response: %s", chunk_of_names, get_params_response)

            # Update keys and values from this page of response to result. Strips the leading prefix from keys.
            for obj in get_params_response:
                for param in obj['Parameters']:
                    value = param['Value']
                    result[self._strip_prefix(param['Name'], prefix)] = None if value == 'None' else value

        return result
class DynamoConfig:
    """
    Methods to get/update config from/to DynamoDB.
    """

    # Lazily created DynamoDbClient; populated by `_get_dynamo_client()`.
    dynamo_client = None

    def __init__(self, **kwargs):
        """
        :param kwargs: `test` - force test mode. When falsy, test mode is still switched on if the environment
                       has `STAGE=test` or `autotest=True`.
                       `config` - custom configuration; its top-level keys overwrite the defaults below.
        """
        self.test = kwargs.get('test')
        if not self.test:
            self.test = os.environ.get('STAGE') == 'test' or os.environ.get('autotest') == 'True'

        self.config = {
            'dynamo_client_config': {
                'row_mapper': {
                    'env': 'S',
                    'config_name': 'S',
                    'config_value': 'S'
                },
                'required_fields': ['env', 'config_name', 'config_value'],
                'table_name': 'config' if not self.test else 'autotest_config',
                # 'region': TODO IMPLEMENT AS AN OPTIONAL PARAMETER FOR app.Processor
            }
        }
        # `or {}` keeps an explicit `config=None` from crashing `dict.update`.
        self.config.update(kwargs.get('config') or {})

    @staticmethod
    def _strip_prefix(name, prefix):
        """Remove `prefix` from the beginning of `name` only; later occurrences are kept intact."""
        return name[len(prefix):] if name.startswith(prefix) else name

    def get_config(self, name, env="production"):
        """
        Retrieve the Config from DynamoDB 'config' table and return as a JSON parsed dictionary.
        If not in JSON format, returns a string.

        :param str name: Name of config to extract
        :param str env:  Environment the variable belongs to: 'production' or 'dev'
        :rtype: dict|string
        :return: Configuration. Empty dict if no row was found.
        """
        dynamo_client = self._get_dynamo_client()

        # In the test stage the table name of the client is overridden on every call.
        if os.environ.get('STAGE') == 'test' or os.environ.get('autotest') == 'True':
            dynamo_client.config['table_name'] = 'autotest_config'

        items = dynamo_client.get_by_query(keys={'env': env, 'config_name': name})
        config_value = items[0].get('config_value') if items else None

        try:
            return json.loads(config_value)
        except (TypeError, ValueError):
            # Not JSON (or nothing found): return the raw value, or an empty config.
            return config_value if config_value is not None else {}

    def update_config(self, name, val, **kwargs):
        """
        Update a field in DynamoDB 'config' table with a new value.

        The row is written for env `dev` in test mode (or when `STAGE` is `test` / `autotest`),
        otherwise for env `production`.

        :param str name:   Field name to address.
        :param object val: Field value to update.
        """
        if self.test or os.environ.get('STAGE') in ['test', 'autotest']:
            env = "dev"
        else:
            env = "production"

        dynamo_client = self._get_dynamo_client()
        dynamo_client.update(keys={'env': env, 'config_name': name}, attributes_to_update={'config_value': val})

    def get_credentials_by_prefix(self, prefix, env="production"):
        """
        Retrieve all config rows whose `config_name` begins with `prefix` and return them as a dictionary.

        :param str prefix: Prefix of records to extract. A trailing `_` is appended if missing.
        :param str env:    Environment of the records. Forced to `dev` in test mode
                           or when the prefix starts with `autotest_`.
        :rtype: dict
        :return: Mapping of `config_name` (with the leading prefix removed) to the value,
                 JSON-parsed where possible and returned as stored otherwise.
        """
        prefix = prefix if prefix.endswith('_') else prefix + '_'
        if self.test or prefix.startswith('autotest_'):
            env = "dev"

        dynamo_client = self._get_dynamo_client()
        items = dynamo_client.get_by_query(keys={'env': env, 'config_name': prefix},
                                           comparisons={'config_name': 'begins_with'})

        res = {}
        for row in items:
            value = row['config_value']
            try:
                value = json.loads(value)
            except (TypeError, ValueError):
                # Plain (non-JSON) values are returned as stored.
                pass
            res[self._strip_prefix(row['config_name'], prefix)] = value

        return res

    def _get_dynamo_client(self):
        """Return the DynamoDbClient, creating it from `dynamo_client_config` on first use."""
        if self.dynamo_client is None:
            dynamo_config = self.config.get('dynamo_client_config')
            # NOTE(review): in test mode this replaces the 'autotest_config' table name chosen in
            # `__init__` (and any custom one) with 'autotest_config_component' - confirm this is intended.
            if self.test:
                dynamo_config['table_name'] = 'autotest_config_component'
            self.dynamo_client = DynamoDbClient(dynamo_config)
        return self.dynamo_client
# [docs] (Sphinx viewcode link marker left over from the rendered page; not part of the module code)
class ConfigSource:
    """
    A strategy adapter for config. Returns config from the selected config source.
    You can implement your own functions using these clients, and they can even call different configurations.

    :param bool test:   Force test mode. Test mode is also switched on if the environment has `STAGE=test`.
    :param str sources: Config clients to initialize. Supported: `SSM` and `Dynamo` (default).
                        Could be both, comma-separated. The first one then becomes default.
                        A list or tuple of the same names is accepted as well.
    :param dict config: Custom configurations for clients. Should be in `ssm_config`, `dynamo_config`, etc.
                        Don't be confused, but sometimes configs also need their own configs. :)
    """

    SUPPORTED_SOURCES = ('Dynamo', 'SSM')

    def __init__(self, test=False, sources=None, config=None):
        # An explicit `test=True` must win regardless of the environment. The previous expression
        # `test or True if ... else False` parsed as `(test or True) if ... else False` and dropped it.
        self.test = bool(test) or os.environ.get('STAGE') == 'test'

        sources = self._parse_sources(sources)

        # Overwrite default configs with custom ones if provided.
        self.config = {}
        self.config.update(config or {})

        self.default_source = None
        for source in sources:
            attr_name = f"{source.lower()}_config"

            # Config of Config Client
            cfg = self.config.get(attr_name, {})

            # Instance of the Config Client class (`DynamoConfig` / `SSMConfig`) looked up by name.
            client = globals()[f"{source}Config"](config=cfg, test=self.test)

            # Set instance of Config Client as attribute of current ConfigSource object.
            setattr(self, attr_name, client)

            # The first requested source becomes the default one.
            if not self.default_source:
                self.default_source = client
                logger.info("Initialized default_source = %s_config", source.lower())

        self.secrets_manager_class = SecretsManager(test=self.test)

    @classmethod
    def _parse_sources(cls, sources):
        """
        Normalize and validate the `sources` argument of the constructor.

        :param sources: `None` / empty (defaults to `Dynamo`), a comma-separated string, or a list / tuple of names.
        :rtype: list
        :return: List of source names, in the requested order.
        :raises ValueError:     If `sources` is of an unsupported type.
        :raises AssertionError: If any name is not in `SUPPORTED_SOURCES`. Raised explicitly (instead of using
                                an `assert` statement) so that the validation survives `python -O`.
        """
        if not sources:
            return ['Dynamo']

        if isinstance(sources, str):
            sources = [x.strip() for x in sources.split(',')]
        elif isinstance(sources, (list, tuple)):
            sources = list(sources)
        else:
            raise ValueError(f"Unsupported sources: {sources}. Must be a csv or string of {cls.SUPPORTED_SOURCES}")

        if not all(x in cls.SUPPORTED_SOURCES for x in sources):
            raise AssertionError(f"Unsupported sources: {sources}. Must be a csv or "
                                 f"string of {cls.SUPPORTED_SOURCES}")

        return sources

    def get_config(self, name):
        """Return the config `name` from the default source."""
        return self.default_source.get_config(name)

    def update_config(self, name, val, **kwargs):
        """Update the config `name` with `val` in the default source."""
        return self.default_source.update_config(name, val, **kwargs)

    def get_credentials_by_prefix(self, prefix):
        """Return the credentials whose names begin with `prefix` from the default source."""
        return self.default_source.get_credentials_by_prefix(prefix)

    def get_secrets_credentials(self, **kwargs):
        """Return credentials from AWS SecretsManager; see `SecretsManager.get_secrets_credentials`."""
        return self.secrets_manager_class.get_secrets_credentials(**kwargs)
# Module-level shortcuts: a default ConfigSource is created at import time and its bound methods
# are exposed as plain functions, so callers can simply `from ... import get_config`.
test = os.environ.get('STAGE') == 'test'
__config_source = ConfigSource(test=test)

get_config, update_config, get_credentials_by_prefix = (
    __config_source.get_config,
    __config_source.update_config,
    __config_source.get_credentials_by_prefix,
)