Как контролировать обработку искрового потока, пока в теме Кафки нет данных - PullRequest
0 голосов
/ 29 октября 2019

Я использую spark-sql 2.4.1, spark-cassandra-connector_2.11-2.4.1.jar и java8.

У меня есть таблица cassandra, подобная этой:

CREATE company(company_id int, start_date date, company_name text, PRIMARY_KEY (company_id, start_date))
WITH CLUSTERING ORDER BY (start_date DESC);

Поле start_date здесь является производным полем, которое вычисляется в бизнес-логике.

У меня есть spark-sqlПотоковый код, в котором я вызываю ниже mapFunction.

public static MapFunction<Company, CompanyTransformed>  mapFunInsertCompany = ( record ) ->{

  CompanyTransformed  rec = new CompanyTransformed();

  rec.setCompany_id(record.getCompanyId());
  rec.setCompany_name(record.getCompanyName());

  if(record.getChangeFlag().equalsIgnoreCase("I") && record.getCreateDate() != null )
    rec.setStart_date(record.getCreateDate());
  if(record.getChangeFlag().equalsIgnoreCase("U"))
    rec.setStart_date(new Date(CommonUtils.today().getTime() + 86400000));

  return rec;
};

Во время запуска моего потребителя, когда в теме kafka нет записей, потоковый поток непрерывно вызывает выше функции карты.

Потому что запись. getCreateDate () = null start_date имеет значение null.

Но start_date является частью первичного ключа в моей таблице C *, следовательно, ошибка вставки и неопределенное ожидание зажигания НЕ МОГУТ восстановить и сохранить данные в таблицу C *.

Итак 1. что нужно сделать, чтобы это исправить? Любая подсказка, пожалуйста?

Часть 2:

Как восстановить систему после сбоя?

latestRecords .writeStream () .foreachBatch ((batchDf, batchId) -> {batchDf .write () .format ("org.apache.spark.sql.cassandra ") .option (" таблица "," компания ") .option (" пространство ключей "," ks_1 ") .mode (SaveMode.Append) .save ();}). start () .. awaitTermination ();

Я использую Java API выше, я не нахожу эквивалентный метод для проверки "isEmpty" rdd в Java.

Любой ключ, как обрабатывать в Java?

Часть 3:

Пробовал это

.foreachBatch((batchDf, batchId) -> {
    System.out.println( "latestRecords batchDf.isEmpty : " + 
     batchDf.isEmpty() + "\t length : " + batchDf.rdd().getPartitions().length);
 }

Дает вывод как

latestRecords batchDf.isEmpty : false    length : 6

Так как проверить isEmpty? as isEmpty: false

part 4:

Пока я запускаю потребителя, в теме нет данных. Несмотря на то, что в наборе данных нет данных, но при подсчете отображается 3, как показано ниже, выводится вывод, как это возможно?

Если я попробую этот вывод

.foreachBatch((batchDf, batchId) -> {
 System.out.println( "latestRecords batchDf.rdd().count : " + batchDf.rdd().count() + "\t batchDf.count :" + batchDf.count());
}

latestRecords batchDf.rdd().count : 3    batchDf.count :3

1 Ответ

1 голос
/ 29 октября 2019

Вы столкнулись с общей проблемой для приложений Stream Streaming. Если в источнике нет данных (в вашем случае это тема Кафки), Spark создает emptyRDD . Вы можете проверить, если СДР пуста, добавив

if(!rdd.isEmpty)

перед вызовом вашего метода mapFunInsertCompany.

Пожалуйста, обратите внимание на это сообщение в блоге

...