"""
.. hidden-code-block:: text
:label: View Licence Agreement <br>
sosw - Serverless Orchestrator of Serverless Workers
The MIT License (MIT)
Copyright (C) 2024 sosw core contributors <info@sosw.app>
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
Config manager component. Has methods for getting configuration for Lambdas.
The default is ``DynamoConfig``. It is automatically called from the ``app.Processor.__init__`` and if your Lambda
has permissions to access the ``config`` table, the Processor will look for the record: ``YOUR_FUNCTION_NAME_config``,
and recursively update the ``DEFAULT_CONFIG`` with it.
You can also import and use the following functions.
They can be directly imported from this module and will be automatically switched to DDB / SSM / Secrets manager.
- get_config_
- get_credentials_by_prefix_
- get_secrets_credentials_
- update_config_
.. warning::
Using these methods requires the Role to have relevant permissions to access DynamoDB / SSM Parameter Store /
AWS Secrets.
Usage example `(pseudo code)`:
.. code-block:: python
from SOME_DB_DRIVER import connect
from sosw.components.config import get_secrets_credentials, get_config, update_config
from sosw.app import Processor as SoswProcessor
class Processor(SoswProcessor):
def work_with_db(self):
db_settings = get_credentials_by_prefix('db_')
db_password = get_secrets_credentials(type='name', value='db_password')['db_password']
connection = connect(**db_settings, password=db_password)
last_processed_row = get_config('last_row')
db_result = connection.query(f"SOME QUERY LIMIT last_processed_row, 10;")
self.do_something(db_result)
update_config('last_row', last_processed_row + 10)
"""
__all__ = ['ConfigSource', 'get_config', 'update_config', 'get_credentials_by_prefix', 'get_secrets_credentials']
__author__ = "Sophie Fogel, Nikolay Grishchenko"
__version__ = "1.7.3"
try:
from aws_lambda_powertools import Logger
logger = Logger(child=True)
except ImportError:
import logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)
import boto3
import json
import os
from sosw.components.helpers import chunks
from sosw.components.dynamo_db import DynamoDbClient
from sosw.test.helpers_test_dynamo_db import get_autotest_ddb_name_with_custom_suffix
[docs]
class SecretsManager:
secretsmanager_client = None
def __init__(self, test=False, **kwargs):
self.test = test
if not self.test:
self.test = True if os.environ.get('STAGE') == 'test' or os.environ.get('autotest') == 'True' else False
def _get_secretsmanager_client(self):
if self.secretsmanager_client is None:
self.secretsmanager_client = boto3.client('secretsmanager')
return self.secretsmanager_client
[docs]
def get_secrets_credentials(self, **kwargs) -> dict:
"""
Retrieve the credentials with given name or tag from AWS SecretsManager and return as a dictionary.
Must provide ``type`` and ``value`` to search. Type is either ``'tag'`` or ``'name'``. ``value`` is a string.
Usage example:
.. code-block:: python
my_secret = get_secrets_credentials(type='name', value='my_secret_name')
my_secrets_by_tag = get_secrets_credentials(type='tag', value='project_a_credentials')
"""
filters, secrets_dict = [], {}
filter_type, value = kwargs.get('type'), kwargs.get('value')
valid_types = ['tag', 'name']
if not filter_type or filter_type not in valid_types:
raise KeyError('Error no type Tag/Name provided')
if not value:
raise KeyError('Error no value provided')
filters = [{'Key': 'name', 'Values': [value]}] if filter_type == 'name' else \
[{'Key': 'tag-value', 'Values': [value]}]
secretsmanager_client = self._get_secretsmanager_client()
secret_response = self.call_boto_secrets_with_pagination('list_secrets', Filters=filters)
secrets = [secret for secret in secret_response for secret in secret['SecretList']]
if secrets:
for secret in secrets:
secret_value = secretsmanager_client.get_secret_value(SecretId=secret['ARN'])
secrets_dict[secret['Name']] = secret_value['SecretString']
else:
logger.warning('No credentials found in SecretsManager for %s with %s', filter_type, value)
return secrets_dict
return secrets_dict
[docs]
class SSMConfig:
"""
Methods to access some configurations and/or credentials stored in AWS SSM ParameterStore.
Please note that SSM has a pretty low limit of concurrent calls and it THROTTLES.
For high load Lambdas it is recommended to use DynamoConfig instead.
"""
ssm_client = None
def __init__(self, test=False, **kwargs):
self.test = test
if not self.test:
self.test = True if os.environ.get('STAGE') == 'test' or os.environ.get('autotest') == 'True' else False
def _get_ssm_client(self):
if self.ssm_client is None:
self.ssm_client = boto3.client('ssm')
return self.ssm_client
[docs]
def get_config(self, name):
"""
Retrieve the Config from AWS SSM ParameterStore and return as a JSON parsed dictionary.
:param str name: Name of config to extract
:rtype: dict
:return: Config of some Controller
"""
ssm_client = self._get_ssm_client()
try:
response = ssm_client.get_parameters(
Names=[name],
WithDecryption=True
)
except Exception:
response = ssm_client.get_parameters(
Names=[name],
WithDecryption=False
)
try:
config = json.loads(response['Parameters'][0]['Value'])
except (KeyError, IndexError, TypeError):
config = {}
return config
[docs]
def update_config(self, name, val, **kwargs):
"""
Update a parameter in SSM ParameterStore with a new value.
:param str name: Parameter name to address.
:param object val: Parameter value to update.
"""
description = kwargs.get('description')
if not isinstance(description, str):
description = ''
param_type = kwargs.get('param_type')
if param_type not in ('String', 'StringList', 'SecureString'):
param_type = 'String'
ssm_client = self._get_ssm_client()
ssm_client.put_parameter(
Name=name,
Description=description,
Value=val,
Type=param_type,
Overwrite=True
)
[docs]
def get_credentials_by_prefix(self, prefix):
"""
Retrieve the credentials with given `prefix` from AWS SSM ParameterStore and return as a dictionary.
In ParameterStore the values `Name` must begin with `prefix_` and they must have Tag:Environment `(production|dev)`.
The type of elements is expected to be SecureString. Regular strings could work, but not guaranteed.
:param str prefix: prefix of records to extract
:rtype: dict
:return: Some credentials
"""
env_tag = 'production' if not self.test else 'dev'
prefix = prefix if prefix.endswith('_') else prefix + '_'
describe_params_response = self.call_boto_with_pagination('describe_parameters',
ParameterFilters=[
{"Key": "tag:Environment", "Values": [env_tag]},
{
'Key': 'Name', 'Option': 'BeginsWith',
'Values': [prefix]
}])
logger.debug("SSM.describe_parameters(prefix=%s) received response: %s", prefix, describe_params_response)
params = [param for obj in describe_params_response for param in obj['Parameters']]
names = [param['Name'] for param in params]
if not names:
logger.warning(
"No credentials found in SSM ParameterStore with prefix %s for Environment: %s", prefix, env_tag)
return dict()
# This is supposed to work fine if you ask multiple keys even if some are not encrypted.
# Anyway you should encrypt everything.
decryption_required = any([True for param in params if param['Type'] == 'SecureString'])
result = dict()
for chunk_of_names in chunks(names, 10):
get_params_response = self.call_boto_with_pagination('get_parameters', Names=chunk_of_names,
WithDecryption=decryption_required)
logger.debug(f"SSM.get_parameters(names=%s) received response: %s", chunk_of_names, get_params_response)
# Update keys and values from this page of response to result. Removes the prefix away for keys.
params = [param for obj in get_params_response for param in obj['Parameters']]
if params:
result.update(dict([(x['Name'].replace(prefix, ''), x['Value'] if x['Value'] != 'None' else None)
for x in params]))
return result
[docs]
class DynamoConfig:
"""
This is a manager to operate with custom configurations for Lambdas stored in the ``config`` DynamoDB table.
It tries to find the table ``config`` in the same region where the Lambda runs with the following structure:
.. code-block:: python
'env': 'S',
'config_name': 'S',
'config_value': 'S',
If the table exists and the Lambda has permissions to access it, the class will look for the record:
``YOUR_FUNCTION_config``, and recursively update the ``DEFAULT_CONFIG`` with it.
.. _get_config:
"""
dynamo_client: DynamoDbClient = None
no_ddb_access: bool = None
def __init__(self, **kwargs):
self.test = kwargs.get('test')
if not self.test:
self.test = True if os.environ.get('STAGE') == 'test' or os.environ.get('autotest') == 'True' else False
self.config = {
'dynamo_client_config': {
'row_mapper': {
'env': 'S',
'config_name': 'S',
'config_value': 'S'
},
'required_fields': ['env', 'config_name', 'config_value'],
'table_name': 'config' if not self.test else 'autotest_config',
# 'region': TODO IMPLEMENT AS AN OPTIONAL PARAMETER FOR app.Processor
}
}
self.config.update(kwargs.get('config', {}))
[docs]
def get_config(self, name, env="production"):
"""
Retrieve the Config from DynamoDB ``config`` table and return as a JSON parsed dictionary.
If not in JSON format, returns a string.
.. note::
In case the environment is ``test`` the table name will be automatically changed to ``autotest_config``.
This might be relevant only for complex integration tests.
:param str name: Name of config to extract
:param str env: Environment name, usually: 'production' or 'dev'
:rtype: dict|string
:return: Configuration
.. _update_config:
"""
if dynamo_client := self._get_dynamo_client():
items = dynamo_client.get_by_query(keys={'env': env, 'config_name': name})
item = items[0] if items else None
config_value = item.get('config_value') if item else None
logger.debug("Got config value from DDB: %s", config_value)
try:
return json.loads(config_value)
except (json.JSONDecodeError, TypeError) as err:
return config_value if config_value is not None else {}
else:
logger.info("Tried to get DynamoConfig, but failed.")
return {}
[docs]
def update_config(self, name, val, **kwargs):
"""
Update a field in the DynamoDB ``config`` table with a new value. May be used to store not sensitive tokens.
.. warning: For sensitive credentials use SecretsManager!
.. _get_credentials_by_prefix:
:param str name: Field name to address.
:param object val: Field value to update.
"""
if self.test or os.environ.get('STAGE') in ['test', 'autotest']:
env = "dev"
else:
env = "production"
dynamo_client = self._get_dynamo_client()
dynamo_client.update(keys={'env': env, 'config_name': name}, attributes_to_update={'config_value': val})
[docs]
def get_credentials_by_prefix(self, prefix: str, env: str = 'production') -> dict:
"""
Fetches multiple records from the ``config`` table. Filters rows that start with the prefix and returns them
as a dict with this prefix trimmed.
.. warning: For sensitive credentials use SecretsManager!
Example in ``config`` DDB:
.. code-block:: python
{'env': 'production', 'config_name': 'project_a_db_username', 'config_value': 'john'}
{'env': 'production', 'config_name': 'project_a_db_port', 'config_value': '27019'}
{'env': 'production', 'config_name': 'another_db_username', 'config_value': 'silver'}
credentials = get_credentials_by_prefix('project_a_db')
# {'username': 'john', 'port': '27019'}
"""
prefix = prefix if prefix.endswith('_') else prefix + '_'
if self.test or prefix.startswith('autotest_'):
env = "dev"
res = {}
if dynamo_client := self._get_dynamo_client():
items = dynamo_client.get_by_query(keys={'env': env, 'config_name': prefix},
comparisons={'config_name': 'begins_with'})
for row in items:
try:
row['config_value'] = json.loads(row['config_value'])
except (json.JSONDecodeError, TypeError):
pass
config_name = row['config_name'].replace(prefix, '')
res[config_name] = row['config_value']
return res
def _get_dynamo_client(self):
if self.dynamo_client is None and not self.no_ddb_access:
dynamo_config = self.config.get('dynamo_client_config')
# FIXME Consider deprecating this. Used only for safety, but hardcoding is bad.
if self.test:
dynamo_config['table_name'] = get_autotest_ddb_name_with_custom_suffix('config')
try:
self.dynamo_client = DynamoDbClient(dynamo_config)
except Exception as err:
logger.warning("Failed to initialize DynamoDB client for ConfigSource: %s", err)
self.no_ddb_access = True
return self.dynamo_client
[docs]
class ConfigSource:
"""
A strategy adapter for config. Returns config from the selected config source.
You can implement your own functions using these clients, and they can even call different configurations.
:param str sources: Config clients to initialize. Supported: `SSM` and `Dynamo` (default).
Could be both, comma-separated. The first one then becomes default.
:param dict config: Custom configurations for clients. Should be in `ssm_config`, `dynamo_config`, etc.
Don't be confused, but sometimes configs also need their own configs. :)
"""
SUPPORTED_SOURCES = ('Dynamo', 'SSM')
def __init__(self, test=False, sources=None, config=None, **kwargs):
self.test = test or True if os.environ.get('STAGE') == 'test' else False
if not sources:
sources = ['Dynamo']
elif isinstance(sources, str):
sources = [x.strip() for x in sources.split(',')]
else:
raise ValueError(f"Unsupported sources: {sources}. Must be a csv or string of {self.SUPPORTED_SOURCES}")
assert all(x in self.SUPPORTED_SOURCES for x in sources), f"Unsupported sources: {sources}. Must be a csv or " \
f"string of {self.SUPPORTED_SOURCES}"
# Overwrite default configs with custom ones if provided.
self.config = {}
self.config.update(config or {})
self.default_source = None
for source in sources:
# Config of Config Client
cfg = self.config.get(f"{source.lower()}_config", {})
# Config Client class
cls = globals()[f"{source}Config"](config=cfg, test=self.test, **kwargs)
# Set instance of Config Client as attribute of current ConfigSource object.
setattr(self, f"{source.lower()}_config", cls)
if not self.default_source:
self.default_source = getattr(self, f"{source.lower()}_config")
logger.info("Initialized default_source = %s_config", source.lower())
self.secrets_manager_class = SecretsManager()
def get_config(self, name, **kwargs):
return self.default_source.get_config(name, **kwargs)
def update_config(self, name, val, **kwargs):
return self.default_source.update_config(name, val, **kwargs)
def get_credentials_by_prefix(self, prefix, **kwargs):
return self.default_source.get_credentials_by_prefix(prefix, **kwargs)
def get_secrets_credentials(self, **kwargs):
return self.secrets_manager_class.get_secrets_credentials(**kwargs)
test = True if os.environ.get('STAGE') == 'test' else False
__config_source = ConfigSource(test=test)
get_config = __config_source.get_config
update_config = __config_source.update_config
get_credentials_by_prefix = __config_source.get_credentials_by_prefix
get_secrets_credentials = __config_source.get_secrets_credentials