DynamoDB Миграция данных с преобразованием - PullRequest
0 голосов
/ 23 мая 2019

Мне нужно перенести данные из Таблицы A в Таблицу B.

Мне нужно переместить каждый элемент в Таблице A с наибольшим диапазономKey в Таблицу B.

То есть каждый элементс самой большой версией, будет находиться в таблице B.

Я думал о сканировании таблицы A, а затем о выполнении условной записи в таблицу B (запись, если существует только элемент с меньшим значением rangeKey дляэтот предмет, или если предмет вообще не существует), но это кажется невозможным, поскольку это занимает слишком много времени (таблица А ОГРОМНА).

Есть ли лучший способ сделать это?

Спасибо!

Ответы [ 2 ]

1 голос
/ 23 мая 2019

Да, это можно сделать более эффективно, но сначала вам нужно получить все значения ключа раздела, которые вы храните в своей таблице.

Если вы еще не знаете значения, вам потребуется scan таблицы и извлечь эти уникальные значения, вы можете использовать ProjectionExpression, чтобы получить только значения ключа раздела.

пример на Python

def get_partition_key_values():
    response = dynamodb_client.scan(
        TableName=SOURCE_TABLE,
        ProjectionExpression=PARTITION_KEY_NAME
    )

    values = [item[PARTITION_KEY_NAME][PARTITION_KEY_TYPE]
              for item in response['Items']]
    return set(values)

Теперь, когда у вас есть значения ключа раздела, вы можете просто запросить таблицу для каждого уникального ключа раздела в цикле, отсортировать результат с помощью атрибута ScanIndexForward, установленного на False, иLimit количество возвращаемых значений в 1, которое будет возвращать один элемент на ключ раздела с «самым большим» ключом сортировки.

Поскольку вы просматриваете ключи и получаете нужные элементы, вы можете поместитьих по одной в таблицу назначения

def copy_items(partition_keys):
    for key in partition_keys:
        item = dynamodb_client.query(
            TableName=SOURCE_TABLE,
            KeyConditionExpression='#pid = :pid',
            ExpressionAttributeNames={
                '#pid': PARTITION_KEY_NAME
            },
            ExpressionAttributeValues={
                ':pid': {
                    PARTITION_KEY_TYPE: key
                }
            },
            Limit=1,
            ScanIndexForward=False
        )['Items'][0]

        dynamodb_client.put_item(
            TableName=DESTINATION_TABLE,
            Item=item
        )

Вот полный код

import boto3

dynamodb_client = boto3.client('dynamodb')

SOURCE_TABLE = 'products'
DESTINATION_TABLE = 'products_copy'

PARTITION_KEY_NAME = 'product_id'
PARTITION_KEY_TYPE = 'S'


def get_partition_key_values():
    response = dynamodb_client.scan(
        TableName=SOURCE_TABLE,
        ProjectionExpression=PARTITION_KEY_NAME
    )

    values = [item[PARTITION_KEY_NAME][PARTITION_KEY_TYPE]
              for item in response['Items']]
    return set(values)


def copy_items(partition_keys):
    for key in partition_keys:
        item = dynamodb_client.query(
            TableName=SOURCE_TABLE,
            KeyConditionExpression='#pid = :pid',
            ExpressionAttributeNames={
                '#pid': PARTITION_KEY_NAME
            },
            ExpressionAttributeValues={
                ':pid': {
                    PARTITION_KEY_TYPE: key
                }
            },
            Limit=1,
            ScanIndexForward=False
        )['Items'][0]

        dynamodb_client.put_item(
            TableName=DESTINATION_TABLE,
            Item=item
        )


unique_partition_key_values = get_partition_key_values()

copy_items(unique_partition_key_values)

Просто обратите внимание, что в приведенном выше коде предполагается, что и таблица источника, и таблица назначения имеют одинаковую схему первичного ключа,Если эти две схемы отличаются в вашем случае, вам потребуется выполнить дополнительное преобразование / отображение.

0 голосов
/ 23 мая 2019

Вот как мы это сделали.

Lambda (для сканирования и удаления старых записей) -> DynamoDB Stream -> Lambda to Write (преобразование DO и запись в новую таблицу)

Если у вас динамически настроены разделы и емкость, вы можете запустить несколько лямбда-выражений для обработки различных разделов и соответственно удалить данные.

Выше приведен простой подход.

Вы также можете сделать это через Data Pipeline.

Экспорт записей DynamoDB с конвейером данных -> S3 -> Преобразовать с помощью лямбды и запись в S3 -> Импорт с конвейером данных

https://docs.aws.amazon.com/datapipeline/latest/DeveloperGuide/dp-importexport-ddb.html

Надеюсь, это поможет.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...