Как спарк копирует данные между таблицами Кассандры? - PullRequest
0 голосов
/ 01 июня 2018

Может ли кто-нибудь объяснить внутреннюю работу spark при чтении данных из одной таблицы и записи их в другую на Кассандре.

Вот мой пример использования:

Я принимаю данные, поступающие вс платформы IOT в кассандру через тему кафки.У меня есть небольшой скрипт на python, который анализирует каждое сообщение от kafka, чтобы получить имя таблицы, к которой он принадлежит, готовит запрос и записывает его в cassandra, используя драйвер dassastax cassandra-driver для python.С помощью этого скрипта я могу принимать около 10000 * 300000 записей в минуту в Кассандру.Однако моя скорость входящих данных составляет 510000 записей в минуту , поэтому отставание потребителя kafka продолжает увеличиваться.

Сценарий Python уже выполняет параллельные вызовы cassandra.Если я увеличу количество исполнителей Python, драйвер cassandra начинает отказывать, потому что узлы cassandra становятся недоступными для него.Я уверен, что есть предел количества вызовов кассандры в секунду, которые я там бью.Вот сообщение об ошибке, которое я получаю:

ERROR Operation failed: ('Unable to complete the operation against any hosts', {<Host: 10.128.1.3 datacenter1>: ConnectionException('Pool is shutdown',), <Host: 10.128.1.1 datacenter1>: ConnectionException('Pool is shutdown',)})"

Недавно я запустил задание pyspark для копирования данных из пары столбцов в одной таблице в другую.В таблице было около 168 миллионов записей.Работа Pyspark завершена примерно за 5 часов.Таким образом, было обработано более 550000 записей в минуту .

Вот код pyspark, который я использую:

df = spark.read\
    .format("org.apache.spark.sql.cassandra")\
    .options(table=sourcetable, keyspace=sourcekeyspace)\
    .load().cache()

df.createOrReplaceTempView("data")

query = ("select dev_id,datetime,DATE_FORMAT(datetime,'yyyy-MM-dd') as day, " + field + " as value  from data  " )

vgDF = spark.sql(query)
vgDF.show(50)
vgDF.write\
    .format("org.apache.spark.sql.cassandra")\
    .mode('append')\
    .options(table=newtable, keyspace=newkeyspace)\
    .save()

Версии:

  • Cassandra 3.9.
  • Spark 2.1.0.
  • Разъем спарк-кассандры от Datastax 2.0.1
  • Версия Scala 2.11

Кластер:

  • Настройка Spark с 3 рабочими и 1 главным узлом.
  • 3 рабочим узлам также установлен кластер кассандры.(каждый узел кассандры с одним рабочим узлом искры)
  • Каждому работнику было разрешено 10 ГБ ОЗУ и 3 ядра.

Поэтому мне интересно:

  • Сперва ли спарк читает все данные из cassandra, а затем записывает их в новую таблицу, или есть какая-то оптимизация в соединителе spark cassandra, которая позволяет перемещать данные по таблицам cassandra без чтения всех записей?

  • Если я заменю свой скрипт на Python заданием потоковой передачи, в котором я анализирую пакет, чтобы получить имя таблицы для cassandra, поможет ли это быстрее вводить данные в cassandra?

1 Ответ

0 голосов
/ 01 июня 2018

Разъем Spark оптимизирован, поскольку он распараллеливает обработку и чтение / вставку данных в узлы, которым принадлежат данные.Вы можете повысить пропускную способность, используя Cassandra Spark Connector, но для этого потребуется больше ресурсов.

Говоря о вашей задаче - 300000 вставок в минуту - это 5000 в секунду, и, честно говоря, это не очень большое число - вы можетеувеличить пропускную способность, добавив различные оптимизации:

  • Использование асинхронных вызовов для отправки запросов.Вам нужно только убедиться, что вы отправляете больше запросов, которые могут быть обработаны одним соединением (но вы также можете увеличить это число - я не уверен, как это сделать в Python, но, пожалуйста, проверьте Драйвер Java doc чтобы получить представление).
  • используйте правильный уровень согласованности (LOCAL_ONE должен дать вам очень хорошую производительность)
  • используйте правильную политику балансировки нагрузки
  • вы можете запустить несколько копий своего сценария параллельно, убедившись, что все они находятся в одной группе потребителей Kafka.
...