Конкатенация файлов менее 1 МБ в S3 или Azure с использованием параллельной обработки. Решение должно быть масштабируемым - PullRequest
0 голосов
/ 24 февраля 2020

У нас есть сервис, который генерирует 3000 файлов в минуту. Размер файла составляет менее 5 КБ. Они хранятся в хранилище BLOB-объектов в azure. Нам нужно объединить эти файлы и отправить их на S3. Окончательный размер файла в s3 должен быть в диапазоне от 10 МБ до 100 МБ (эти данные поступают в снежинку через Snowpipe.). Как это можно сделать быстрым и экономически эффективным способом.

Добавление дополнительной информации: что я уже пробовал:

1) Отправка события создания большого двоичного объекта в очередь azure. Функция запуска очереди для загрузки данных в S3. Затем с помощью aws лямбда-конкатенации (но обычно лямбда-тайм-аут)

2) Python код, который использует многопроцессорную обработку, которая читает очередь и блоб azure, а затем объединяет данные для создания файла размером 10 МБ. и отправить его на S3. Пробовал запускать этот код из azure веб-задания. (У Webjob всего 4 ядра). Это недостаточно быстро и не масштабируется.

Мне нужно решение, которое может выполнять задачи параллельно наиболее экономически эффективным образом и масштабируемо. Это может быть пакетный процесс. Задержка данных в S3 может составлять 24 часа. (Невозможно использовать пакет azure, поскольку мы уже исчерпали количество учетных записей для нашего плана подписки для какого-либо другого процесса.).

Любые рекомендации для инструментов или услуг ETL, которые лучше всего подходят для этого случая.

1 Ответ

0 голосов
/ 25 февраля 2020

Одна креативная опция, которая приходит в голову, - это создать AWS Lambda функцию, которая будет выполнять несколько шагов:

  • Создать имя для объекта вывода в S3
  • Список BLOB-объектов (возможно, по шаблону, с учетом ваших особенностей)
  • Итерация по списку
  • Загрузка blob
  • Сохранение всего содержимого к объекту с помощью многоэтапной загрузки (рекомендуется использовать smart_open lib)
  • [Необязательно] Удалить все исходные файлы

Теперь вы можете планировать эта функция запускается каждые 6-10 минут (в зависимости от размера входных файлов)

Код функции должен выглядеть следующим образом (хотя это не проверено, только скомпилировано из документов)

import datetime

import smart_open
from azure.storage.blob import BlockBlobService
from azure.storage.blob import BlobClient

_STORAGE_ACCOUNT_NAME = "<storage account name>"
_STORAGE_ACCOUNT_KEY = "<storage account key>"
_CONTAINER_NAME = "<container name>"

blob_service = BlockBlobService(
    account_name=_STORAGE_ACCOUNT_NAME,
    account_key=_STORAGE_ACCOUNT_KEY)

s3_path = f"s3://your-bucket/prefix/path/{datetime.now().strftime('%Y-%m-%d-%H-%M')}.gz"


def lambda_handler(a, b):
    generator = blob_service.list_blobs(_CONTAINER_NAME)

    with smart_open.open(s3_path, 'wb') as out:
        for blob in generator:
            blob_client = BlobClient.from_blob_url(
                blob_url=f"https://account.blob.core.windows.net/{blob.container}/{blob.name}")
            out.write(blob_client.download_blob().readall())


if __name__ == '__main__':
    lambda_handler(None, None)

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...