Распределение Dask с асинхронным параллелизмом в реальном времени - PullRequest
0 голосов
/ 05 октября 2018

Я читаю документацию по dask.distributed и похоже, что я мог бы передавать функции в распределенный кластер через client.submit().

У меня есть существующая функция some_func, которая асинхронно захватывает отдельные документы (скажем, текстовый файл), и я хочу взять необработанный документ, взять все слова, которые не содержат гласную, и сдвинуть ее обратно.в другую базу данных.Этот шаг обработки данных блокирует.

Предполагая, что существует несколько миллионов документов, и распределенный кластер имеет только 10 узлов с 1 доступным процессом (т. Е. Он может обрабатывать только 10 документов за один раз), как будет выполняться.распределенная обработка потока документов, которые необходимо обработать?

Вот пример кода:

client = dask.distributed('tcp://1.2.3.4:8786')

def some_func():
    doc = retrieve_next_document_asynchronously() 
    client.submit(get_vowelless_words, doc)

def get_vowelless_words(doc):
    vowelless_words = process(doc)
    write_to_database(vowelless_words)

if __name__ == '__main__':
    for i in range(1000000):
        some_func()

Поскольку обработка документа блокируется, и кластер может обрабатывать только 10 документоводновременно, что происходит, когда 30 других документов извлекаются, когда кластер занят?Я понимаю, что client.submit() является асинхронным и возвращает параллельное будущее, но что произойдет в этом случае?Будет ли он хранить документ в памяти до тех пор, пока не будет доступно 1/10 ядер, и, возможно, приведет к тому, что у машины не хватит памяти, скажем, после того, как 1000 документов ждут.

Что бы сделал планировщик в этом случае?FIFO?Должен ли я каким-то образом изменить код, чтобы он ожидал доступности ядра, прежде чем получить следующий документ?Как это может быть достигнуто?

Ответы [ 2 ]

0 голосов
/ 06 октября 2018

Для использования очередей с dask ниже приведен модифицированный пример использования очередей dask с распределенным кластером (на основе документации ):

#!/usr/bin/env python

import distributed
from queue import Queue
from threading import Thread

client = distributed.Client('tcp://1.2.3.4:8786')
nprocs = len(client.ncores())

def increment(x):
    return x+1

def double(x):
    return 2*x

input_q = Queue(maxsize=nprocs)
remote_q = client.scatter(input_q)
remote_q.maxsize = nprocs
inc_q = client.map(increment, remote_q)
inc_q.maxsize = nprocs
double_q = client.map(double, inc_q)
double_q.maxsize = nprocs
result_q = client.gather(double_q)

def load_data(q):
    i = 0
    while True:
        q.put(i)
        i += 1

load_thread = Thread(target=load_data, args=(input_q,))
load_thread.start()

while True:
    size = result_q.qsize()
    item = result_q.get()
    print(item, size)

В этом случае мы явноограничить максимальный размер каждой очереди равным числу доступных распределенных процессов.В противном случае цикл while перегружает кластер.Конечно, вы можете настроить максимальный размер так, чтобы он был кратным числу доступных процессов.Для простых функций, таких как приращение и удвоение, я обнаружил, что maxsize = 10*nprocs все еще разумно, но это, безусловно, будет ограничено количеством времени, которое требуется для запуска вашей пользовательской функции.

0 голосов
/ 05 октября 2018

При вызове submit все аргументы сериализуются и сразу отправляются в планировщик.Альтернативой может быть получение документов и их обработка в кластере (предполагается, что документы видны всем работникам по всему миру).

for fn in filenames:
    doc = client.submit(retrieve_doc, fn)
    process = client.submit(process_doc, doc)
    fire_and_forget(process)

Если документы доступны только на вашем клиентском компьютере и вы хотите ограничитьтогда вы можете использовать dask Queues или итератор as_completed.

...