При использовании Spark Cassandra Connector
все потоковые данные всегда вставляются в базу данных cassandra.Хотя это не желаемый результат.
Чего я хотел бы добиться, так это добавить значение в базу данных при совпадении столбца employeetitle
.
Вот что у меня есть
// Create direct kafka stream with brokers and topics
JavaInputDStream<ConsumerRecord<String, Loan>> messages = KafkaUtils.createDirectStream(
javaStreamingContext,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.Subscribe(topicsSet, kafkaParams));
JavaDStream<Loan> loanDStream = messages.map(record -> record.value());
loanDStream.foreachRDD((loanJavaRDD, time) -> {
System.out.println("Count "+loanJavaRDD.count());
});
JavaDStream<Loan> window = loanDStream.window(Durations.minutes(1), Durations.seconds(10));
JavaPairDStream<String, BigDecimal> employeeTitleLoanPair = window.mapToPair(loan -> new Tuple2<>(loan.getEmployeeTitle(), loan.getLoanAmount())).reduceByKey((bigDecimal, bigDecimal2) -> bigDecimal.add(bigDecimal2));
employeeTitleLoanPair.print();
// Map Cassandra table column
Map<String, String> columnNameMappings = new HashMap<String, String>();
columnNameMappings.put("id", "id");
columnNameMappings.put("employeeTitle", "employeetitle");
columnNameMappings.put("totalLoan", "totalloan");
employeeTitleLoanPair.foreachRDD((pairsRDD, time) -> {
CassandraJavaUtil
.javaFunctions(pairsRDD.map(pair -> new EmployeeLoan(UUID.randomUUID(), pair._1, pair._2)).filter(employeeLoan -> !employeeLoan.getEmployeeTitle().equals("")))
.writerBuilder("loan_keyspace", "emp_title_loans", CassandraJavaUtil.mapToRow(EmployeeLoan.class, columnNameMappings))
.saveToCassandra();
});
// Start the computation
javaStreamingContext.start();
javaStreamingContext.awaitTermination();
Как проверить равенство столбцов и обновить или вставить