Как ускорить вставку execute_async в Cassandra с помощью драйвера Python - PullRequest
0 голосов
/ 11 июня 2019

Я пытаюсь загрузить данные в Cassandra с помощью драйвера python.Самое быстрое, что мне удалось получить - около 6 тыс. Записей в секунду.Мой CSV, с которого я читаю, имеет около 1,15 миллиона строк, что приводит к общему времени вставки около 3 минут и 10 секунд.Мне действительно нужно сократить это время до 2 минут или меньше, чтобы не отставать от данных по мере поступления.

Мои данные состоят из 1,15 миллионов строк с 52 столбцами.

В настоящее время яиспользуя функцию session.execute_async для вставки данных.Внесение изменений в число запросов asnyc, которые я допускаю за один раз, похоже, ускоряет его.Похоже, что блокирование после примерно 5-6 тыс. Запросов приводит к самой высокой скорости вставки.

Я пытался выполнить пакетную вставку, но они были крайне медленными.

Вот мой текущий метод вставки данных вКассандра.

# insert data into cassandra table
execution_profile = ExecutionProfile(request_timeout=10)
profiles = {'node1': execution_profile}
auth_provider = PlainTextAuthProvider(username='cassandra', password='cassandra')
cluster = Cluster(['11.111.11.11'], 9042, auth_provider=auth_provider, execution_profiles=profiles) 
session = cluster.connect() # connect to your keyspace

# Read csv rows into cassandra
count = 0
futures = []
with open('/massaged.csv') as f:
    next(f) #skip the header row
    for line in f:
        query = SimpleStatement("INSERT INTO hrrr.hrrr_18hr( loc_id,utc,sfc_vis,sfc_gust,sfc_pres,sfc_hgt,sfc_tmp,sfc_snow_0Xacc,sfc_cnwat,sfc_weasd,sfc_snowc,sfc_snod,two_m_tmp,two_m_pot,two_m_spfh,two_m_dpt,two_m_rh,ten_m_ugrd,ten_m_vgrd,ten_m_wind_1hr_max,ten_m_maxuw_1hr_max,ten_m_maxvw_1hr_max,sfc_cpofp,sfc_prate,sfc_apcp_0Xacc,sfc_weasd_0Xacc,sfc_frozr_0Xacc,sfc_frzr_0Xacc,sfc_ssrun_1hr_acc,sfc_bgrun_1hr_acc,sfc_apcp_1hr_acc,sfc_weasd_1hr_acc,sfc_frozr_1hr_acc,sfc_csnow,sfc_cicep,sfc_cfrzr,sfc_crain,sfc_sfcr,sfc_fricv,sfc_shtfl,sfc_lhtfl,sfc_gflux,sfc_vgtyp,sfc_cape,sfc_cin,sfc_dswrf,sfc_dlwrf,sfc_uswrf,sfc_ulwrf,sfc_vbdsf,sfc_vddsf,sfc_hpbl) VALUES (%s)" %(line), consistency_level=ConsistencyLevel.ONE)
        futures.append(session.execute_async(query, execution_profile='node1'))
        count += 1
        if count % 5000 == 0:
            for f in futures:
                f.result() # blocks until remaining inserts are completed.
                futures = []
            print("rows processed: " + str(count))

# Catch any remaining async requests that haven't finished
for f in futures:
    f.result() # blocks until remaining inserts are completed.
print("rows processed: " + str(count))

Мне нужно сократить время вставки до 2 минут или меньше (примерно 10 тысяч вставок в секунду).Должен ли я использовать многопроцессорную обработку для достижения этой цели или я неправильно использую функцию execute_async?

UPDATE

По предложению Алекса я попытался реализовать подготовленный оператор.Это то, что я придумал, но это, кажется, значительно медленнееЕсть какие-нибудь мысли о том, что я сделал неправильно?

hrrr_prepared = session.prepare("INSERT INTO hrrr.hrrr_18hr( loc_id,utc,...,sfc_hpbl) VALUES (?, ..., ?)")

for row in range(0, len(data)):
    futures.append(session.execute_async(hrrr_prepared, tuple(data.iloc[row])))
    count += 1
    if count % 5000 == 0:
        for f in futures:
            f.result() # blocks until remaining inserts are completed.
            futures = []
        print("rows processed: " + str(count))

ПРИМЕЧАНИЕ. Я вставил «...» в подготовленное утверждение для удобства чтения, реальный код не имеет этого.

1 Ответ

1 голос
/ 11 июня 2019

Большое ускорение должно происходить из-за использования подготовленных операторов вместо использования SimpleStatement - для подготовленного оператора он анализируется только один раз (вне цикла), а затем только данные отправляются на сервер вместе с идентификатором запроса. С SimpleStatement запрос будет анализироваться каждый раз.

Кроме того, потенциально вы можете улучшить пропускную способность, если не будете ждать завершения всех фьючерсов, но у вас будет своего рода «счетный семафор», который не позволит вам превысить максимальное количество запросов «в полете», но Вы можете отправить новый запрос, как только некоторые из них будут выполнены. Я не эксперт по Python, поэтому не могу точно сказать, как это сделать, но вы можете заглянуть в реализацию Java , чтобы понять идею.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...