Может ли кто-нибудь объяснить внутреннюю работу spark при чтении данных из одной таблицы и записи их в другую на Кассандре.
Вот мой пример использования:
Я принимаю данные, поступающие вс платформы IOT в кассандру через тему кафки.У меня есть небольшой скрипт на python, который анализирует каждое сообщение от kafka, чтобы получить имя таблицы, к которой он принадлежит, готовит запрос и записывает его в cassandra, используя драйвер dassastax cassandra-driver для python.С помощью этого скрипта я могу принимать около 10000 * 300000 записей в минуту в Кассандру.Однако моя скорость входящих данных составляет 510000 записей в минуту , поэтому отставание потребителя kafka продолжает увеличиваться.
Сценарий Python уже выполняет параллельные вызовы cassandra.Если я увеличу количество исполнителей Python, драйвер cassandra начинает отказывать, потому что узлы cassandra становятся недоступными для него.Я уверен, что есть предел количества вызовов кассандры в секунду, которые я там бью.Вот сообщение об ошибке, которое я получаю:
ERROR Operation failed: ('Unable to complete the operation against any hosts', {<Host: 10.128.1.3 datacenter1>: ConnectionException('Pool is shutdown',), <Host: 10.128.1.1 datacenter1>: ConnectionException('Pool is shutdown',)})"
Недавно я запустил задание pyspark для копирования данных из пары столбцов в одной таблице в другую.В таблице было около 168 миллионов записей.Работа Pyspark завершена примерно за 5 часов.Таким образом, было обработано более 550000 записей в минуту .
Вот код pyspark, который я использую:
df = spark.read\
.format("org.apache.spark.sql.cassandra")\
.options(table=sourcetable, keyspace=sourcekeyspace)\
.load().cache()
df.createOrReplaceTempView("data")
query = ("select dev_id,datetime,DATE_FORMAT(datetime,'yyyy-MM-dd') as day, " + field + " as value from data " )
vgDF = spark.sql(query)
vgDF.show(50)
vgDF.write\
.format("org.apache.spark.sql.cassandra")\
.mode('append')\
.options(table=newtable, keyspace=newkeyspace)\
.save()
Версии:
- Cassandra 3.9.
- Spark 2.1.0.
- Разъем спарк-кассандры от Datastax 2.0.1
- Версия Scala 2.11
Кластер:
- Настройка Spark с 3 рабочими и 1 главным узлом.
- 3 рабочим узлам также установлен кластер кассандры.(каждый узел кассандры с одним рабочим узлом искры)
- Каждому работнику было разрешено 10 ГБ ОЗУ и 3 ядра.
Поэтому мне интересно:
Сперва ли спарк читает все данные из cassandra, а затем записывает их в новую таблицу, или есть какая-то оптимизация в соединителе spark cassandra, которая позволяет перемещать данные по таблицам cassandra без чтения всех записей?
Если я заменю свой скрипт на Python заданием потоковой передачи, в котором я анализирую пакет, чтобы получить имя таблицы для cassandra, поможет ли это быстрее вводить данные в cassandra?