Как мне достичь пропускной способности 50k / sec при вставке моих данных в Cassandra при чтении ввода из файла csv? - PullRequest
0 голосов
/ 23 января 2019

Моя цель - увеличить пропускную способность управления версиями данных в Кассандре.Я использовал одновременное чтение и запись, а также увеличил размер фрагмента, который мой код читает из файла.Моя машина - 16 ГБ с 8 ядрами, и да, я изменил файл yaml Кассандры для 10k одновременных операций чтения и записи, и, когда это произошло, я обнаружил, что 10000 операций записи / чтения занимает меньше секунды.Мой минимальный, жизнеспособный код:

import json
import logging
import os
import sys
from datetime import datetime
from hashlib import sha256, sha512, sha1

import pandas as pd
from cassandra import ConsistencyLevel, WriteTimeout
from cassandra.cluster import (EXEC_PROFILE_DEFAULT, BatchStatement, Cluster,
                               ExecutionProfile)
from cassandra.concurrent import (execute_concurrent,
                                  execute_concurrent_with_args)
from cassandra.query import SimpleStatement, dict_factory


class PythonCassandraExample:
    def __init__(self, file_to_be_versioned, working_dir=os.getcwd(), mode='append'):
        self.cluster = None
        self.session = None
        self.keyspace = None
        self.log = None
        self.mode = mode
        self.file_to_be_versioned = file_to_be_versioned
        self.insert_patch = []
        self.delete_patch = []
        self.update_patch = []
        self.working_dir = working_dir

    def __del__(self):
        self.cluster.shutdown()

    def createsession(self):
        profile = ExecutionProfile(
            row_factory=dict_factory,
            request_timeout=6000
        )
        self.cluster = Cluster(
            ['localhost'],
            connect_timeout=50,
            execution_profiles={
                EXEC_PROFILE_DEFAULT: profile
            }
        )
        self.session = self.cluster.connect(self.keyspace)

    def getsession(self):
        return self.session

    # How about Adding some log info to see what went wrong
    def setlogger(self):
        log = logging.getLogger()
        log.setLevel('INFO')
        handler = logging.StreamHandler()
        handler.setFormatter(logging.Formatter(
            "%(asctime)s [%(levelname)s] %(name)s: %(message)s"))
        log.addHandler(handler)
        self.log = log
    # Create Keyspace based on Given Name

    def handle_error(self, exception):
        self.log.error("Failed to fetch user info: %s", exception)

    def createkeyspace(self, keyspace):
        """
        :param keyspace:  The Name of Keyspace to be created
        :return:
        """
        # Before we create new lets check if exiting keyspace; we will drop that and create new
        self.log.info("creating keyspace...")
        self.session.execute("""
                CREATE KEYSPACE IF NOT EXISTS %s
                WITH replication = { 'class': 'SimpleStrategy', 'replication_factor': '1' }
                """ % keyspace)
        self.log.info("setting keyspace...")
        self.keyspace = keyspace
        self.session.set_keyspace(self.keyspace)

    def create_table_and_set_version(self, table_name):
        self.table_name = table_name.lower()
        table_select_query = "SELECT table_name FROM system_schema.tables WHERE keyspace_name='{}' AND table_name='{}'".format(
            self.keyspace, self.table_name)
        print(table_select_query)
        table_exists = self.session.execute(table_select_query).one()
        self.log.info("Table exists: {}".format(table_exists))
        if table_exists:
            self.log.info(
                "Datapackage already exists! Checking the last version")
            self.version = self.session.execute(
                "SELECT version FROM {} LIMIT 1".format(self.table_name)).one()
            self.log.info(
                "The version fetched is: {} of type: {}".format(
                    self.version, type(self.version)
                )
            )
            if not self.version:
                self.version = 0
            else:
                self.version = self.version['version']
        else:
            self.log.info("Table didn't exist!")
            self.version = 0
        self.target_version = int(str(self.version)) + 1
        self.log.info(
            "Current and candidate versions are: {}, {}".format(
                self.version,
                self.target_version
            )
        )
        # c_sql = "CREATE TABLE IF NOT EXISTS {} (id varchar, version int, row varchar, row_hash varchar, PRIMARY KEY(id, version)) with clustering order by (version desc)".format(
        #     self.table_name)
        c_sql = "CREATE TABLE IF NOT EXISTS {} (id varchar, version int, row_hash varchar, PRIMARY KEY(version, id))".format(
            self.table_name
        )
        self.session.execute(c_sql)
        self.log.info("DP Table Created !!!")
        self.log.info("Current and candidate versions are: {}, {}".format(
            self.version, self.target_version))

    def push_to_update_patch(self, update_patch_file, last_patch=False):
        if len(self.update_patch) >= 10000:
            with open(update_patch_file, mode='a') as json_file:
                json_file.writelines(
                    self.update_patch
                )
            del self.update_patch[:]
        if last_patch is True and len(self.update_patch) > 0:
            with open(update_patch_file, mode='a') as json_file:
                json_file.writelines(
                    self.update_patch
                )
            del self.update_patch[:]

    def push_to_insert_patch(self, insert_patch_file, last_patch=False):
        if len(self.insert_patch) >= 10000:
            with open(insert_patch_file, mode='a') as json_file:
                json_file.writelines(
                    self.insert_patch
                )
            del self.insert_patch[:]
        if last_patch is True and len(self.update_patch) > 0:
            with open(insert_patch_file, mode='a') as json_file:
                json_file.writelines(
                    self.insert_patch
                )
            del self.insert_patch[:]

    def push_to_delete_patch(self, delete_patch_file, last_patch=False):
        if len(self.delete_patch) >= 10000:
            with open(delete_patch_file, mode='a') as json_file:
                json_file.writelines(
                    self.delete_patch
                )
            del self.delete_patch[:]
        if last_patch is True and len(self.delete_patch) > 0:
            with open(delete_patch_file, mode='a') as json_file:
                json_file.writelines(
                    self.delete_patch
                )
            del self.delete_patch[:]

    def push_to_patch(self, key, value, mode='update'):
        return
        if key is None or value is None:
            raise ValueError(
                "Key or value or both not specified for making a patch. Exiting now."
            )
        data = {}
        data["id"] = str(key)
        data["data"] = json.dumps(value, default=str)
        # convert dict to json str so that the patch is a list of line jsons.
        data = json.dumps(data, default=str)
        json_patch_file = os.path.join(
            self.working_dir,
            "version_{}_{}.json".format(
                self.target_version, mode
            )
        )
        if mode == 'update':
            self.update_patch.append(
                data + "\n"
            )
            self.push_to_update_patch(
                json_patch_file
            )
        if mode == 'insert':
            self.insert_patch.append(
                data + "\n"
            )
            self.push_to_insert_patch(
                json_patch_file
            )
        if mode == 'delete':
            self.delete_patch.append(
                data + "\n"
            )
            self.push_to_delete_patch(
                json_patch_file
            )

    def clone_version(self):
        if self.mode == 'replace':
            return
        self.log.info("Cloning version")
        start_time = datetime.utcnow()
        if self.version == 0:
            return
        insert_sql = self.session.prepare(
            (
                "INSERT INTO  {} ({}, {}, {}) VALUES (?,?,?)"
            ).format(
                self.table_name, "id", "version", "row_hash"
            )
        )
        futures = []
        current_version_query = "SELECT id, row_hash FROM {} WHERE version={}".format(
            self.table_name, self.version
        )
        current_version_rows = self.session.execute(
            current_version_query
        )
        for current_version_row in current_version_rows:
            params = (
                current_version_row['id'],
                self.target_version,
                current_version_row['row_hash']
            )
            futures.append(
                (
                    insert_sql,
                    params
                )
            )
        self.log.info(
            "Time taken to clone the version is: {}".format(
                datetime.utcnow() - start_time
            )
        )

    def hash_string(self, value):
        return (sha1(str(value).encode('utf-8')).hexdigest())

    def hash_row(self, row):
        row_json = json.dumps(row, default=str)
        return (self.hash_string(row_json))

    def insert_data(self, generate_diff=False):
        self.generate_diff = generate_diff
        destination = self.file_to_be_versioned
        chunksize = 100000
        concurrency_value = 1000
        patch_length_for_cql = chunksize
        chunks = pd.read_csv(destination, chunksize=chunksize)
        chunk_counter = 0
        insert_sql = self.session.prepare(
            (
                "INSERT INTO  {} ({}, {}, {}) VALUES (?,?,?)"
            ).format(
                self.table_name, "id", "version", "row_hash"
            )
        )
        select_sql = self.session.prepare(
            (
                "SELECT id, version, row_hash FROM {} WHERE  version=? AND id=?"
            ).format(
                self.table_name
            )
        )
        futures = []
        check_for_patch = [] #this list comprises rows with ids and values for checking whether its an update/insert
        rows_for_checking_patch = []
        start_time = datetime.utcnow()
        for df in chunks:
            rows_for_checking_patch = df.values.tolist()
            chunk_counter += 1
            df["row_hash"] = df.apply(
                self.hash_row
            )
            df["key"] = df["column_test_3"].apply(
                self.hash_string
            )
            keys = list(df["key"])
            row_hashes = list(df["row_hash"])
            start_time_de_params = datetime.utcnow()
            for i in range(chunksize):
                row_check = None
                params = (
                    str(keys[i]),
                    self.target_version, 
                    str(row_hashes[i])
                )
                check_for_patch_params = (
                    self.version,
                    str(keys[i])
                )
                check_for_patch.append(
                    (
                        select_sql,
                        check_for_patch_params
                    )
                )
                futures.append(
                    (
                        insert_sql,
                        params
                    )
                )
            self.log.info("Time for params: {}".format(datetime.utcnow() - start_time_de_params))
            if len(check_for_patch) >= patch_length_for_cql:
                start_time_de_update = datetime.utcnow()
                results = execute_concurrent(
                    self.session, check_for_patch, concurrency=concurrency_value, raise_on_first_error=False
                )
                self.log.info("Time for just the query: {}".format(datetime.utcnow() - start_time_de_update))
                row_counter_for_patch = 0
                for (success, result) in results:
                    if not result:
                        self.push_to_patch(
                            keys[row_counter_for_patch],
                            rows_for_checking_patch[row_counter_for_patch],
                            mode='insert'
                        )
                        row_counter_for_patch += 1
                        continue
                    if not success:
                        # result will be an Exception
                        self.log.error("Error has occurred in insert cql")
                        self.handle_error(result)
                    id_to_be_compared = result[0]["id"]
                    row_hash_to_be_compared = result[0]["row_hash"]
                    if (row_hash_to_be_compared != row_hashes[row_counter_for_patch]):
                        self.push_to_patch(
                            id_to_be_compared,
                            rows_for_checking_patch[row_counter_for_patch]["row"],
                            mode='update'
                        )
                    row_counter_for_patch += 1
                del check_for_patch[:]
                del rows_for_checking_patch[:]
                row_counter_for_patch = 0
                self.log.info("Time for check patch: {}".format(
                    datetime.utcnow() - start_time_de_update
                ))

            if (len(futures) >= patch_length_for_cql):
                start_time_de_insert = datetime.utcnow()
                results = execute_concurrent(
                    self.session, futures, concurrency=concurrency_value, raise_on_first_error=False
                )
                for (success, result) in results:
                    if not success:
                        # result will be an Exception
                        self.log.error("Error has occurred in insert cql")
                        self.handle_error(result)
                del futures[:]
                self.log.info("Time for insert patch: {}".format(
                    datetime.utcnow() - start_time_de_insert
                    ))
            self.log.info(chunk_counter)
            # self.log.info("This chunk got over in {}".format(datetime.utcnow() - start_time))

        if len(check_for_patch) > 0:
            results = execute_concurrent(
                self.session, check_for_patch, concurrency=concurrency_value, raise_on_first_error=False
            )
            row_counter_for_patch = 0
            for (success, result) in results:
                if not result:
                    self.push_to_patch(
                        rows_for_checking_patch[row_counter_for_patch]["id"],
                        rows_for_checking_patch[row_counter_for_patch]["row"],
                        mode='insert'
                    )
                    row_counter_for_patch += 1
                    continue
                if not success:
                    # result will be an Exception
                    self.log.error("Error has occurred in insert cql")
                    self.handle_error(result)
                id_to_be_compared = result[0]["id"]
                row_hash_to_be_compared = result[0]["row_hash"]
                if (row_hash_to_be_compared != rows_for_checking_patch[row_counter_for_patch]["row_hash"]):
                    self.push_to_patch(
                        id_to_be_compared,
                        rows_for_checking_patch[row_counter_for_patch]["row"],
                        mode='update'
                    )
                    row_counter_for_patch += 1
            del check_for_patch[:]
            del rows_for_checking_patch[:]

        if len(futures) > 0:   # in case the last dataframe has #rows < 10k.
            results = execute_concurrent(
                self.session, futures, concurrency=concurrency_value, raise_on_first_error=False
            )
            for (success, result) in results:
                if not success:
                    self.handle_error(result)
            del futures[:]
            self.log.info(chunk_counter)

        # Check the delete patch
        if self.generate_diff is True and self.mode is 'replace' and self.version is not 0:
            self.log.info("We got to find the delete patch!")
            start_time = datetime.utcnow()
            current_version_query = "SELECT id, row, row_hash FROM {} WHERE version={}".format(
                self.table_name, self.version
            )
            current_version_rows = self.session.execute(
                current_version_query
            )
            for current_version_row in current_version_rows:
                row_check_query = "SELECT {} FROM {} WHERE {}={} AND {}='{}' ".format(
                    "id", self.table_name, "version", self.target_version, "id", current_version_row.id
                )
                row_check = self.session.execute(row_check_query).one()
                if row_check is not None:  # row exists in both version.
                    continue
                self.push_to_patch(
                    current_version_row.id,
                    current_version_row.id,
                    mode="delete"
                )
        print("Complete insert's duration is: {}".format(
            datetime.utcnow() - start_time)
        )
        # Calling last_patch for all remaining diffs
        modes = [
            'update',
            'insert',
            'delete'
        ]
        for mode in modes:
            json_patch_file = os.path.join(
                self.working_dir,
                "version_{}_{}.json".format(
                    self.target_version, mode
                )
            )
            if mode == 'update':
                self.push_to_update_patch(
                    json_patch_file,
                    last_patch=True
                )
            if mode == 'insert':
                self.push_to_insert_patch(
                    json_patch_file,
                    last_patch=True
                )
            if mode == 'delete':
                self.push_to_delete_patch(
                    json_patch_file,
                    last_patch=True
                )


