Вычисление блоков массива dask асинхронно (Dask + FastAPI) - PullRequest
1 голос
/ 02 марта 2020

Я создаю приложение FastAPI, которое будет обслуживать блоки массива Dask. Я хотел бы использовать Асинхронную функциональность FastAPI наряду с Способностью распределенной Dask работать асинхронно . Ниже приведен mcve, демонстрирующий то, что я пытаюсь сделать на стороне сервера и на стороне приложения:

На стороне сервера:

import time

import dask.array as da
import numpy as np
import uvicorn
from dask.distributed import Client
from fastapi import FastAPI

app = FastAPI()
# create a dask array that we can serve
data = da.from_array(np.arange(0, 1e6, dtype=np.int), chunks=100)


async def _get_block(block_id):
    """return one block of the dask array as a list"""
    block_data = data.blocks[block_id].compute()
    return block_data.tolist()


@app.get("/")
async def get_root():
    time.sleep(1)
    return {"Hello": "World"}


@app.get("/{block_id}")
async def get_block(block_id: int):
    time.sleep(1)  # so we can test concurrency
    my_list = await _get_block(block_id)
    return {"block": my_list}


if __name__ == "__main__":
    client = Client(n_workers=2)
    print(client)
    print(client.cluster.dashboard_link)
    uvicorn.run(app, host="0.0.0.0", port=9000, log_level="debug")

Клиентская сторона

import dask
import requests
from dask.distributed import Client

client = Client()

responses = [
    dask.delayed(requests.get, pure=False)(f"http://127.0.0.1:9000/{i}") for i in range(10)
]
dask.compute(responses)

В этой настройке вызов compute() в _get_block является «блокированием», и одновременно вычисляется только один фрагмент. Я пробовал различные комбинации Client(asynchronous=True) и client.compute(dask.compute(responses)) без каких-либо улучшений. Возможно ли await вычисление массива dask?

1 Ответ

0 голосов
/ 02 марта 2020

Эта строка

block_data = data.blocks[block_id].compute()

является блокирующим вызовом. Если бы вы вместо этого сделали client.compute(data.blocks[block_id]), вы получите ожидаемое будущее, которое можно будет использовать вместе с вами IOL oop, при условии, что Dask использует тот же l oop.

Обратите внимание, что Intake Сервер очень хотел бы работать таким образом (он также стремится передавать данные по частям для массивов и других типов данных).

...