Amazon S3: быстрый способ извлечь части большого двоичного файла? - PullRequest
0 голосов
/ 07 марта 2019

Я хочу прочитать части большого двоичного файла на s3.Файл имеет следующий формат:

Header 1: 200  bytes
Data   1: 10000 bytes
Header 2: 200  bytes
Data   2: 10000 bytes
...
Header N: 200  bytes
Data   N: 10000 bytes

Я хочу извлечь все заголовки и сохранить их в файл.N обычно (1e6-> 1e8).

Какой самый быстрый способ сделать это?

До сих пор я пробовал boto3:

def s3_open(bucket, key):
    s3 = boto3.resource('s3')
    obj = s3.Object(bucket, key)
    f = obj.get()['Body']
    return f

 f = s3_open(bucket, key)
 nread = 0
 while nread < N:
    remaining = N - nread
    n = min(1000, remaining)
    buf = f.read(n * 10200)
    # read 200 bytes from each of these n records and write to file 
    nread += n

Этомедленно, когда я запускаю его на своем локальном ПК.Вызов f.read () является узким местом.

1 Ответ

1 голос
/ 07 марта 2019

Исходя из этого ответа , вы можете распараллелить чтение, используя многопроцессорную обработку / многопоточность / ..., читая меньшие (но большие) фрагменты файла в нескольких заданиях.

def get_ranges(file_size, chunk_size, n_jobs):
    num_entries, remainder = divmod(file_size, chunk_size)
    assert not remainder  # sanity check for file size
    entries_per_process = num_entries // n_jobs
    assert entries_per_process >= 1
    ranges = [
        [
            pid * entries_per_process * chunk_size,
            (pid + 1) * entries_per_process * chunk_size,
        ]
        for pid in range(n_jobs)
    ]
    # fix up the last chunk in case there's an uneven distribution of jobs and chunks:
    ranges[-1][-1] = file_size
    return ranges


chunk_size = 200 + 10000
file_size = chunk_size * 15000  # assuming 15 000 chunks
ranges = get_ranges(file_size, chunk_size, 16)

for start, end in ranges:
    print(f"spawn something to process bytes {start}-{end}")

распечатывает что-то вроде

spawn something to process bytes 0-9557400
spawn something to process bytes 9557400-19114800
spawn something to process bytes 19114800-28672200
spawn something to process bytes 28672200-38229600
spawn something to process bytes 38229600-47787000
spawn something to process bytes 47787000-57344400
[...]

, объединяя его со связанным ответом и многопроцессорностью, что-то вроде:

import boto3
import multiprocessing 

def process_range(range):
    # To be on the safe side, let's not share the boto3 resource between
    # processes here.
    obj = boto3.resource('s3').Object('mybucket', 'mykey')
    stream = obj.get(Range='bytes=%d-%d' % (range[0], range[1]))['Body']
    stream.read()  # read the objects from the stream and do something with them
    return 42  # lucky number!

if __name__ == '__main__':
    obj = boto3.resource('s3').Object('mybucket', 'mykey')
    ranges = get_ranges(obj.content_length, chunk_size, 50)
    with multiprocessing.Pool() as p:
         # use imap() if you need order!
         for result in p.imap_unordered(process_range, ranges):
              pass

Это, естественно, все сухое и непроверенное, и при вычислении этого диапазона могут быть ошибки "один на один", так что YMMV, но я надеюсь, что это поможет:)

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...