if __name__ == '__main__':
    example1 = PythonCassandraExample(
        file_to_be_versioned="hundred_million_eleven_columns.csv"
    )
    example1.createsession()
    example1.setlogger()
    example1.createkeyspace('sat_athena_one')
    example1.create_table_and_set_version('five_hundred_rows')
    example1.clone_version()
    example1.insert_data(generate_diff=True)

У меня есть CSV-файл из 100M строк и 11 столбцов.Сценарий, используемый для создания такого файла:

import csv
import sys
import os
import pandas as pd

file_name = "hundred_million_eleven_columns.csv"
rows_list = []
chunk_counter = 1
headers = [
    "column_test_1",
    "column_test_2",
    "column_test_3",
    "column_test_4",
    "column_test_5",
    "column_test_6",
    "column_test_7",
    "column_test_8",
    "column_test_9",
    "column_test_10",
    "column_test_11",
]

file_exists = os.path.isfile(file_name)
with open(file_name, 'a') as csvfile:
    writer = csv.DictWriter(csvfile, delimiter=',',
                            lineterminator='\n', fieldnames=headers)
    if not file_exists:
        writer.writeheader()  # file doesn't exist yet, write a header

for i in range(100000000):
    dict1 = [
        i, i+1, i+2, i+3, i+4, i+5, i+6, i+7, i+8, i+9, i+10
    ]
    # get input row in dictionary format
    # key = col_name
    rows_list.append(dict1)
    if len(rows_list) == 100000:
        df = pd.DataFrame(rows_list)
        df.to_csv(file_name,
                  mode='a', index=False, header=False)
        del rows_list[:]
        del df
        print(chunk_counter)
        chunk_counter += 1
