Я использую spark-sql 2.4.1, spark-cassandra-connector_2.11-2.4.1.jar и java8.
У меня есть таблица cassandra, подобная этой:
CREATE company(company_id int, start_date date, company_name text, PRIMARY_KEY (company_id, start_date))
WITH CLUSTERING ORDER BY (start_date DESC);
Поле start_date здесь является производным полем, которое вычисляется в бизнес-логике.
У меня есть spark-sqlПотоковый код, в котором я вызываю ниже mapFunction.
public static MapFunction<Company, CompanyTransformed> mapFunInsertCompany = ( record ) ->{
CompanyTransformed rec = new CompanyTransformed();
rec.setCompany_id(record.getCompanyId());
rec.setCompany_name(record.getCompanyName());
if(record.getChangeFlag().equalsIgnoreCase("I") && record.getCreateDate() != null )
rec.setStart_date(record.getCreateDate());
if(record.getChangeFlag().equalsIgnoreCase("U"))
rec.setStart_date(new Date(CommonUtils.today().getTime() + 86400000));
return rec;
};
Во время запуска моего потребителя, когда в теме kafka нет записей, потоковый поток непрерывно вызывает выше функции карты.
Потому что запись. getCreateDate () = null start_date имеет значение null.
Но start_date является частью первичного ключа в моей таблице C *, следовательно, ошибка вставки и неопределенное ожидание зажигания НЕ МОГУТ восстановить и сохранить данные в таблицу C *.
Итак 1. что нужно сделать, чтобы это исправить? Любая подсказка, пожалуйста?
Часть 2:
Как восстановить систему после сбоя?
latestRecords .writeStream () .foreachBatch ((batchDf, batchId) -> {batchDf .write () .format ("org.apache.spark.sql.cassandra ") .option (" таблица "," компания ") .option (" пространство ключей "," ks_1 ") .mode (SaveMode.Append) .save ();}). start () .. awaitTermination ();
Я использую Java API выше, я не нахожу эквивалентный метод для проверки "isEmpty" rdd в Java.
Любой ключ, как обрабатывать в Java?
Часть 3:
Пробовал это
.foreachBatch((batchDf, batchId) -> {
System.out.println( "latestRecords batchDf.isEmpty : " +
batchDf.isEmpty() + "\t length : " + batchDf.rdd().getPartitions().length);
}
Дает вывод как
latestRecords batchDf.isEmpty : false length : 6
Так как проверить isEmpty? as isEmpty: false
part 4:
Пока я запускаю потребителя, в теме нет данных. Несмотря на то, что в наборе данных нет данных, но при подсчете отображается 3, как показано ниже, выводится вывод, как это возможно?
Если я попробую этот вывод
.foreachBatch((batchDf, batchId) -> {
System.out.println( "latestRecords batchDf.rdd().count : " + batchDf.rdd().count() + "\t batchDf.count :" + batchDf.count());
}
latestRecords batchDf.rdd().count : 3 batchDf.count :3