Возможно ли объединение потоковых данных с данными таблицы и обновление таблицы при получении потока? - PullRequest
0 голосов
/ 25 сентября 2019

Я использую spark-sql 2.4.1, spark-cassandra-connector_2.11-2.4.1.jar и java8.У меня есть сценарий, где мне нужно объединить потоковые данные с данными таблицы C * / Cassandra.

Если запись / объединение найдены, мне нужно скопировать существующую запись таблицы C * в другую table_bkp и обновить фактическую запись таблицы C *с последними данными.

Поскольку поступают потоковые данные, мне нужно выполнить это.Можно ли это сделать с помощью парообразования spark-sql?Если да, то как это сделать?какие предостережения нужно позаботиться?

Для каждой партии, как получить свежие данные таблицы C *?

Что я здесь не так делаю

У меня есть две таблицы, как показано ниже: «master_table» и «backup_table»

table kspace.master_table(
    statement_id int,
    statement_flag text,
    statement_date date,
    x_val double,
    y_val double,
    z_val double,
    PRIMARY KEY (( statement_id ), statement_date)
) WITH CLUSTERING ORDER BY ( statement_date DESC );

table kspace.backup_table(
    statement_id int,
    statement_flag text,
    statement_date date,
    x_val double,
    y_val double,
    z_val double,
    backup_timestamp timestamp,
    PRIMARY KEY ((statement_id ), statement_date, backup_timestamp )
) WITH CLUSTERING ORDER BY ( statement_date DESC,   backup_timestamp DESC);


Each streaming record would have "statement_flag" which might be "I" or "U".
If record with "I" comes we directly insert into "master_table".
If record with "U" comes , need to check if there is any record for given ( statement_id ), statement_date in "master_table".
     If there is record in "master_table" copy that one to "backup_table" with current timestamp i.e. backup_timestamp
     Update the record in "master_table" with latest record.

Для достижения вышеуказанного я делаю PoC / код, как показано ниже

Dataset<Row> baseDs = //streaming data from topic
Dataset<Row> i_records = baseDs.filter(col("statement_flag").equalTo("I"));
Dataset<Row> u_records = baseDs.filter(col("statement_flag").equalTo("U"));

String keyspace="kspace";
String master_table = "master_table";
String backup_table = "backup_table";


Dataset<Row> cassandraMasterTableDs = getCassandraTableData(sparkSession, keyspace , master_table);

writeDfToCassandra( baseDs.toDF(), keyspace, master_table);


u_records.createOrReplaceTempView("u_records");
cassandraMasterTableDs.createOrReplaceTempView("persisted_records");

Dataset<Row> joinUpdatedRecordsDs =  sparkSession.sql(
            " select p.statement_id, p.statement_flag, p.statement_date,"
            + "p.x_val,p.y_val,p.z_val "
            + " from persisted_records as p "
            + "join u_records as u "
            + "on p.statement_id = u.statement_id  and p.statement_date = u.statement_date");



Dataset<Row> updated_records =   joinUpdatedRecordsDs
                            .withColumn("backup_timestamp",current_timestamp());

updated_records.show(); //Showing correct results 


writeDfToCassandra( updated_records.toDF(), keyspace, backup_table);  // But here/backup_table copying the latest "master_table" records 

Пример данных

Для первой записи с флагом "I"

master_table

enter image description here

backup_table

enter image description here

Для второй записи с флагом "U", т. Е. Такой же, как ранее, за исключением "данные столбца y_val

master_table

enter image description here

backup_table

Ожидается

enter image description here

Но фактические данные таблицы

enter image description here

Вопрос:

До отображения кадра данных (updated_records) с правильными данными.Но когда я вставляю тот же самый фрейм данных (updated_records) в таблицу, данные C * backup_table отображаются точно так же, как и последняя запись master_table, но в которой предполагается наличие более ранней записи master_table.

  updated_records.show(); //Showing correct results 


    writeDfToCassandra( updated_records.toDF(), keyspace, backup_table);  // But here/backup_table copying the latest "master_table" records 

Так что я делаю неправильнов приведенном выше коде программы?

1 Ответ

2 голосов
/ 25 сентября 2019

Есть несколько способов сделать это с различными уровнями производительности в зависимости от того, сколько данных вам нужно проверить.

Например, если вы просматриваете данные только по ключу раздела, наиболее эффективная вещь длясделать это использовать joinWithCassandraTable на Dstream.Для каждого пакета это будет извлекать записи, соответствующие входящим ключам раздела.В структурированной потоковой передаче это происходит автоматически при правильно написанном соединении SQL и DSE.Если DSE не используется, он будет полностью сканировать таблицу с каждой партией.

Если вместо этого вам потребуется вся таблица для каждой партии, то при объединении партии DStream с CassandraRDD произойдет повторное чтение СДР.на каждой партии.Это намного дороже, если вся таблица не перезаписывается.

Если вы обновляете записи, не проверяя их предыдущие значения, достаточно просто записать входящие данные непосредственно в таблицу C *.C * использует upserts и поведение последней записи win и просто перезапишет предыдущие значения, если они существовали.

...