Можем ли мы строго наложить поток выполнения заказа в потоковой передаче искровой структуры? - PullRequest
0 голосов
/ 11 октября 2019

Я использую spark-sql 2.4.1, spark-cassandra-connector_2.11-2.4.1.jar и java8. У меня есть сценарий, где мне нужно объединить потоковые данные с данными таблицы C * / Cassandra.

Если запись / объединение найдены, мне нужно скопировать существующую запись таблицы C * в другой table_bkp и обновить фактический C* запись таблицы с последними данными.

Когда поступают потоковые данные, мне нужно выполнить это. Хотя я выполняю объединение, как показано ниже, оно объединяет / обновляет таблицу с последними потоковыми записями в table_bkp вместо объединения с ранее полученной записью.

Что я здесь не так делаю? как это исправить, т. е. как строго навязать: сначала он соединится с первой потоковой записью, затем вставит результаты в table_bkp, затем соединится с более поздней записью, вставит inot table_bkp и продолжит ....

Iесть две таблицы, как показано ниже: «master_table» и «backup_table»

table kspace.master_table(
    statement_id int,
    statement_flag text,
    statement_date date,
    x_val double,
    y_val double,
    z_val double,
    PRIMARY KEY (( statement_id ), statement_date)
) WITH CLUSTERING ORDER BY ( statement_date DESC );

table kspace.backup_table(
    statement_id int,
    statement_flag text,
    statement_date date,
    x_val double,
    y_val double,
    z_val double,
    backup_timestamp timestamp,
    PRIMARY KEY ((statement_id ), statement_date, backup_timestamp )
) WITH CLUSTERING ORDER BY ( statement_date DESC,   backup_timestamp DESC);

Каждая потоковая запись будет иметь «Statement_flag», который может быть «I» или «U». Если приходит запись с «I», мы напрямую вставляем в «master_table». Если приходит запись с «U», необходимо проверить, есть ли какая-либо запись для данного (Statement_id), Statement_date в «master_table». Если в «master_table» есть запись, скопируйте ее в «backup_table» с текущей меткой времени, т.е. backup_timestamp. Обновите запись в «master_table» последней записью. Для достижения вышеизложенного я делаю PoC / код, как показано ниже

Dataset<Row> baseDs = //streaming data from topic
Dataset<Row> i_records = baseDs.filter(col("statement_flag").equalTo("I"));
Dataset<Row> u_records = baseDs.filter(col("statement_flag").equalTo("U"));

String keyspace="kspace";
String master_table = "master_table";
String backup_table = "backup_table";


Dataset<Row> cassandraMasterTableDs = getCassandraTableData(sparkSession, keyspace , master_table);

writeDfToCassandra( baseDs.toDF(), keyspace, master_table);


u_records.createOrReplaceTempView("u_records");
cassandraMasterTableDs.createOrReplaceTempView("persisted_records");

Dataset<Row> joinUpdatedRecordsDs =  sparkSession.sql(
            " select p.statement_id, p.statement_flag, p.statement_date,"
            + "p.x_val,p.y_val,p.z_val "
            + " from persisted_records as p "
            + "join u_records as u "
            + "on p.statement_id = u.statement_id  and p.statement_date = u.statement_date");



Dataset<Row> updated_records =   joinUpdatedRecordsDs
                            .withColumn("backup_timestamp",current_timestamp());

updated_records.show(); //Showing correct results 


writeDfToCassandra( updated_records.toDF(), keyspace, backup_table);  // But here/backup_table copying the latest "master_table" records 

Пример данных

Для первой записи с флагом "I"

master_table

enter image description here

backup_table

enter image description here

Для второй записи с флагом "U", т.е. То же, что и ранее, за исключением данных столбца "y_val"

master_table

enter image description here

backup_table

Ожидается

enter image description here

Но фактические данные таблицы

enter image description here

Вопрос:

Когда поступают потоковые данные, мне нужно выполнить это. Хотя я выполняю соединение, как показано выше, оно объединяет / обновляет таблицу с использованием последней поточной записи и копирует ее в table_bkp вместо того, чтобы сначала соединяться с ранее полученной записью, а затем делать то же самое со второй и третьей, как они получили. Вместо этого он объединяет последнюю полученную запись и сохраняет данные в таблицу C *, т.е. backup_tab

Что я здесь не так делаю? как это исправить, т. е. как строго навязать: сначала он соединится с первой потоковой записью, затем вставит результаты в table_bkp, затем объединится с более поздней записью, вставит inot table_bkp и продолжит ....

Итакчто я делаю не так в приведенном выше программном коде? Как наложен контроль потока строго.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...