Я использую spark-sql 2.4.1, spark-cassandra-connector_2.11-2.4.1.jar и java8.У меня есть сценарий, где мне нужно объединить потоковые данные с данными таблицы C * / Cassandra.
У меня есть две таблицы, как показано ниже: * master_table"&" backup_table"
table kspace.master_table(
statement_id int,
statement_flag text,
statement_date date,
x_val double,
y_val double,
z_val double,
PRIMARY KEY (( statement_id ), statement_date)
) WITH CLUSTERING ORDER BY ( statement_date DESC );
table kspace.backup_table(
statement_id int,
statement_flag text,
statement_date date,
x_val double,
y_val double,
z_val double,
backup_timestamp timestamp,
PRIMARY KEY ((statement_id ), statement_date, backup_timestamp )
) WITH CLUSTERING ORDER BY ( statement_date DESC, backup_timestamp DESC);
Каждая потоковая запись будет иметь «Statement_flag», который может быть «I» или «U».
- Если приходит запись с «I», мы непосредственно вставляем в «master_table».
- Если приходит запись с «U», нам нужно проверить, есть ли запись для данного (Statement_id), Statement_date в «master_table».
- Если есть запись в «master_table»скопируйте его в «backup_table» с текущей меткой времени, т.е. backup_timestamp.
- Обновите запись в «master_table» последней записью.
Следующий код должен это делать, но этоне.
Dataset<Row> baseDs = //streaming data from topic
Dataset<Row> i_records = baseDs.filter(col("statement_flag").equalTo("I"));
Dataset<Row> u_records = baseDs.filter(col("statement_flag").equalTo("U"));
String keyspace="kspace";
String master_table = "master_table";
String backup_table = "backup_table";
Dataset<Row> cassandraMasterTableDs = getCassandraTableData(sparkSession, keyspace , master_table);
writeDfToCassandra(baseDs.toDF(), keyspace, master_table);
u_records.createOrReplaceTempView("u_records");
cassandraMasterTableDs.createOrReplaceTempView("persisted_records");
Dataset<Row> joinUpdatedRecordsDs = sparkSession.sql(
" select p.statement_id, p.statement_flag, p.statement_date,"
+ "p.x_val,p.y_val,p.z_val "
+ " from persisted_records as p "
+ "join u_records as u "
+ "on p.statement_id = u.statement_id and p.statement_date = u.statement_date");
Dataset<Row> updated_records = joinUpdatedRecordsDs
.withColumn("backup_timestamp", current_timestamp());
writeDfToCassandra(updated_records.toDF(), keyspace, backup_table);
Каждая вещь из «master_table» копируется в backup_table.Фактически предполагается, что копируется только предыдущая версия записей, но не самая последняя.
Предполагается копировать только предыдущую версию записей из master_table, но все данные были скопированы в backup_table
Как сделать только предыдущую версию записейс master_table
копируется только в backup_table
, не последний?
Пример данных
Для первой записи с флагом "I"
master_table
![enter image description here](https://i.stack.imgur.com/QjzWf.png)
backup_table
![enter image description here](https://i.stack.imgur.com/HRMH0.png)
Для второй записи с флагом "U", т.е. то же, что и ранеекроме данных столбца "y_val".
master_table
![enter image description here](https://i.stack.imgur.com/d2EBv.png)
backup_table
Ожидается
![enter image description here](https://i.stack.imgur.com/P0eOC.png)
Но фактические данные таблицы -
![enter image description here](https://i.stack.imgur.com/kj8Sl.png)