Получение Http-ответа от объекта boto3 table.batch_writer - PullRequest
2 голосов
/ 21 марта 2019

В CSV есть список данных, которые я хочу поместить в таблицу DynamodB на AWS.См. Приведенный ниже примерный список.

    Mary,F,7065
    Anna,F,2604
    Emma,F,2003
    Elizabeth,F,1939
    Minnie,F,1746
    Margaret,F,1578
    Ida,F,1472
    Alice,F,1414
    Bertha,F,1320
    Sarah,F,1288
    Annie,F,1258
    Clara,F,1226
    Ella,F,1156
    Florence,F,1063
    Cora,F,1045
    Martha,F,1040
    Laura,F,1012
    Nellie,F,995
    Grace,F,982
    Carrie,F,949
    Maude,F,858
    Mabel,F,808
    Bessie,F,796
    Jennie,F,793
    Gertrude,F,787
    Julia,F,783
    Hattie,F,769
    Edith,F,768
    Mattie,F,704
    Rose,F,700
    Catherine,F,688
    Lillian,F,672
    Ada,F,652
    Lillie,F,647
    Helen,F,636
    Jessie,F,635
    Louise,F,635
    Ethel,F,633
    Lula,F,621
    Myrtle,F,615
    Eva,F,614
    Frances,F,605
    Lena,F,603
    Lucy,F,590
    Edna,F,588
    Maggie,F,582
    Pearl,F,569
    Daisy,F,564
    Fannie,F,560
    Josephine,F,544

Чтобы записать более 25 элементов в таблицу DynamodB, документы используют объект batch_writer.

    resource = boto3.resource('dynamodb')
    table = resource.Table('Names')
    with table.batch_writer() as batch:
        for item in items:
            batch.put_item(item)

Есть ли способ вернутьHTTP-ответ, указывающий на успешное завершение batch_write?Я знаю, что это асинхронно.Есть ли ожидание или получение или что-то, чтобы позвонить?

Ответы [ 2 ]

1 голос
/ 29 марта 2019

Документы для объекта BatchWriter, созданного batch_writer, расположены (<3 Open Source) <a href="https://github.com/boto/boto3/blob/master/boto3/dynamodb/table.py" rel="nofollow noreferrer"> здесь . Глядя на класс BatchWriter, метод _flush генерирует ответ, он просто нигде не хранится.

class BatchWriter(object):
    """Automatically handle batch writes to DynamoDB for a single table."""
    def __init__(self, table_name, client, flush_amount=25,
                 overwrite_by_pkeys=None):
        """
        :type table_name: str
        :param table_name: The name of the table.  The class handles
            batch writes to a single table.
        :type client: ``botocore.client.Client``
        :param client: A botocore client.  Note this client
            **must** have the dynamodb customizations applied
            to it for transforming AttributeValues into the
            wire protocol.  What this means in practice is that
            you need to use a client that comes from a DynamoDB
            resource if you're going to instantiate this class
            directly, i.e
            ``boto3.resource('dynamodb').Table('foo').meta.client``.
        :type flush_amount: int
        :param flush_amount: The number of items to keep in
            a local buffer before sending a batch_write_item
            request to DynamoDB.
        :type overwrite_by_pkeys: list(string)
        :param overwrite_by_pkeys: De-duplicate request items in buffer
            if match new request item on specified primary keys. i.e
            ``["partition_key1", "sort_key2", "sort_key3"]``
        """
        self._table_name = table_name
        self._client = client
        self._items_buffer = []
        self._flush_amount = flush_amount
        self._overwrite_by_pkeys = overwrite_by_pkeys

    def put_item(self, Item):
        self._add_request_and_process({'PutRequest': {'Item': Item}})

    def delete_item(self, Key):
        self._add_request_and_process({'DeleteRequest': {'Key': Key}})

    def _add_request_and_process(self, request):
        if self._overwrite_by_pkeys:
            self._remove_dup_pkeys_request_if_any(request)
        self._items_buffer.append(request)
        self._flush_if_needed()

    def _remove_dup_pkeys_request_if_any(self, request):
        pkey_values_new = self._extract_pkey_values(request)
        for item in self._items_buffer:
            if self._extract_pkey_values(item) == pkey_values_new:
                self._items_buffer.remove(item)
                logger.debug("With overwrite_by_pkeys enabled, skipping "
                             "request:%s", item)

    def _extract_pkey_values(self, request):
        if request.get('PutRequest'):
            return [request['PutRequest']['Item'][key]
                    for key in self._overwrite_by_pkeys]
        elif request.get('DeleteRequest'):
            return [request['DeleteRequest']['Key'][key]
                    for key in self._overwrite_by_pkeys]
        return None

    def _flush_if_needed(self):
        if len(self._items_buffer) >= self._flush_amount:
            self._flush()

    def _flush(self):
        items_to_send = self._items_buffer[:self._flush_amount]
        self._items_buffer = self._items_buffer[self._flush_amount:]
        response = self._client.batch_write_item(
            RequestItems={self._table_name: items_to_send})
        unprocessed_items = response['UnprocessedItems']

        if unprocessed_items and unprocessed_items[self._table_name]:
            # Any unprocessed_items are immediately added to the
            # next batch we send.
            self._items_buffer.extend(unprocessed_items[self._table_name])
        else:
            self._items_buffer = []
        logger.debug("Batch write sent %s, unprocessed: %s",
                     len(items_to_send), len(self._items_buffer))

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, tb):
        # When we exit, we need to keep flushing whatever's left
        # until there's nothing left in our items buffer.
        while self._items_buffer:
            self._flush()

