Я пытался использовать execute
, execute_async
и execute_concurrent
в Cassandra, но для чтения 10M строк я мог проиндексировать их в Cassandra не менее чем за 55 минут. Обратите внимание, что я установил параллельные потоки равными 1000, а также настроил ограничения одновременного чтения и записи в файле YAML (до 10000). Я пробовал фактор репликации 0, 1, 2 при создании кластера. Никто не сможет проиндексировать файл за меньшее время. Итак, я решил, что вместо последовательного чтения csv, добавления его в список и последующей записи в Cassandra в пакетном, параллельном или асинхронном режимах, как насчет параллельного чтения CSV ?! Поэтому я использовал dask для чтения csv-файла из 10M строк.
import json
import logging
from datetime import datetime
import dask.dataframe as dd
import dask.multiprocessing
import sys
import json
import pandas as pd
from cassandra import ConsistencyLevel, WriteTimeout
from cassandra.cluster import BatchStatement, Cluster
from cassandra.query import SimpleStatement
from cassandra.concurrent import execute_concurrent, execute_concurrent_with_args
class PythonCassandraExample:
def __init__(self, version):
self.cluster = None
self.session = None
self.keyspace = None
self.log = None
self.version = version
def __del__(self):
self.cluster.shutdown()
def createsession(self):
self.cluster = Cluster(['localhost'], connect_timeout=50)
self.session = self.cluster.connect(self.keyspace)
def getsession(self):
return self.session
# How about Adding some log info to see what went wrong
def setlogger(self):
log = logging.getLogger()
log.setLevel('INFO')
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter(
"%(asctime)s [%(levelname)s] %(name)s: %(message)s"))
log.addHandler(handler)
self.log = log
# Create Keyspace based on Given Name
def handle_error(self, exception):
self.log.error("Failed to fetch user info: %s", exception)
def createkeyspace(self, keyspace):
"""
:param keyspace: The Name of Keyspace to be created
:return:
"""
# Before we create new lets check if exiting keyspace; we will drop that and create new
rows = self.session.execute(
"SELECT keyspace_name FROM system_schema.keyspaces")
if keyspace in [row[0] for row in rows]:
self.log.info("dropping existing keyspace...")
self.session.execute("DROP KEYSPACE " + keyspace)
self.log.info("creating keyspace...")
self.session.execute("""
CREATE KEYSPACE %s
WITH replication = { 'class': 'SimpleStrategy', 'replication_factor': '3' }
""" % keyspace)
self.log.info("setting keyspace...")
self.session.set_keyspace(keyspace)
def create_table(self, table_name):
self.table_name = table_name
c_sql = "CREATE TABLE IF NOT EXISTS {} (id varchar, version varchar, row varchar, PRIMARY KEY(id, version));".format(
self.table_name)
print("Query for creating table is: {}".format(c_sql))
self.session.execute(c_sql)
self.log.info("DP Table Created !!!")
self.insert_sql = self.session.prepare(
(
"INSERT INTO {} ({}, {}, {}) VALUES (?,?,?)"
).format(
self.table_name, "id", "version", "row"
)
)
# lets do some batch insert
def insert_data(self, key, version, row):
self.session.execute(
self.insert_sql, [key, version, row]
)
@dask.delayed
def print_a_block(self, d):
d = d.to_dict(orient='records')
for row in d:
key = str(row["0"])
row = json.dumps(row, default=str)
self.insert_data(key, self.version, row)
if __name__ == '__main__':
start_time = datetime.utcnow()
example1 = PythonCassandraExample(version="version_1")
example1.createsession()
example1.setlogger()
example1.createkeyspace('fri_athena_two')
example1.create_table('TenMillion')
example1.log.info("Calling compute!")
df = dd.read_csv("/Users/aviralsrivastava/dev/levelsdb-learning/10gb.csv")
dask.compute(*[example1.print_a_block(d) for d in df.to_delayed()])
print(datetime.utcnow() - start_time)
Даже используя dask, все усилия были напрасны, поскольку прошел час, и, тем не менее, задача записи строк в Cassandra еще не была выполнена? Что еще я должен сделать, чтобы уменьшить время, потраченное?