У меня очень большой набор данных (71M строк - ~ 7 ГБ CSV-файла), который я загрузил в кадр данных в pandas.
Мне нужно отправить каждую строку в кадре данных в сообщениизапросите API, а затем сохраните ответ в другом фрейме данных, который экспортируется позже и проанализирован
В настоящее время мой код выглядит следующим образом
##read data from csv
for row in data.itertuples(index=True, name='Pandas'):
##There is an if-else ladder to create a header depending on the type of values in a row
##Code to create a json payload
r = requests.post(url, data=json.dumps(payload), headers=headers)
t = json.loads(r.text)
## A try and except block to add the data sent via header and payload + the response from API call back into a new data-frame. (exception is in case there is no response from the API)
##write the data back to csv
API может обрабатывать QPS (запросов в секунду) около 50 000 с лишним, однако этот метод выполнения просто достигает примерно 11 QPS.В моем предыдущем тесте я разбил небольшой набор данных (около 7 миллионов строк) 4 раза и запустил 4 разных ноутбука Jupyter с одним и тем же кодом, что позволило эффективно достичь QPS около 44-50 и запустить код в течение 24 часов.
Примечание : я не хочу поражать его таким большим количеством QPS, поскольку это производственный API, мне дали скидку около 10k QPS
Поскольку теперь мне нужно запустить его на гораздо большем множестве, есть ли способ сделать это в Python?Разве разделение данных на 4 части и их совместное выполнение эквивалентно параллельной обработке кода?
Может быть, я все делаю неправильно, и есть какой-то другой путь?- мой опыт работы с Python в основном связан с аналитикой и наукой о данных (numpy, pandas и т. д.), так что это был единственный подход, который мне пришел в голову.
Я использую систему с ксеноном Intel e5-2690Процессор v2 (20 ядер) и 128 ГБ ОЗУ, так что я думаю, что он должен быть в состоянии справиться с этим, поскольку в моем предыдущем исполнении он с трудом справлялся с использованием ресурсов.
Буду признателен за любую помощь, чтобы направить меня в правильном направлении.
РЕДАКТИРОВАТЬ: Все предложения указали мне на aiohttp, однако, потому что у меня не хватало времени,и я добился успеха с пулом многопроцессорности, я пошел вперед с этим.Добавлено несколько дополнительных строк кода
if __name__=='__main__':
##read data
data_split = np.array_split(data,20)
p = Pool(20)
p.map(apicall, data_split)
Функция ' apicall ' по сути такая же, как и в приведенном выше фрагменте кода (раздел цикла for)
Работаетштрафа около 10000 данных.Однако, если я масштабирую его до 100 000 или более, я получаю сообщение об ошибке и снова застреваю
OSError: [WinError 10048] Только одно использование каждого адреса сокета (протокол / сетевой адрес / порт)обычно разрешено
Edit2: Понял, почему я получил вышеупомянутую ошибку.Похоже, асинхронный - единственный путь.