Как ждать пакетной операции DynamoDB в Python, а также как улучшить производительность пакетной операции? - PullRequest
0 голосов
/ 29 марта 2020

Я использую библиотеку boto3 в Python для выполнения нескольких последовательных операций пакетного удаления с использованием функции DynamoDB batch_writer(), используя код, показанный ниже. Как мне дождаться завершения каждой операции, прежде чем продолжить выполнение?

Кроме того, удаление занимает удивительно много времени, по крайней мере по сравнению с моим опытом использования других баз данных, таких как PostgreSQL. Например, одной из таблиц требуется от 5 до 10 минут, чтобы завершить удаление только 3000 элементов базы данных. Есть ли способ значительно ускорить удаление? Это что-то плохое в моем коде с точки зрения производительности?

Вот обзор таблицы:

Table name  connected_words_list
Primary partition key   n_gram (String)
Primary sort key    connected_word (String)
Point-in-time recovery  DISABLED
Encryption Type DEFAULT
KMS Master Key ARN  Not Applicable
Encryption Status   
CloudWatch Contributor Insights DISABLE
Time to live attribute  DISABLED
Table status    
Active
Creation date   March 25, 2020 at 5:07:27 PM UTC-4
UTC: March 25, 2020 at 9:07:27 PM UTC

Local: March 25, 2020 at 5:07:27 PM UTC-4

Region (N. Virginia): March 25, 2020 at 4:07:27 PM UTC-5

Read/write capacity mode    Provisioned
Last change to on-demand mode   -
Provisioned read capacity units 10 (Auto Scaling Disabled)
Provisioned write capacity units    10 (Auto Scaling Disabled)
Last decrease time  -
Last increase time  -
Storage size (in bytes) 494.00 bytes
Item count  6
Region  US East (N. Virginia)
Amazon Resource Name (ARN)  <redacted>

Вот мой код:

import boto3
import json
import re
import pprint
import os
import pandas as pd
from decimal import *
import itertools
import sys

in_bucket_name = 'my_bucket'
s3 = boto3.client('s3', region_name='us-east-1')
amazon_comprehend_medical = boto3.client('comprehendmedical', region_name='us-east-1')

# Table for the Amazon Comprehend Medicaltable.
ddt_amazon_compmed = 'compmed_table'

# Table for the connected words list table.
ddt_connected_words = 'connected_words_list'

# Table for the source documents table.
ddt_source_documents = 'source_documents'

# Table for the document metadata table.
ddt_document_metadata = 'document_metadata'

# Create a reference to DynamoDB.
dynamoDBResource = boto3.resource('dynamodb', region_name = 'us-east-1')

# Create references to our DynamoDB tables.
table_compmed = dynamoDBResource.Table(ddt_amazon_compmed)
table_connected_words = dynamoDBResource.Table(ddt_connected_words)
table_source_documents = dynamoDBResource.Table(ddt_source_documents)
table_document_metadata = dynamoDBResource.Table(ddt_document_metadata)

def truncateTable(tableToTruncate, theProjectionExpression, funcBuildDeletionKey):
    """Given a reference to a DynamoDB table, a projection expression with the list 
    item attributes that make up the primary key, and a function that takes a 
    DynamoDB item and returns a key built from the item that can be used to delete 
    it, delete all of the records in that table using a repetitive batch delete 
    operation."""

    scan = None

    with tableToTruncate.batch_writer() as batch:
        numDeletions = 0
        while scan is None or 'LastEvaluatedKey' in scan:
            if scan is not None and 'LastEvaluatedKey' in scan:
                scan = tableToTruncate.scan(
                    ProjectionExpression=theProjectionExpression,
                    ExclusiveStartKey=scan['LastEvaluatedKey'],
                )
            else:
                scan = tableToTruncate.scan(ProjectionExpression=theProjectionExpression)

            for item in scan['Items']:
                if numDeletions % 1000 == 0:
                    print(numDeletions)
                deletionKey = funcBuildDeletionKey(item)
                batch.delete_item(Key=deletionKey)
                numDeletions = numDeletions + 1

def truncateAllTables():
    """Truncate all of our DynamoDB tables.  Remember to update this function 
    if we add more tables."""

    print("Truncating table: ", "table_compmed")
    theProjectionExpression = 'ROWID'
    funcBuildDeletionKey = lambda item: { 'ROWID': item['ROWID']}
    truncateTable(table_compmed, theProjectionExpression, funcBuildDeletionKey)

    print("Truncating table: ", "table_connected_words")
    theProjectionExpression = 'n_gram, connected_word'
    funcBuildDeletionKey = lambda item: { 'n_gram': item['n_gram'], 'connected_word': item['connected_word']}
    truncateTable(table_connected_words, theProjectionExpression, funcBuildDeletionKey)

    print("Truncating table: ", "table_source_documents")
    theProjectionExpression = 'n_gram, source_document_id'
    funcBuildDeletionKey = lambda item: { 'n_gram': item['n_gram'], 'source_document_id': item['source_document_id']}
    truncateTable(table_source_documents, theProjectionExpression, funcBuildDeletionKey)

    print("Truncating table: ", "table_document_metadata")
    theProjectionExpression = 'paper_id'
    funcBuildDeletionKey = lambda item: { 'paper_id': item['paper_id']}
    truncateTable(table_document_metadata, theProjectionExpression, funcBuildDeletionKey)

    print("Table truncation finished.")
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...