Dask: невозможно отправить задачи глобальному клиенту из отдельной многопроцессорной обработки. Процесс - PullRequest
1 голос
/ 09 января 2020

У меня есть 2 процесса: первый, на котором я создаю глобальный распределенный клиент; второй процесс - это веб-скребок, который должен получить глобального клиента и отправить ему задачи, а когда все будет сделано, он отправит сообщение другому процессу, чтобы сообщить ему, что он может продолжить.

from dask.distributed import Client, as_completed
from multiprocessing import Process
from time import sleep
import zmq


def get(url) -> dict:
   # downloads data from url
   time.sleep(3)

   return data


def save(data) -> None:
    # saves data locally
    time.sleep(3)

    return None


def scraper(urls):
    # global client
    client = get_client()

    # zeromq socket
    context = zmq.Context()
    socket = context.socket(zmq.PUB)
    socket.bind('tcp://*:port')

    while True:
        for future, result in as_completed([client.submit(get, url=url) for url in urls], with_results=True):
        save(data=result)
        socket.send_string('All job is done for this minute, proceed.')
        sleep(60)


if __name__ == '__main__':

    client = Client()

    s = Process(target=scraper, *args, **kwargs)
    s.start()

проблема в том, что из функции скребка я могу получить глобальный клиент (я вижу его правильно, если я его распечатаю), но я не могу представить ему какую-либо задачу. Консоль не печатает никаких ошибок, она просто зависает, ничего не делая. Я думаю, что причина в том, что функция скребка выполняется на многопроцессорной обработке saparate. Процесс.

Любое решение или обходной путь? Спасибо.

1 Ответ

0 голосов
/ 10 января 2020

Клиент dask хранит открытые соединения с планировщиком. В зависимости от того, как в вашей системе создаются новые процессы, вы можете получить копии соединений, которые не указывают на что-либо полезное в новом процессе, или не смогут полностью передать клиента (это невозможно).

Вместо этого вам следует отправьте информацию о соединении дочернему процессу

addr = c.scheduler_info()['address']

и в целевой функции выполните

client = Client(addr)
...