Я пытаюсь убедиться, что все события либо записаны в базу данных, если они приняты схемой, либо зарегистрированы как ошибки, не должно быть никаких событий, полностью потерянных из-за какой-либо базы данных, сетевых ошибок и т. Д.
Это то, что я имею до сих пор.Кажется неправильным создавать новый RDD для каждой строки, чтобы повторить их по отдельности, но я не вижу другого способа использования SQLContext.
JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
JavaInputDStream<ConsumerRecord<String, String>> stream =
KafkaUtils.createDirectStream(
new JavaStreamingContext(javaSparkContext, Durations.seconds(queryInterval)),
LocationStrategies.PreferConsistent(),
ConsumerStrategies.Subscribe(topics(), kafkaParameters())
);
SQLContext sqlContext = SQLContext.getOrCreate(javaSparkContext.sc());
stream.map(cr->row)
.foreachRDD(rowRDD -> {
try {
// try the batch
Dataset<Row> rowDataset = sqlContext.createDataFrame(rowRDD.rdd(), someStructType);
rowDataset
.write()
.mode(SaveMode.Append)
.jdbc(jdbcUrl(), jdbcTable(), jdbcConnectionProperties());
/*
Need to change to only Database Exceptions,
other exceptions will cause the event to fail, hence should retry from kafka
*/
} catch (Exception e1){
// try individually
rowRDD.foreach(row->{
// create RDD for each row, could be slow...
JavaRDD javaRDD = javaSparkContext.parallelize(Arrays.asList(row));
// try write for each RDD now containing only one item
int attempts = 0;
while (attempts<5){
attempts++;
try {
Dataset<Row> rowDataset = sqlContext.createDataFrame(javaRDD, someStructType);
rowDataset
.write()
.mode(SaveMode.Append)
.jdbc(jdbcUrl(), jdbcTable(), jdbcConnectionProperties());
/*
Need to change to only Database Exceptions,
if non database error i.e. network error need to retry here,
as should not retry the whole batch
*/
} catch (Exception e2){
LOG.warn("Event could not be written to database: ",e2);
break;
}
try {
// wait 1 second before retrying
Thread.sleep(1000);
} catch (InterruptedException e){}
}
});
}
});