Cassandra медленно работает на AWS - PullRequest
0 голосов
/ 12 мая 2019

Один из наших администраторов баз данных сравнил Cassandra с Oracle на AWS EC2 для производительности INSERT (1M записей), используя тот же код Python (ниже), и получил следующие удивительные результаты:

Oracle 12.2 , один узел, 64 ядра / 256 ГБ, хранилище EC2 EBS, 38 с

Cassandra 5.1.13 (DDAC), один узел , 2 ядра / 4 ГБ, хранилище EC2 EBS, 464 сек

Cassandra 3.11.4, четыре узла , 16 ядер / 64 ГБ (каждый узел), хранилище EC2 EBS, 486 с

ТАК - Что мы делаем не так?
Почему Кассандра играет так медленно?
* Недостаточно узлов? (Почему 4 узла медленнее, чем один узел?)
* Проблемы с конфигурацией?
* Что-то еще?

Спасибо!

Ниже приведен код Python:

import logging
import time
from cassandra import ConsistencyLevel
from cassandra.cluster import Cluster, BatchStatement
from cassandra.query import SimpleStatement
from cassandra.auth import PlainTextAuthProvider

class PythonCassandraExample:

    def __init__(self):
        self.cluster = None
        self.session = None
        self.keyspace = None
        self.log = None

    def __del__(self):
        self.cluster.shutdown()

    def createsession(self):
        auth_provider = PlainTextAuthProvider(username='cassandra', password='cassandra')
        self.cluster = Cluster(['10.220.151.138'],auth_provider = auth_provider)
        self.session = self.cluster.connect(self.keyspace)

    def getsession(self):
        return self.session

    # How about Adding some log info to see what went wrong
    def setlogger(self):
        log = logging.getLogger()
        log.setLevel('INFO')
        handler = logging.StreamHandler()
        handler.setFormatter(logging.Formatter("%(asctime)s [%(levelname)s] %(name)s: %(message)s"))
        log.addHandler(handler)
        self.log = log

    # Create Keyspace based on Given Name
    def createkeyspace(self, keyspace):
        """
        :param keyspace:  The Name of Keyspace to be created
        :return:
        """
        # Before we create new lets check if exiting keyspace; we will drop that and create new
        rows = self.session.execute("SELECT keyspace_name FROM system_schema.keyspaces")
        if keyspace in [row[0] for row in rows]:
            self.log.info("dropping existing keyspace...")
            self.session.execute("DROP KEYSPACE " + keyspace)

        self.log.info("creating keyspace...")
        self.session.execute("""
                CREATE KEYSPACE %s
                WITH replication = { 'class': 'SimpleStrategy', 'replication_factor': '2' }
                """ % keyspace)

        self.log.info("setting keyspace...")
        self.session.set_keyspace(keyspace)

    def create_table(self):
        c_sql = """
                CREATE TABLE IF NOT EXISTS employee (emp_id int PRIMARY KEY,
                                              ename varchar,
                                              sal double,
                                              city varchar);
                 """
        self.session.execute(c_sql)
        self.log.info("Employee Table Created !!!")

    # lets do some batch insert
    def insert_data(self):
        i = 1
        while i < 1000000:
          insert_sql = self.session.prepare("INSERT INTO  employee (emp_id, ename , sal,city) VALUES (?,?,?,?)")
          batch = BatchStatement()
          batch.add(insert_sql, (i, 'Danny', 2555, 'De-vito'))
          self.session.execute(batch)
          # self.log.info('Batch Insert Completed for ' + str(i))
          i += 1

    # def select_data(self):
    #    rows = self.session.execute('select count(*) from perftest.employee limit 5;')
    #    for row in rows:
    #        print(row.ename, row.sal)

    def update_data(self):
        pass

    def delete_data(self):
        pass


if __name__ == '__main__':
    example1 = PythonCassandraExample()
    example1.createsession()
    example1.setlogger()
    example1.createkeyspace('perftest')
    example1.create_table()

    # Populate perftest.employee table
    start = time.time()
    example1.insert_data()
    end = time.time()
    print ('Duration: ' + str(end-start) + ' sec.')

    # example1.select_data()

Ответы [ 2 ]

5 голосов
/ 13 мая 2019