Как я это решил:

Я строил ответы на этот вопрос о перезаписи методов класса. Все они работают, но лучше всего в моем случае использования было перезаписать экземпляр класса этой версией _flush.

Сначала я создал новую версию _flush.

import logging
import types

## New Flush

def _flush(self):
    items_to_send = self._items_buffer[:self._flush_amount]
    self._items_buffer = self._items_buffer[self._flush_amount:]
    self._response = self._client.batch_write_item(
        RequestItems={self._table_name: items_to_send})
    unprocessed_items = self._response['UnprocessedItems']

    if unprocessed_items and unprocessed_items[self._table_name]:
        # Any unprocessed_items are immediately added to the
        # next batch we send.
        self._items_buffer.extend(unprocessed_items[self._table_name])
    else:
        self._items_buffer = []
    logger.debug("Batch write sent %s, unprocessed: %s",
                 len(items_to_send), len(self._items_buffer))


Тогда я переписал метод экземпляра следующим образом.

with batch_writer() as batch:
    batch._flush=types.MethodType(_flush, batch)
    for item in items:
        batch.put_item(Item=item)
print(batch._response)

И это создает такой вывод.

{'UnprocessedItems': {},
 'ResponseMetadata': {'RequestId': '853HSV0ULO4BN71R6T895J991VVV4KQNSO5AEMVJF66Q9ASUAAJ',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'server': 'Server',
   'date': 'Fri, 29 Mar 2019 18:29:49 GMT',
   'content-type': 'application/x-amz-json-1.0',
   'content-length': '23',
   'connection': 'keep-alive',
   'x-amzn-requestid': '853HSV0ULO4BN71R6T895J991VVV4KQNSO5AEMVJF66Q9ASUAAJ',
   'x-amz-crc32': '4185382645'},
  'RetryAttempts': 0}}
1 голос
/ 26 марта 2019

Кажется, нет встроенного способа сделать это. Однако метод _flush в BatchWriter регистрирует сообщение отладки, когда завершает пакет. Если вы просто хотите увидеть, что происходит, вы можете включить ведение журнала отладки до цикла put_item:

import logging
logger = logging.getLogger('boto3.dynamodb.table')
logger.setLevel(logging.DEBUG)

Если вы хотите предпринять какое-то действие, вы можете создать пользовательский logging.Handler, что-то вроде этого:

import logging
import sys

class CatchBatchWrites(logging.Handler):
    def handle(self, record):
        if record.msg.startswith('Batch write sent'):
            processed, unprocessed = record.args
            # do something with these numbers


logger = logging.getLogger('boto3.dynamodb.table')
logger.setLevel(logging.DEBUG) # still necessary
logger.addHandler(CatchBatchWrites())
...