Как читать CSV параллельно и писать в Cassandra параллельно для достижения высокой пропускной способности? - PullRequest
0 голосов
/ 17 января 2019

Я пытался использовать execute, execute_async и execute_concurrent в Cassandra, но для чтения 10M строк я мог проиндексировать их в Cassandra не менее чем за 55 минут. Обратите внимание, что я установил параллельные потоки равными 1000, а также настроил ограничения одновременного чтения и записи в файле YAML (до 10000). Я пробовал фактор репликации 0, 1, 2 при создании кластера. Никто не сможет проиндексировать файл за меньшее время. Итак, я решил, что вместо последовательного чтения csv, добавления его в список и последующей записи в Cassandra в пакетном, параллельном или асинхронном режимах, как насчет параллельного чтения CSV ?! Поэтому я использовал dask для чтения csv-файла из 10M строк.

import json
import logging
from datetime import datetime
import dask.dataframe as dd
import dask.multiprocessing
import sys
import json

import pandas as pd
from cassandra import ConsistencyLevel, WriteTimeout
from cassandra.cluster import BatchStatement, Cluster
from cassandra.query import SimpleStatement
from cassandra.concurrent import execute_concurrent, execute_concurrent_with_args


class PythonCassandraExample:
    def __init__(self, version):
        self.cluster = None
        self.session = None
        self.keyspace = None
        self.log = None
        self.version = version

    def __del__(self):
        self.cluster.shutdown()

    def createsession(self):
        self.cluster = Cluster(['localhost'], connect_timeout=50)
        self.session = self.cluster.connect(self.keyspace)

    def getsession(self):
        return self.session

    # How about Adding some log info to see what went wrong
    def setlogger(self):
        log = logging.getLogger()
        log.setLevel('INFO')
        handler = logging.StreamHandler()
        handler.setFormatter(logging.Formatter(
            "%(asctime)s [%(levelname)s] %(name)s: %(message)s"))
        log.addHandler(handler)
        self.log = log
    # Create Keyspace based on Given Name

    def handle_error(self, exception):
        self.log.error("Failed to fetch user info: %s", exception)


    def createkeyspace(self, keyspace):
        """
        :param keyspace:  The Name of Keyspace to be created
        :return:
        """
        # Before we create new lets check if exiting keyspace; we will drop that and create new
        rows = self.session.execute(
            "SELECT keyspace_name FROM system_schema.keyspaces")
        if keyspace in [row[0] for row in rows]:
            self.log.info("dropping existing keyspace...")
            self.session.execute("DROP KEYSPACE " + keyspace)

        self.log.info("creating keyspace...")
        self.session.execute("""
                CREATE KEYSPACE %s
                WITH replication = { 'class': 'SimpleStrategy', 'replication_factor': '3' }
                """ % keyspace)

        self.log.info("setting keyspace...")
        self.session.set_keyspace(keyspace)

    def create_table(self, table_name):
        self.table_name = table_name
        c_sql = "CREATE TABLE IF NOT EXISTS {} (id varchar, version varchar, row varchar, PRIMARY KEY(id, version));".format(
            self.table_name)
        print("Query for creating table is: {}".format(c_sql))
        self.session.execute(c_sql)
        self.log.info("DP Table Created !!!")
        self.insert_sql = self.session.prepare(
            (
                "INSERT INTO  {} ({}, {}, {}) VALUES (?,?,?)"
            ).format(
                self.table_name, "id", "version", "row"
            )
        )

    # lets do some batch insert
    def insert_data(self, key, version, row):
        self.session.execute(
            self.insert_sql, [key, version, row]
        )
    @dask.delayed
    def print_a_block(self, d):
        d = d.to_dict(orient='records')
        for row in d:
            key = str(row["0"])
            row = json.dumps(row, default=str)
            self.insert_data(key, self.version, row)

if __name__ == '__main__':
    start_time = datetime.utcnow()
    example1 = PythonCassandraExample(version="version_1")
    example1.createsession()
    example1.setlogger()
    example1.createkeyspace('fri_athena_two')
    example1.create_table('TenMillion')
    example1.log.info("Calling compute!")
    df = dd.read_csv("/Users/aviralsrivastava/dev/levelsdb-learning/10gb.csv")
    dask.compute(*[example1.print_a_block(d) for d in df.to_delayed()])
    print(datetime.utcnow() - start_time)

Даже используя dask, все усилия были напрасны, поскольку прошел час, и, тем не менее, задача записи строк в Cassandra еще не была выполнена? Что еще я должен сделать, чтобы уменьшить время, потраченное?

...