Я использую spark-sql 2.4.1, spark-cassandra-connector_2.11-2.4.1.jar и java8.У меня есть сценарий, где мне нужно объединить потоковые данные с данными таблицы C * / Cassandra.
Если запись / объединение найдены, мне нужно скопировать существующую запись таблицы C * в другую table_bkp и обновить фактическую запись таблицы C *с последними данными.
Поскольку поступают потоковые данные, мне нужно выполнить это.Можно ли это сделать с помощью парообразования spark-sql?Если да, то как это сделать?какие предостережения нужно позаботиться?
Для каждой партии, как получить свежие данные таблицы C *?
Что я здесь не так делаю
У меня есть две таблицы, как показано ниже: «master_table» и «backup_table»
table kspace.master_table(
statement_id int,
statement_flag text,
statement_date date,
x_val double,
y_val double,
z_val double,
PRIMARY KEY (( statement_id ), statement_date)
) WITH CLUSTERING ORDER BY ( statement_date DESC );
table kspace.backup_table(
statement_id int,
statement_flag text,
statement_date date,
x_val double,
y_val double,
z_val double,
backup_timestamp timestamp,
PRIMARY KEY ((statement_id ), statement_date, backup_timestamp )
) WITH CLUSTERING ORDER BY ( statement_date DESC, backup_timestamp DESC);
Each streaming record would have "statement_flag" which might be "I" or "U".
If record with "I" comes we directly insert into "master_table".
If record with "U" comes , need to check if there is any record for given ( statement_id ), statement_date in "master_table".
If there is record in "master_table" copy that one to "backup_table" with current timestamp i.e. backup_timestamp
Update the record in "master_table" with latest record.
Для достижения вышеуказанного я делаю PoC / код, как показано ниже
Dataset<Row> baseDs = //streaming data from topic
Dataset<Row> i_records = baseDs.filter(col("statement_flag").equalTo("I"));
Dataset<Row> u_records = baseDs.filter(col("statement_flag").equalTo("U"));
String keyspace="kspace";
String master_table = "master_table";
String backup_table = "backup_table";
Dataset<Row> cassandraMasterTableDs = getCassandraTableData(sparkSession, keyspace , master_table);
writeDfToCassandra( baseDs.toDF(), keyspace, master_table);
u_records.createOrReplaceTempView("u_records");
cassandraMasterTableDs.createOrReplaceTempView("persisted_records");
Dataset<Row> joinUpdatedRecordsDs = sparkSession.sql(
" select p.statement_id, p.statement_flag, p.statement_date,"
+ "p.x_val,p.y_val,p.z_val "
+ " from persisted_records as p "
+ "join u_records as u "
+ "on p.statement_id = u.statement_id and p.statement_date = u.statement_date");
Dataset<Row> updated_records = joinUpdatedRecordsDs
.withColumn("backup_timestamp",current_timestamp());
updated_records.show(); //Showing correct results
writeDfToCassandra( updated_records.toDF(), keyspace, backup_table); // But here/backup_table copying the latest "master_table" records
Пример данных
Для первой записи с флагом "I"
master_table
backup_table
Для второй записи с флагом "U", т. Е. Такой же, как ранее, за исключением "данные столбца y_val
master_table
backup_table
Ожидается
Но фактические данные таблицы
Вопрос:
До отображения кадра данных (updated_records) с правильными данными.Но когда я вставляю тот же самый фрейм данных (updated_records) в таблицу, данные C * backup_table отображаются точно так же, как и последняя запись master_table, но в которой предполагается наличие более ранней записи master_table.
updated_records.show(); //Showing correct results
writeDfToCassandra( updated_records.toDF(), keyspace, backup_table); // But here/backup_table copying the latest "master_table" records
Так что я делаю неправильнов приведенном выше коде программы?