Я читаю документацию по dask.distributed
и похоже, что я мог бы передавать функции в распределенный кластер через client.submit()
.
У меня есть существующая функция some_func
, которая асинхронно захватывает отдельные документы (скажем, текстовый файл), и я хочу взять необработанный документ, взять все слова, которые не содержат гласную, и сдвинуть ее обратно.в другую базу данных.Этот шаг обработки данных блокирует.
Предполагая, что существует несколько миллионов документов, и распределенный кластер имеет только 10 узлов с 1 доступным процессом (т. Е. Он может обрабатывать только 10 документов за один раз), как будет выполняться.распределенная обработка потока документов, которые необходимо обработать?
Вот пример кода:
client = dask.distributed('tcp://1.2.3.4:8786')
def some_func():
doc = retrieve_next_document_asynchronously()
client.submit(get_vowelless_words, doc)
def get_vowelless_words(doc):
vowelless_words = process(doc)
write_to_database(vowelless_words)
if __name__ == '__main__':
for i in range(1000000):
some_func()
Поскольку обработка документа блокируется, и кластер может обрабатывать только 10 документоводновременно, что происходит, когда 30 других документов извлекаются, когда кластер занят?Я понимаю, что client.submit()
является асинхронным и возвращает параллельное будущее, но что произойдет в этом случае?Будет ли он хранить документ в памяти до тех пор, пока не будет доступно 1/10 ядер, и, возможно, приведет к тому, что у машины не хватит памяти, скажем, после того, как 1000 документов ждут.
Что бы сделал планировщик в этом случае?FIFO?Должен ли я каким-то образом изменить код, чтобы он ожидал доступности ядра, прежде чем получить следующий документ?Как это может быть достигнуто?