DynamoDB Client


class sosw.components.dynamo_db.DynamoDbClient(config)

Has default methods for different types of DynamoDB tables.

The current implementation binds each client to one fixed table at initialization, but you are free to initialize several dynamo_clients with different configs in the same Lambda.

Config should have a mapping for the field types and required fields. Config example:

{
    'row_mapper':     {
        'col_name_1':      'N', # Number
        'col_name_2':      'S', # String
    },
    'required_fields': ['col_name_1'],
    'table_name': 'some_table_name',  # If a table is not specified, this table will be used.
    'hash_key': 'the_hash_key',
    'dont_json_loads_results': True  # Use this if you don't want JSON strings in results to be parsed into objects
}
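
The relationship between row_mapper and required_fields can be checked before a client is created. The following is a minimal sketch, not part of sosw: validate_config is a hypothetical helper, and the rule that every required field must also appear in row_mapper is an assumption made for illustration.

```python
# Hypothetical pre-flight check for a DynamoDbClient config (not part of sosw).
# The set below lists the DynamoDB attribute type descriptors.
VALID_TYPES = {'S', 'N', 'B', 'BOOL', 'NULL', 'M', 'L', 'SS', 'NS', 'BS'}


def validate_config(config: dict) -> list:
    """Return a list of problems found; an empty list means nothing suspicious."""
    problems = []
    row_mapper = config.get('row_mapper', {})
    for name, type_code in row_mapper.items():
        if type_code not in VALID_TYPES:
            problems.append(f"unknown type {type_code!r} for {name!r}")
    for name in config.get('required_fields', []):
        # Assumption: a required field should have a declared type in row_mapper.
        if name not in row_mapper:
            problems.append(f"required field {name!r} is missing from row_mapper")
    return problems


config = {
    'row_mapper': {'col_name_1': 'N', 'col_name_2': 'S'},
    'required_fields': ['col_name_1'],
    'table_name': 'some_table_name',
    'hash_key': 'the_hash_key',
}
print(validate_config(config))
```

Returning a list of problems instead of raising on the first one makes it easier to report every issue in a config at once.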

_describe_table(table_name: str | None = None) -> Dict

Returns description of the table from AWS. Response like: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodb.html#DynamoDB.Client.describe_table

Returns:

Description of the table

_parse_filter_expression(expression: str) -> Tuple[str, Dict]

Converts FilterExpression to Dynamo syntax. We still do not support some operators. Feel free to implement: https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Expressions.OperatorsAndFunctions.html

Supported: regular comparators, between, attribute_[not_]exists

Returns:

Returns a tuple of the transformed expression and extracted variables already Dynamo formatted.
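
To illustrate the kind of transformation described above, here is a simplified stand-in. It is not the library's implementation: the function name, the placeholder naming scheme (:filter_<attr>) and the number-versus-string guess are all assumptions, and only plain comparators and between are handled.

```python
def parse_filter_expression(expression: str):
    """Turn a human string such as 'key <= 42' into (expression, values)."""

    def to_dynamo(token: str) -> dict:
        # Assumption: anything that parses as a number is sent as N, the rest as S.
        # DynamoDB transmits numbers as strings.
        try:
            float(token)
            return {'N': token}
        except ValueError:
            return {'S': token}

    words = expression.split()
    attr = words[0]
    if len(words) == 5 and words[1].lower() == 'between' and words[3].lower() == 'and':
        low, high = f":filter_{attr}_low", f":filter_{attr}_high"
        values = {low: to_dynamo(words[2]), high: to_dynamo(words[4])}
        return f"{attr} BETWEEN {low} AND {high}", values
    if len(words) == 3 and words[1] in ('=', '<>', '<', '<=', '>', '>='):
        placeholder = f":filter_{attr}"
        return f"{attr} {words[1]} {placeholder}", {placeholder: to_dynamo(words[2])}
    raise ValueError(f"Unsupported filter expression: {expression}")


print(parse_filter_expression('key <= 42'))
print(parse_filter_expression('foo between 10 and 20'))
```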

_query_constructor(keys: Dict, table_name: str | None = None, *, index_name: str | None = None, comparisons: Dict | None = None, max_items: int | None = None, filter_expression: str | None = None, strict: bool | None = None, return_count: bool = False, desc: bool = False, fetch_all_fields: bool | None = None, expr_attrs_names: list | None = None, consistent_read: bool | None = None) -> dict

Constructs a query to retrieve items from a DynamoDB table based on specified parameters. Can specify an index. If an index is not specified, will query the table.

Parameters:

keys (dict) – Keys and values to use in query. You must specify the hash key, and can optionally also add the range key.

Optional

Parameters:
  • table_name (str) – Name of the dynamo table. If not specified, will use table_name from the config.

  • index_name (str) – Name of the secondary index in the table. If not specified, will query the table itself.

  • comparisons (dict) – Type of comparison for each key. If a key is not mentioned, comparison type will be =. Valid values: =, <, <=, >, >=, begins_with. Comparisons only work for the range key. Example: if keys={'hk': 'cat', 'rk': 100} and comparisons={'rk': '<='} -> will get items where rk <= 100

  • max_items (int) – Limit the number of items to fetch.

  • filter_expression (str) – Supports regular comparisons and between. Input must be a regular human string e.g. 'key <= 42', 'name = marta', 'foo between 10 and 20', etc.

  • desc (bool) – By default (False), the values are sorted in ascending order by the sort key. To reverse the order, set desc=True.

  • fetch_all_fields (bool) – If False, will only get the attributes specified in the row mapper. If True, will get all attributes. Default is False.

  • expr_attrs_names (list) – List of attribute names. If an attribute name begins with a number or contains a space, a special character, or a reserved word, you must use an expression attribute name to replace that attribute's name in the expression. For example, if the list ['session', 'key'] is received, then a new dict will be assigned to ExpressionAttributeNames: {'#session': 'session', '#key': 'key'}

  • consistent_read (bool) – If True, the operation uses strongly consistent reads; otherwise, it uses eventually consistent reads. Default is False.

Returns:

Query parameters for boto3 Dynamo DB query
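
As a rough picture of how keys and comparisons could map to query parameters, consider the sketch below. It is an illustration under assumptions, not the library's code: build_query is a hypothetical name, the placeholder scheme is invented, and only TableName, IndexName, KeyConditionExpression, ExpressionAttributeValues and ScanIndexForward (all genuine boto3 query parameters) are produced.

```python
def build_query(keys: dict, table_name: str, comparisons: dict = None,
                index_name: str = None, desc: bool = False) -> dict:
    """Build a minimal parameter dict for a DynamoDB query call."""
    comparisons = comparisons or {}
    conditions, values = [], {}
    for name, value in keys.items():
        placeholder = f":{name}"
        # Simplification: numbers are sent as N, everything else as S.
        is_number = isinstance(value, (int, float)) and not isinstance(value, bool)
        values[placeholder] = {'N': str(value)} if is_number else {'S': str(value)}
        op = comparisons.get(name, '=')  # a key without a comparison uses '='
        if op == 'begins_with':
            conditions.append(f"begins_with({name}, {placeholder})")
        else:
            conditions.append(f"{name} {op} {placeholder}")
    query = {
        'TableName': table_name,
        'KeyConditionExpression': ' AND '.join(conditions),
        'ExpressionAttributeValues': values,
    }
    if index_name:
        query['IndexName'] = index_name
    if desc:
        query['ScanIndexForward'] = False  # reverse the sort-key order
    return query


print(build_query({'hk': 'cat', 'rk': 100}, 'some_table_name', comparisons={'rk': '<='}))
```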

batch_get_items_one_table(keys_list, table_name=None, max_retries=0, retry_wait_base_time=0.2, strict=None, fetch_all_fields=None, consistent_read=None)

Gets a batch of items from a single dynamo table. Only accepts keys, can’t query by other columns.

Parameters:

keys_list (list) – A list of the keys of the items to get. Returns the items that match the given keys. If a key doesn't exist, it is skipped and the others are returned. E.g. [{'hash_col': '1', 'range_col': 2}, {'hash_col': 3}] will get a row where hash_col is 1 and range_col is 2, and also all rows where hash_col is 3.

Optional

Parameters:
  • table_name (str) – Name of the dynamo table. If not specified, will use table_name from the config.

  • max_retries (int) – If some items could not be fetched, retry this many times. The wait between retries doubles after each retry, so this shouldn't be a big number. Default is 0.

  • retry_wait_base_time (float) – Base wait time before the first retry. The wait doubles with each subsequent retry. Default is 0.2.

  • strict (bool) – DEPRECATED.

  • fetch_all_fields (bool) – If False, will only get the attributes specified in the row mapper. If True, will get all attributes. Default is False.

  • consistent_read (bool) – If True, the operation uses strongly consistent reads; otherwise, it uses eventually consistent reads. Default is False.

Returns:

List of items from the table

Return type:

list
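
The retry behaviour described by max_retries and retry_wait_base_time can be pictured with the sketch below. It is an assumption-laden illustration rather than the library's code: get_with_retries and the fetch callable (which returns the found items plus the keys still unprocessed) are hypothetical names.

```python
import time


def get_with_retries(fetch, keys, max_retries=0, retry_wait_base_time=0.2, sleep=time.sleep):
    """Call fetch(pending_keys) until nothing is pending or retries run out.

    fetch must return a tuple (found_items, still_pending_keys).
    """
    items, pending = [], list(keys)
    for attempt in range(max_retries + 1):
        found, pending = fetch(pending)
        items.extend(found)
        if not pending:
            break
        if attempt < max_retries:
            # The wait doubles on every retry: base, 2 * base, 4 * base, ...
            sleep(retry_wait_base_time * 2 ** attempt)
    return items, pending


def one_key_per_call(pending):
    # Fake backend for the demo: processes a single key per call.
    return [pending[0]], pending[1:]


waits = []
result = get_with_retries(one_key_per_call, [1, 2, 3], max_retries=2, sleep=waits.append)
print(result, waits)
```

Because the wait grows exponentially, even a modest max_retries adds up quickly, which is why the parameter is best kept small.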

create(row: Dict, table_name: str | None = None)

Uses the mechanism of the put method, but first validates that an item with the same hash key (and range key, if any) does not already exist in the table. If it does, raises ConditionalCheckFailedException.

Warning

This method requires the config to have a ‘hash_key’ parameter with a name of a field.

Parameters:
  • row

  • table_name

delete(keys: Dict, table_name: str | None = None)

Parameters:
  • keys (dict) – Keys and values of the row we delete.

  • table_name

dict_to_dynamo(row_dict, add_prefix=None, strict=True)

Convert the row from regular dictionary to the ugly DynamoDB syntax. Takes settings from row_mapper.

e.g. {'key1': 'value1', 'key2': 'value2'} will convert to: {'key1': {'Type1': 'value1'}, 'key2': {'Type2': 'value2'}}

Parameters:
  • row_dict (dict) – A row we want to convert to dynamo syntax.

  • add_prefix (str) – A string prefix to add to the key in the result dict. Useful for queries like update.

  • strict (bool) – If False, will get the type from the value in the dict (this works for numbers and strings). If True, won't add them if they're not in the required_fields, and if they are, will raise an error. Uses boto3.dynamodb.types.TypeSerializer for type conversion, but before that automatically guesses that numeric values should become N type, booleans or 'true'/'false' strings become BOOL, and dictionaries become M recursively.

Returns:

DynamoDB Task item

Return type:

dict
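
The conversion can be pictured with the pure-Python sketch below. It is a simplified illustration, not the library's implementation: to_dynamo is a hypothetical name, only N and S values are handled, and the fallback type guess for unmapped fields is an assumption.

```python
def to_dynamo(row: dict, row_mapper: dict, add_prefix: str = '') -> dict:
    """Wrap each value as {type_code: value_as_string} using row_mapper."""
    result = {}
    for key, value in row.items():
        type_code = row_mapper.get(key)
        if type_code is None:
            # Assumption: unmapped fields are typed from the Python value itself.
            is_number = isinstance(value, (int, float)) and not isinstance(value, bool)
            type_code = 'N' if is_number else 'S'
        # DynamoDB transmits both N and S values as strings.
        result[f"{add_prefix}{key}"] = {type_code: str(value)}
    return result


row_mapper = {'col_name_1': 'N', 'col_name_2': 'S'}
print(to_dynamo({'col_name_1': 3, 'col_name_2': 'value2'}, row_mapper))
print(to_dynamo({'col_name_1': 3}, row_mapper, add_prefix=':'))
```

The add_prefix variant shows why the prefix is handy for updates: the resulting keys can be used directly as expression attribute value placeholders.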

dynamo_to_dict(dynamo_row: Dict, strict: bool | None = None, fetch_all_fields: bool | None = None) -> Dict

Convert the ugly DynamoDB syntax of the row to a regular dictionary. We currently support only String or Numeric values. The latter are converted to int or float. Takes settings from row_mapper.

e.g.: {'key1': {'N': '3'}, 'key2': {'S': 'value2'}} will convert to: {'key1': 3, 'key2': 'value2'}

Parameters:
  • dynamo_row (dict) – DynamoDB row item

  • strict (bool) – DEPRECATED.

  • fetch_all_fields (bool) – If False only row_mapper fields will be extracted from dynamo_row, else, all fields will be extracted from dynamo_row.

Returns:

The row in a key-value format

Return type:

dict
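
The reverse conversion can be sketched in the same spirit. Again this is an illustration under assumptions: from_dynamo is a hypothetical name and only N and S values are handled.

```python
def from_dynamo(dynamo_row: dict, row_mapper: dict, fetch_all_fields: bool = False) -> dict:
    """Unwrap {type_code: value} pairs into plain Python values."""
    result = {}
    for key, typed_value in dynamo_row.items():
        if not fetch_all_fields and key not in row_mapper:
            continue  # keep only the fields declared in row_mapper
        type_code, raw = next(iter(typed_value.items()))
        if type_code == 'N':
            # Numbers arrive as strings; prefer int and fall back to float.
            try:
                result[key] = int(raw)
            except ValueError:
                result[key] = float(raw)
        else:
            result[key] = raw
    return result


row = {'key1': {'N': '3'}, 'key2': {'S': 'value2'}, 'extra': {'N': '1.5'}}
print(from_dynamo(row, {'key1': 'N', 'key2': 'S'}))
print(from_dynamo(row, {'key1': 'N', 'key2': 'S'}, fetch_all_fields=True))
```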

get_by_query(keys: Dict, **kwargs) -> List[Dict]

Executes a query to the DynamoDB database using the provided keys and additional parameters.

For signature description see: _query_constructor

get_by_scan(attrs=None, table_name=None, index_name=None, strict=None, fetch_all_fields=None, consistent_read=None)

Scans a table. Don't use this method if you want to select by keys. It is SLOW compared to get_by_query. Careful: scanning a large number of items can run for a long time.

Optional:

Parameters:
  • attrs (dict) – Attribute names and values of the items we get. Can be empty to get the whole table.

  • table_name (str) – Name of the dynamo table. If not specified, will use table_name from the config.

  • index_name (str) – Name of the dynamo table index. If not specified, will use index_name from the config. If not specified also in the config, will scan the table itself without any index.

  • consistent_read (bool) – If True, the operation uses strongly consistent reads; otherwise, it uses eventually consistent reads. Default is False.

  • strict (bool) – DEPRECATED.

  • fetch_all_fields (bool) – If False, will only get the attributes specified in the row mapper. If True, will get all attributes. Default is False.

Returns:

List of items from the table, each item in key-value format

Return type:

list

get_by_scan_generator(attrs=None, table_name=None, index_name=None, strict=None, fetch_all_fields=None, consistent_read=None)

Scans a table. Don't use this method if you want to select by keys. It is SLOW compared to get_by_query. Careful: scanning a large number of items can run for a long time. Same as get_by_scan, but yields parts of the results.

Optional:

Parameters:
  • attrs (dict) – Attribute names and values of the items we get. Can be empty to get the whole table.

  • table_name (str) – Name of the dynamo table. If not specified, will use table_name from the config.

  • index_name (str) – Name of the dynamo table index. If not specified, will use index_name from the config. If not specified also in the config, will scan the table itself without any index.

  • consistent_read (bool) – If True, the operation uses strongly consistent reads; otherwise, it uses eventually consistent reads. Defaults to the boto3 setting (False).

  • strict (bool) – DEPRECATED.

  • fetch_all_fields (bool) – If False, will only get the attributes specified in the row mapper. If True, will get all attributes. Default is False.

Returns:

List of items from the table, each item in key-value format

Return type:

list
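
Yielding results in parts typically relies on DynamoDB scan pagination, where each response may carry a LastEvaluatedKey that is passed back as ExclusiveStartKey. The sketch below shows that pattern with a fake scan function; scan_pages and fake_scan are hypothetical names, and the string page keys are a simplification (real keys are attribute dicts).

```python
def scan_pages(scan, **params):
    """Yield items page by page until the response has no LastEvaluatedKey."""
    while True:
        response = scan(**params)
        yield from response.get('Items', [])
        last_key = response.get('LastEvaluatedKey')
        if not last_key:
            break
        params['ExclusiveStartKey'] = last_key  # continue where the last page ended


def fake_scan(TableName, ExclusiveStartKey=None):
    # Fake two-page table for the demo; page keys are plain strings here.
    pages = {None: (['a', 'b'], 'page2'), 'page2': (['c'], None)}
    items, next_key = pages[ExclusiveStartKey]
    response = {'Items': items}
    if next_key:
        response['LastEvaluatedKey'] = next_key
    return response


for item in scan_pages(fake_scan, TableName='some_table_name'):
    print(item)
```

A generator keeps memory usage bounded by one page, which matters when the table is large.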

get_capacity(table_name=None)

Fetches capacity for data tables

Keyword Arguments:

table_name {str} – DynamoDB table name (default: {None})

Returns:

dict – read/write capacity for the table requested or None for ON_DEMAND (PAY_PER_REQUEST) tables

get_stats()

Return statistics of operations performed by current instance of the Class.

Returns:

  • dict - key: int statistics.

get_table_indexes(table_name: str | None = None) -> Dict

Returns active indexes of the table: their hash key, range key, and projection type.

{
    'index_1_name': {
        'projection_type': 'ALL',  # One of: 'ALL'|'KEYS_ONLY'|'INCLUDE'
        'hash_key': 'the_hash_key_column_name',
        'range_key': 'the_range_key_column_name',  # Can be None if the index has no range key
        'provisioned_throughput': {
            'write_capacity': 5,
            'read_capacity': 10
        }
    },
    'index_2_name': ...
}

Note

In case the table has ON DEMAND (PAY_PER_REQUEST) BillingMode the provisioned_throughput is set to 0.

get_table_keys(table_name: str | None = None) -> Tuple[str, str | None]

Returns table’s hash key name and range key name

Parameters:

table_name

Returns:

hash key and range key names
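
For orientation, the key names can be read from the KeySchema section of a describe_table response. The sketch below works on a hand-written response dict; table_keys is a hypothetical helper, and whether the library extracts the keys this way is an assumption.

```python
def table_keys(description: dict):
    """Return (hash_key, range_key) from a describe_table response; range_key may be None."""
    by_type = {entry['KeyType']: entry['AttributeName']
               for entry in description['Table']['KeySchema']}
    return by_type['HASH'], by_type.get('RANGE')


# Hand-written fragment in the shape of a describe_table response.
description = {
    'Table': {
        'KeySchema': [
            {'AttributeName': 'hash_col', 'KeyType': 'HASH'},
            {'AttributeName': 'range_col', 'KeyType': 'RANGE'},
        ]
    }
}
print(table_keys(description))
```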

identify_dynamo_capacity(table_name=None)

Identify and store the table capacity for a given table on the object.

In case the table has ON DEMAND (PAY_PER_REQUEST) BillingMode the ProvisionedThroughput is missing.

Arguments:

table_name {str} – short name of the dynamo db table to analyze

patch(keys: Dict, attributes_to_update: Dict | None = None, attributes_to_increment: Dict | None = None, table_name: str | None = None, attributes_to_remove: List[str] | None = None)

Updates an item in DynamoDB. Will fail if an item with these keys does not exist.

put(row: Dict, table_name: str | None = None, overwrite_existing: bool = True)

Writes the row to the DynamoDB table.

Parameters:
  • row – The row to add to the table. key is column name, value is value.

  • table_name – Name of the dynamo table to add the row to.

  • overwrite_existing – Overwrite the existing row if True, otherwise will raise an exception if exists.

Warning

overwrite_existing option requires the config to have a ‘hash_key’ parameter with a name of a field.

reset_stats()

Cleans statistics.

sleep_db(last_action_time: datetime, action: str, table_name=None)

Sleeps between calls to dynamodb (if it needs to). Uses the table’s capacity to decide how long it needs to sleep. No need to sleep for ON DEMAND (PAY_PER_REQUEST) tables.

Parameters:
  • last_action_time – Last time when we did this action (read/write) to this dynamo table

  • action – “read” or “write”

transact_write(*transactions: Dict)

Executes multiple write transactions. Can execute operations on different tables. Will split the transactions into chunks, because transact_write_items accepts up to 10 actions. WARNING: If you need a single transaction spanning more than 10 operations, AWS DynamoDB doesn't support it.

dynamo_db_client = DynamoDbClient(config)
t1 = dynamo_db_client.make_put_transaction_item(row, table_name='table1')
t2 = dynamo_db_client.make_delete_transaction_item(row, table_name='table2')
dynamo_db_client.transact_write(t1, t2)
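
The chunking mentioned above can be sketched as follows. This is an illustration, not the library's code: chunk is a hypothetical helper and the limit of 10 is taken from the description of this method.

```python
def chunk(items, size=10):
    """Split a sequence into consecutive lists of at most `size` elements."""
    return [list(items[i:i + size]) for i in range(0, len(items), size)]


transactions = [{'Put': {'TableName': 'table1', 'Item': {'id': {'N': str(n)}}}} for n in range(23)]
for part in chunk(transactions):
    # Each part would be sent as its own request.
    print(len(part))
```

Note that each chunk is submitted as a separate request, so atomicity holds within a chunk but not across chunks.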

update(keys: Dict, attributes_to_update: Dict | None = None, attributes_to_increment: Dict | None = None, table_name: str | None = None, condition_expression: str | None = None, attributes_to_remove: List[str] | None = None)

Updates an item in DynamoDB. Will create a new item if it doesn't exist. IMPORTANT: If you want to make sure the item exists, use the patch method.

Parameters:
  • keys (dict) – Keys and values of the row we update. Example, in a table where the hash key is ‘hk’ and the range key is ‘rk’: {'hk': 'cat', 'rk': '123'}

  • attributes_to_update (dict) – Dict of the attributes to be updated. Can contain both existing attributes and new attributes. Will update existing, and create new attributes. Example: {'col_name': 'some_value'}

  • attributes_to_increment (dict) – Attribute names to increment, and the value to increment by. If the attribute doesn’t exist, will create it. Example: {'some_counter': '3'}

  • attributes_to_remove (list) – Will remove these attributes from the record

  • condition_expression (str) – Condition Expression that must be fulfilled on the object to update.

  • table_name (str) – Name of the table
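
One way to picture how the three attribute arguments could combine is a DynamoDB UpdateExpression with SET, ADD and REMOVE clauses. The sketch below is an assumption about the general shape only: build_update_expression and its placeholder naming are hypothetical, and the library may construct its expression differently.

```python
def build_update_expression(attributes_to_update=None, attributes_to_increment=None,
                            attributes_to_remove=None) -> str:
    """Compose SET / ADD / REMOVE clauses from the three kinds of changes."""
    clauses = []
    if attributes_to_update:
        clauses.append('SET ' + ', '.join(f"{name} = :{name}" for name in attributes_to_update))
    if attributes_to_increment:
        # ADD creates the attribute if it does not exist yet.
        clauses.append('ADD ' + ', '.join(f"{name} :{name}" for name in attributes_to_increment))
    if attributes_to_remove:
        clauses.append('REMOVE ' + ', '.join(attributes_to_remove))
    return ' '.join(clauses)


print(build_update_expression(
    attributes_to_update={'col_name': 'some_value'},
    attributes_to_increment={'some_counter': 3},
    attributes_to_remove=['old_col'],
))
```

Attribute names that collide with reserved words would additionally need expression attribute names, which this sketch leaves out.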

sosw.components.dynamo_db.clean_dynamo_table(table_name='autotest_dynamo_db', keys=('hash_col', 'range_col'), filter_expression=None)

Cleans the DynamoDB Table. Only for autotest tables.

Parameters:
  • table_name (str) – name of the table

  • keys (tuple) – the keys of the table

  • filter_expression (str) – Supports regular comparisons and between. Input must be a regular human string e.g. 'key <= 42', 'name = marta', 'foo between 10 and 20', etc.

Warning

There are some reserved words that would not work with Filter Expression in case they are attribute names. Fix this one day.