if len(rows_list) > 0:
    df = pd.DataFrame(rows_list)
    df.to_csv(file_name, mode='a', index=False, header=False)
    del rows_list[:]
    del df
    print(chunk_counter)
    chunk_counter += 1

Файл yaml моей Кассандры здесь

Ответы [ 2 ]

0 голосов
/ 23 января 2019

Комментарий рекомендует 8 * количество ядер.

С другой стороны, поскольку записи почти никогда не связаны с IO, идеал количество "concurrent_writes" зависит от количества ядер в ваша система; (8 * number_of_cores) - хорошее эмпирическое правило.

64 подходит для 8-ядерной машины.

  • concurrent_reads: 64
  • concurrent_writes: 64
  • concurrent_counter_writes: 64

Эти ограничения могут быть рекомендованы, поскольку существует много других операций ввода-вывода, кроме нормального ввода-вывода. ex) запись журнала фиксации, кэширование, сжатие, репликация, просмотр (если есть)

Некоторые эмпирические правила

  • disk_optimization_strategy: ssd // Если у вас жесткий диск, значение chage для вращения
  • использовать выделенный диск журнала фиксации. рекомендуется ssd.
  • больше дисков = лучшая производительность
0 голосов
/ 23 января 2019

Удостоверьтесь, что ваш код может генерировать столько же при 50k. Если вы удалите команды execute, можете ли вы прочитать CSV и сгенерировать ша так быстро? Экземпляр C * на хосте такого размера с твердотельными накопителями должен иметь возможность выполнять 50 тыс. Записей в секунду, но за пределами C * происходит много операций записи, которые, вероятно, являются частью проблемы.

