Вы можете просто использовать точечный синтаксис для выполнения запросов к вложенным элементам.Например, если у меня есть следующие определения CQL:
cqlsh> use test;
cqlsh:test> create type t1 (id int, t text);
cqlsh:test> create type t2 (id int, t1 frozen<t1>);
cqlsh:test> create table nudt (id int primary key, t2 frozen<t2>);
cqlsh:test> insert into nudt (id, t2) values (1, {id: 1, t1: {id: 1, t: 't1'}});
cqlsh:test> insert into nudt (id, t2) values (2, {id: 2, t1: {id: 2, t: 't2'}});
cqlsh:test> SELECT * from nudt;
id | t2
----+-------------------------------
1 | {id: 1, t1: {id: 1, t: 't1'}}
2 | {id: 2, t1: {id: 2, t: 't2'}}
(2 rows)
Затем я могу загрузить эти данные следующим образом:
scala> val data = spark.read.format("org.apache.spark.sql.cassandra").
options(Map( "table" -> "nudt", "keyspace" -> "test")).load()
data: org.apache.spark.sql.DataFrame = [id: int, t2: struct<id: int, t1: struct<id: int, t: string>>]
scala> data.cache
res0: data.type = [id: int, t2: struct<id: int, t1: struct<id: int, t: string>>]
scala> data.show
+---+----------+
| id| t2|
+---+----------+
| 1|[1,[1,t1]]|
| 2|[2,[2,t2]]|
+---+----------+
И затем запросить данные, чтобы выбрать только определенные значения поля вUDT:
scala> val res = spark.sql("select * from test.nudt where t2.t1.t = 't1'")
res: org.apache.spark.sql.DataFrame = [id: int, t2: struct<id: int, t1: struct<id: int, t: string>>]
scala> res.show
+---+----------+
| id| t2|
+---+----------+
| 1|[1,[1,t1]]|
+---+----------+
Вы можете использовать либо spark.sql
, либо соответствующие .filter
функции - в зависимости от вашего стиля программирования.Этот метод работает с любыми данными структурного типа, поступающими из разных источников, таких как JSON и т. Д.
Но учтите, что вы не получите оптимизацию от коннектора Cassandra, как вы получаете при запросах по ключам секций/ столбцы кластеризации