использование dask для отправки параллельного запроса API и обработки ошибок - PullRequest
0 голосов
/ 02 октября 2018

Я недавно начал использовать dask.Я хочу отправить данные в REST API с помощью http-запроса, API возвращает файл json, чтобы проверить, успешна ли загрузка данных.Вот моя функция вызова API:

def requestToAPI():
    headers={'Content-Type': 'application/json'}
    data = {
      "api_key" : "xxxxxxxxxxxxx",
      "attributes" : [
       {
         "external_id" : "user1",
         "app_id" : "xxxx-xxx-xxxxx-xxxx",
         "firs_name" : "user_firstname",
         "last_name" : "user_lastname_test"
       }
     ]
    }
    r = requests.post('https://abcdf.com/users/abdcgdu', headers=headers, data=json.dumps(data))
    return r.json()

У меня есть несколько блоков данных dask, которые я получаю из кода ниже:

 rChunk=dd.from_pandas(pandaDataFrame, chunksize=1000)

Как я могу использовать dask и использовать выше блоки (Предположим, что каждый кусок будет изменен на соответствующий файл json) для отправки параллельного запроса в API и выполнения правильной обработки ошибок, если один из запросов завершился ошибкой / ошибкой возврата?

Я пытался использовать dask.delayed:

[отложено (requestToAPI) (чанк) для чанка в rChunk]

но не уверен, как я могу сделать правильную обработку ошибок ??

1 Ответ

0 голосов
/ 08 октября 2018

Я не уверен, что dask dataframe - лучший выбор для вашего приложения.Возможно, вы захотите взглянуть на отложенные, фьючерсные или пакетные API.

Я бы, вероятно, использовал concurrent.futures

from dask.distributed import Client, as_completed

futures = client.map(process, requests)
for future in as_completed(futures):
    try:
        response = future.result()
        # do stuff with result
    except Exeption:
        # do stuff

http://docs.dask.org/en/latest/futures.html

...