Я написал скрипт на Python, который читает строки из файла CSV и затем вставляет его в Cassandra. Все работает нормально, но после определенных вставок получает ошибку тайм-аута.
# lets do some batch insert
def insert_data(self):
start_time = datetime.utcnow()
destination = "/Users/aviralsrivastava/dev/learning_dask/10M_rows.csv"
chunksize = 1000
chunks = pd.read_csv(destination, chunksize=chunksize)
chunk_counter = 0
for df in chunks:
df = df.to_dict(orient='records')
chunk_counter += 1
batch = BatchStatement()
for row in df:
key = str(row["0"])
row = json.dumps(row, default=str)
insert_sql = self.session.prepare(
(
"INSERT INTO {} ({}, {}, {}) VALUES (?,?,?)"
).format(
self.table_name, "id", "version", "row"
)
)
batch.add(insert_sql, (key, "version_1", row))
self.session.execute(batch)
self.log.info("One chunk's Batch Insert Completed")
print(
str(chunk_counter*chunksize) + " : " +
str(datetime.utcnow() - start_time)
)
del batch
print("Complete task's duration is: {}".format(
datetime.utcnow() - start_time))
Код для установления соединения:
def createsession(self):
self.cluster = Cluster(['localhost'], connect_timeout = 50)
self.session = self.cluster.connect(self.keyspace)
И ошибка:
2019-01-16 15:58:49,013 [ERROR] cassandra.cluster: Error preparing query:
Traceback (most recent call last):
File "cassandra/cluster.py", line 2402, in cassandra.cluster.Session.prepare
File "cassandra/cluster.py", line 4062, in cassandra.cluster.ResponseFuture.result
cassandra.OperationTimedOut: errors={'127.0.0.1': 'Client request timeout. See Session.execute[_async](timeout)'}, last_host=127.0.0.1
Traceback (most recent call last):
File "getting_started.py", line 107, in <module>
example1.insert_data()
File "getting_started.py", line 86, in insert_data
self.table_name, "id", "version", "row"
File "cassandra/cluster.py", line 2405, in cassandra.cluster.Session.prepare
File "cassandra/cluster.py", line 2402, in cassandra.cluster.Session.prepare
File "cassandra/cluster.py", line 4062, in cassandra.cluster.ResponseFuture.result
cassandra.OperationTimedOut: errors={'127.0.0.1': 'Client request timeout. See Session.execute[_async](timeout)'}, last_host=127.0.0.1