Я создаю процесс для получения данных блока переписи от правительства США. В основном это работает, но я пытаюсь добавить несколько параллельных и объединенных процессов для ускорения выполнения. Мне удалось получить некоторые результаты с кодом ниже. Однако он возвращает данные в виде списка вместо фрейма данных, и я не могу определить, как добавить пул. Я потратил много времени на поиски ответов с небольшим количеством удачи, и мой крайний срок вырисовывается. Кто-нибудь, пожалуйста, помогите?
Обратите внимание, что я не могу добавить изображение, но результаты связаны внизу.
import dataiku
from dataiku import pandasutils as pdu
import pandas as pd
from dataiku.core.sql import SQLExecutor2
import time
import requests
import csv
import datetime
from joblib import Parallel, delayed
import multiprocessing as mp
from requests.packages.urllib3.exceptions import InsecureRequestWarning
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
e = SQLExecutor2(connection='Snowflake')
Переменные
LOOKUP_TABLE = '_COORDS'
RESULTS_TABLE = 'COORDINATES2'
P_BATCH_SIZE_UNIT = 5
P_ITERATION_LIMIT = 5
P_RETRY_ATTEMPTS = 5
col_list_ = ['LATITUDE','LONGITUDE','GEO_ID','BLOCK_ID','BLOCK_GROUP_ID',
'TRACT_ID','COUNTY_ID','STATE_ID','COUNTY_NAME',
'STATE_NAME','STATE_ABBR','BBOX']
df_out = pd.DataFrame([],columns=(col_list_))
Извлекает данные из API в виде одной строки DataFrame
def coords(lat, lon):
url = 'https://geo.fcc.gov/api/census/block/find'
payload = {'format': 'json','latitude': lat,'longitude': lon,'showall': 'true'}
i = 1
while i < P_RETRY_ATTEMPTS:
call = requests.get(url, params = payload, verify=False, timeout=10)
try:
if call.status_code == 200 :
data = call.json()
d={}
d['LATITUDE'] = lat
d['LONGITUDE'] = lon
d['GEO_ID'] = data['Block']['FIPS']
d['BLOCK_ID'] = data['Block']['FIPS'][13:]
d['BLOCK_GROUP_ID'] = data['Block']['FIPS'][11:12]
d['TRACT_ID'] = data['Block']['FIPS'][5:11]
d['COUNTY_ID'] = data['County']['FIPS'][2:5]
d['STATE_ID'] = data['State']['FIPS']
d['COUNTY_NAME'] = data['County']['name']
d['STATE_NAME'] = data['State']['name']
d['STATE_ABBR'] = data['State']['code']
d['BBOX'] = str(data['Block']['bbox'])
df_out_tmp = pd.DataFrame([d],columns=(col_list_))
return df_out_tmp
break
else: i += 1
except:
i += 1
continue
Запустить процесс для одного элемента
def processItem(x):
try:
call = coords(x[0],x[1])
#return call.to_csv(header=False)
return call
except:
return False
Итерация и обработка элементов
df = e.query_to_df(SQL_EXTRACT_RECORDS)
for index, row in df.iterrows():
df_out = df_out.append(processItem(row)).drop_duplicates()
results = Parallel(n_jobs=4, verbose=1, backend="threading")(map(delayed(processItem), df.values.tolist()))
results
Результаты вывода