Здесь есть несколько проблем:

  • для 2-го теста вы не выделили достаточно памяти и ядер для DDAC, поэтому Cassandra получила всего 1 Гб кучи - Cassandra по умолчанию занимает 1/4 от всей доступной памяти. То же самое относится и к третьему тесту - он получит только 16 ГБ ОЗУ для кучи, вам может потребоваться увеличить его, например, до 24 ГБ или даже выше.
  • не ясно, сколько IOP у вас в каждом тесте - EBS имеет разную пропускную способность в зависимости от размера тома и его типа
  • Вы используете синхронный API для выполнения команд - в основном вы вставляете следующий элемент после того, как получите подтверждение, что предыдущий вставлен. Наилучшая пропускная способность может быть достигнута с использованием асинхронного API ;
  • Вы готовите свой оператор на каждой итерации - это приводит к тому, что каждый раз отправляется CQL-строка на сервер, поэтому все замедляется - просто выведите строку insert_sql = self.session.prepare( из цикла;
  • (не полностью связанный) Вы используете пакетные операторы для записи данных - это антишаблон в Cassandra , так как данные отправляются только одному узлу, который затем должен распределять данные по узлам, которые действительно владеют данные. Это объясняет, почему кластер из 4 узлов хуже, чем кластер из 1 узла.

P.S. реалистичное нагрузочное тестирование довольно сложно. Для этого есть специализированные инструменты, вы можете найти, например, дополнительную информацию в этом блоге .

0 голосов
/ 12 мая 2019

Обновленный код ниже будет пакетировать каждые 100 записей:

"""
Python  by Techfossguru
Copyright (C) 2017  Satish Prasad

"""
import logging
import warnings
import time
from cassandra import ConsistencyLevel
from cassandra.cluster import Cluster, BatchStatement
from cassandra.query import SimpleStatement
from cassandra.auth import PlainTextAuthProvider

class PythonCassandraExample:

    def __init__(self):
        self.cluster = None
        self.session = None
        self.keyspace = None
        self.log = None

    def __del__(self):
        self.cluster.shutdown()

    def createsession(self):
        auth_provider = PlainTextAuthProvider(username='cassandra', password='cassandra')
        self.cluster = Cluster(['10.220.151.138'],auth_provider = auth_provider)
        self.session = self.cluster.connect(self.keyspace)

    def getsession(self):
        return self.session

    # How about Adding some log info to see what went wrong
    def setlogger(self):
        log = logging.getLogger()
        log.setLevel('INFO')
        handler = logging.StreamHandler()
        handler.setFormatter(logging.Formatter("%(asctime)s [%(levelname)s] %(name)s: %(message)s"))
        log.addHandler(handler)
        self.log = log

    # Create Keyspace based on Given Name
    def createkeyspace(self, keyspace):
        """
        :param keyspace:  The Name of Keyspace to be created
        :return:
        """
        # Before we create new lets check if exiting keyspace; we will drop that and create new
        rows = self.session.execute("SELECT keyspace_name FROM system_schema.keyspaces")
        if keyspace in [row[0] for row in rows]:
            self.log.info("dropping existing keyspace...")
            self.session.execute("DROP KEYSPACE " + keyspace)

        self.log.info("creating keyspace...")
        self.session.execute("""
                CREATE KEYSPACE %s
                WITH replication = { 'class': 'SimpleStrategy', 'replication_factor': '2' }
                """ % keyspace)

        self.log.info("setting keyspace...")
        self.session.set_keyspace(keyspace)

    def create_table(self):
        c_sql = """
                CREATE TABLE IF NOT EXISTS employee (emp_id int PRIMARY KEY,
                                              ename varchar,
                                              sal double,
                                              city varchar);
                 """
        self.session.execute(c_sql)
        self.log.info("Employee Table Created !!!")

    # lets do some batch insert
    def insert_data(self):
        i = 1
        insert_sql = self.session.prepare("INSERT INTO  employee (emp_id, ename , sal,city) VALUES (?,?,?,?)")
        batch = BatchStatement()
        warnings.filterwarnings("ignore", category=FutureWarning)

        while i < 1000001:
          # insert_sql = self.session.prepare("INSERT INTO  employee (emp_id, ename , sal,city) VALUES (?,?,?,?)")
          # batch = BatchStatement()
          batch.add(insert_sql, (i, 'Danny', 2555, 'De-vito'))

          # Commit every 100 records
          if (i % 100 == 0):
             self.session.execute(batch)
             batch = BatchStatement()
             # self.log.info('Batch Insert Completed for ' + str(i))
          i += 1
        self.session.execute(batch)

    # def select_data(self):
    #    rows = self.session.execute('select count(*) from actimize.employee limit 5;')
    #    for row in rows:
    #        print(row.ename, row.sal)

    def update_data(self):
        pass

    def delete_data(self):
        pass


if __name__ == '__main__':
    example1 = PythonCassandraExample()
    example1.createsession()
    example1.setlogger()
    example1.createkeyspace('actimize')
    example1.create_table()

    # Populate actimize.employee table
    start = time.time()
    example1.insert_data()
    end = time.time()
    print ('Duration: ' + str(end-start) + ' sec.')

    # example1.select_data()
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...