Многопроцессорный API-запрос - PullRequest
0 голосов
/ 03 ноября 2018

Я создаю процесс для получения данных блока переписи от правительства США. В основном это работает, но я пытаюсь добавить несколько параллельных и объединенных процессов для ускорения выполнения. Мне удалось получить некоторые результаты с кодом ниже. Однако он возвращает данные в виде списка вместо фрейма данных, и я не могу определить, как добавить пул. Я потратил много времени на поиски ответов с небольшим количеством удачи, и мой крайний срок вырисовывается. Кто-нибудь, пожалуйста, помогите?

Обратите внимание, что я не могу добавить изображение, но результаты связаны внизу.

import dataiku
from dataiku import pandasutils as pdu
import pandas as pd

from dataiku.core.sql import SQLExecutor2
import time
import requests
import csv
import datetime
from joblib import Parallel, delayed
import multiprocessing as mp

from requests.packages.urllib3.exceptions import InsecureRequestWarning
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)

e = SQLExecutor2(connection='Snowflake')

Переменные

LOOKUP_TABLE      = '_COORDS'
RESULTS_TABLE     = 'COORDINATES2'
P_BATCH_SIZE_UNIT = 5
P_ITERATION_LIMIT = 5
P_RETRY_ATTEMPTS  = 5

col_list_         = ['LATITUDE','LONGITUDE','GEO_ID','BLOCK_ID','BLOCK_GROUP_ID',
                     'TRACT_ID','COUNTY_ID','STATE_ID','COUNTY_NAME',
                     'STATE_NAME','STATE_ABBR','BBOX']
df_out            = pd.DataFrame([],columns=(col_list_))

Извлекает данные из API в виде одной строки DataFrame

def coords(lat, lon):
    url = 'https://geo.fcc.gov/api/census/block/find'
    payload = {'format': 'json','latitude':  lat,'longitude': lon,'showall': 'true'}

    i = 1
    while i < P_RETRY_ATTEMPTS:
        call = requests.get(url, params = payload, verify=False, timeout=10)
        try:
            if call.status_code == 200 :
                data = call.json()
                d={}
                d['LATITUDE']       = lat
                d['LONGITUDE']      = lon
                d['GEO_ID']         = data['Block']['FIPS']
                d['BLOCK_ID']       = data['Block']['FIPS'][13:]
                d['BLOCK_GROUP_ID'] = data['Block']['FIPS'][11:12]
                d['TRACT_ID']       = data['Block']['FIPS'][5:11]
                d['COUNTY_ID']      = data['County']['FIPS'][2:5]
                d['STATE_ID']       = data['State']['FIPS']
                d['COUNTY_NAME']    = data['County']['name']
                d['STATE_NAME']     = data['State']['name']
                d['STATE_ABBR']     = data['State']['code']
                d['BBOX']           = str(data['Block']['bbox'])


                df_out_tmp = pd.DataFrame([d],columns=(col_list_))
                return df_out_tmp
                break
            else: i += 1
        except:
            i += 1
            continue

Запустить процесс для одного элемента

def processItem(x):
    try:
        call = coords(x[0],x[1])
        #return call.to_csv(header=False)
        return call
    except:
        return False

Итерация и обработка элементов

df = e.query_to_df(SQL_EXTRACT_RECORDS)

for index, row in df.iterrows():
    df_out = df_out.append(processItem(row)).drop_duplicates()

results = Parallel(n_jobs=4, verbose=1, backend="threading")(map(delayed(processItem), df.values.tolist()))
results

Результаты вывода

...