Если число одновременных операций чтения / записи превышает 128, у вас возникнут серьезные проблемы. В системе, которая может справиться с этим, даже 64 достаточно для того, чтобы преодолеть 200 тыс. Записей в секунду. Вы на самом деле собираетесь сделать вещи намного хуже с этим максимумом сеттинга. В этом нет никаких операций ввода-вывода, поэтому в документации говорится, что 8-кратное число ядер является хорошим значением. Я бы даже рекомендовал понизить параллелизм вашего пуша с 10К до 1024 или даже ниже. Вы можете играть с различными настройками, чтобы увидеть, как это влияет на вещи.

Убедитесь, что python был скомпилирован с cython в вашей установке, иначе он будет доминировать при сериализации. Говоря о драйвере питона, самый медленный, так что имейте это в виду.

Ваша блокировка на ша может быть большую часть времени. Без перфорирования - просто попробуйте с фиксированным значением, чтобы увидеть разницу.

«Моя машина» - это кластер из одного узла? Если ваша выбрасывающая доступность из окна может также отключить durable_writes в пространстве ключей, чтобы немного ускорить запись. Отсутствуют настройки кучи, но убедитесь, что у вас есть минимум 8 Гб, даже если это довольно маленький хост, Кассандре нужна память. Если вы не читаете, подумайте об отключении кэша ключей и, возможно, отключении уплотнений во время выполнения задания, а затем включите его.

...