Запрос Cassandra UDT через Spark SQL - PullRequest
0 голосов
/ 12 декабря 2018

Мы бы хотели запросить данные из БД Cassandra через SparkSQL .Проблема в том, что данные хранятся в кассандре как UDT .Структура UDT глубоко вложена и содержит массивы переменной длины, поэтому было бы очень сложно разложить данные на плоскую структуру.Я не смог найти ни одного рабочего примера, как запрашивать такие UDT через SparkSQL - особенно для фильтрации результатов по значениям UDT.

В качестве альтернативы, вы могли бы предложить другой конвейер ETL (механизм запросов, механизм хранения, ...), который был бы более подходящим для нашего варианта использования?

Наш конвейер ETL:

Кафка (дублированные события) -> Поток искры -> Cassandra (дедупликация для сохранения только последнего события) <- Spark SQL <- аналитическая платформа (UI) </p>

Решения, которые мы уже опробовали:

1) Kafka -> Spark -> Parquet <- Apache Drill </p>

Все работало хорошо, мы могли запрашивать и фильтровать массивы и вложенные структуры данных.

Проблема: не удалосьдедупликация данных (перезапись файлов паркета с последними событиями)

2) Kafka -> Spark -> Cassandra <- Presto </p>

Решена проблема 1) с дедупликацией данных.

Проблема: Presto не поддерживает типы UDT ( presto doc , Presto Issue )

Наши основные требования:

  • поддержка дедупликации данных.Мы можем получать много событий с одним и тем же идентификатором, и нам нужно хранить только самое последнее.
  • хранение глубоко вложенной структуры данных с массивами
  • распределенное хранилище, масштабируемое для будущего расширения
  • механизм распределенных запросов с поддержкой SQL-подобных запросов (для связи с Zeppelin, Tableau, Qlik, ...).Запрос не должен выполняться в режиме реального времени, допускается задержка в несколько минут.
  • поддержка эволюции схемы (стиль AVRO)

Спасибо за любые предложения

1 Ответ

0 голосов
/ 29 декабря 2018

Вы можете просто использовать точечный синтаксис для выполнения запросов к вложенным элементам.Например, если у меня есть следующие определения CQL:

cqlsh> use test;
cqlsh:test> create type t1 (id int, t text);
cqlsh:test> create type t2 (id int, t1 frozen<t1>);
cqlsh:test> create table nudt (id int primary key, t2 frozen<t2>);
cqlsh:test> insert into nudt (id, t2) values (1, {id: 1, t1: {id: 1, t: 't1'}});
cqlsh:test> insert into nudt (id, t2) values (2, {id: 2, t1: {id: 2, t: 't2'}});
cqlsh:test> SELECT * from nudt;

 id | t2
----+-------------------------------
  1 | {id: 1, t1: {id: 1, t: 't1'}}
  2 | {id: 2, t1: {id: 2, t: 't2'}}

(2 rows)

Затем я могу загрузить эти данные следующим образом:

scala> val data = spark.read.format("org.apache.spark.sql.cassandra").
     options(Map( "table" -> "nudt", "keyspace" -> "test")).load()
data: org.apache.spark.sql.DataFrame = [id: int, t2: struct<id: int, t1: struct<id: int, t: string>>]

scala> data.cache
res0: data.type = [id: int, t2: struct<id: int, t1: struct<id: int, t: string>>]

scala> data.show
+---+----------+
| id|        t2|
+---+----------+
|  1|[1,[1,t1]]|
|  2|[2,[2,t2]]|
+---+----------+

И затем запросить данные, чтобы выбрать только определенные значения поля вUDT:

scala> val res = spark.sql("select * from test.nudt where t2.t1.t = 't1'")
res: org.apache.spark.sql.DataFrame = [id: int, t2: struct<id: int, t1: struct<id: int, t: string>>]

scala> res.show
+---+----------+
| id|        t2|
+---+----------+
|  1|[1,[1,t1]]|
+---+----------+

Вы можете использовать либо spark.sql, либо соответствующие .filter функции - в зависимости от вашего стиля программирования.Этот метод работает с любыми данными структурного типа, поступающими из разных источников, таких как JSON и т. Д.

Но учтите, что вы не получите оптимизацию от коннектора Cassandra, как вы получаете при запросах по ключам секций/ столбцы кластеризации

...