Я пытаюсь загрузить данные в Cassandra с помощью драйвера python.Самое быстрое, что мне удалось получить - около 6 тыс. Записей в секунду.Мой CSV, с которого я читаю, имеет около 1,15 миллиона строк, что приводит к общему времени вставки около 3 минут и 10 секунд.Мне действительно нужно сократить это время до 2 минут или меньше, чтобы не отставать от данных по мере поступления.
Мои данные состоят из 1,15 миллионов строк с 52 столбцами.
В настоящее время яиспользуя функцию session.execute_async для вставки данных.Внесение изменений в число запросов asnyc, которые я допускаю за один раз, похоже, ускоряет его.Похоже, что блокирование после примерно 5-6 тыс. Запросов приводит к самой высокой скорости вставки.
Я пытался выполнить пакетную вставку, но они были крайне медленными.
Вот мой текущий метод вставки данных вКассандра.
# insert data into cassandra table
execution_profile = ExecutionProfile(request_timeout=10)
profiles = {'node1': execution_profile}
auth_provider = PlainTextAuthProvider(username='cassandra', password='cassandra')
cluster = Cluster(['11.111.11.11'], 9042, auth_provider=auth_provider, execution_profiles=profiles)
session = cluster.connect() # connect to your keyspace
# Read csv rows into cassandra
count = 0
futures = []
with open('/massaged.csv') as f:
next(f) #skip the header row
for line in f:
query = SimpleStatement("INSERT INTO hrrr.hrrr_18hr( loc_id,utc,sfc_vis,sfc_gust,sfc_pres,sfc_hgt,sfc_tmp,sfc_snow_0Xacc,sfc_cnwat,sfc_weasd,sfc_snowc,sfc_snod,two_m_tmp,two_m_pot,two_m_spfh,two_m_dpt,two_m_rh,ten_m_ugrd,ten_m_vgrd,ten_m_wind_1hr_max,ten_m_maxuw_1hr_max,ten_m_maxvw_1hr_max,sfc_cpofp,sfc_prate,sfc_apcp_0Xacc,sfc_weasd_0Xacc,sfc_frozr_0Xacc,sfc_frzr_0Xacc,sfc_ssrun_1hr_acc,sfc_bgrun_1hr_acc,sfc_apcp_1hr_acc,sfc_weasd_1hr_acc,sfc_frozr_1hr_acc,sfc_csnow,sfc_cicep,sfc_cfrzr,sfc_crain,sfc_sfcr,sfc_fricv,sfc_shtfl,sfc_lhtfl,sfc_gflux,sfc_vgtyp,sfc_cape,sfc_cin,sfc_dswrf,sfc_dlwrf,sfc_uswrf,sfc_ulwrf,sfc_vbdsf,sfc_vddsf,sfc_hpbl) VALUES (%s)" %(line), consistency_level=ConsistencyLevel.ONE)
futures.append(session.execute_async(query, execution_profile='node1'))
count += 1
if count % 5000 == 0:
for f in futures:
f.result() # blocks until remaining inserts are completed.
futures = []
print("rows processed: " + str(count))
# Catch any remaining async requests that haven't finished
for f in futures:
f.result() # blocks until remaining inserts are completed.
print("rows processed: " + str(count))
Мне нужно сократить время вставки до 2 минут или меньше (примерно 10 тысяч вставок в секунду).Должен ли я использовать многопроцессорную обработку для достижения этой цели или я неправильно использую функцию execute_async?
UPDATE
По предложению Алекса я попытался реализовать подготовленный оператор.Это то, что я придумал, но это, кажется, значительно медленнееЕсть какие-нибудь мысли о том, что я сделал неправильно?
hrrr_prepared = session.prepare("INSERT INTO hrrr.hrrr_18hr( loc_id,utc,...,sfc_hpbl) VALUES (?, ..., ?)")
for row in range(0, len(data)):
futures.append(session.execute_async(hrrr_prepared, tuple(data.iloc[row])))
count += 1
if count % 5000 == 0:
for f in futures:
f.result() # blocks until remaining inserts are completed.
futures = []
print("rows processed: " + str(count))
ПРИМЕЧАНИЕ. Я вставил «...» в подготовленное утверждение для удобства чтения, реальный код не имеет этого.