У нас есть требование извлекать записи из темы kafka, сопоставлять ее с 5 различными типами POJO и записывать каждый тип POJO в таблицу Cassandra. Я использовал Кассандру Синк, чтобы сделать это.
Есть ли способ, которым я могу сделать это как пакет? Вместо того, чтобы использовать 5 разных приемников для кассандры, я хочу использовать один приемник и одну операцию записи с использованием пакетной записи.
Я посмотрел в CassandraOutputFormat, но я думаю, что для сохранения в одной таблице с несколькими кортежами. Это верно? Можем ли мы использовать сам POJO вместо Tuples?
Пожалуйста, дайте мне знать, если поддерживается пакетная запись в таблицы cassandra.
// create execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
properties.setProperty("zookeeper.connect", "localhost:2181"); // Zookeeper default host:port
properties.setProperty("bootstrap.servers", "localhost:9092"); // Broker default host:port
properties.setProperty("group.id", "cassandra_events"); // Consumer group ID
properties.setProperty("auto.offset.reset", "earliest"); // Always read topic
// from start
DataStream<ParsedPayload> stream = env.addSource(
new FlinkKafkaConsumer010<>("cassandra_events", new ParsedPayloadSchema<ParsedPayload>(), properties));
ClusterBuilder cb = new CassandraClusterBuilder();
// Write to AnalyticsActivityEventInfo table
DataStream<AnalyticsActivityEventInfo> analyticsActivityEventInfoResult = stream
.map(new AnalyticsActivityEventInfoMapFunction());
CassandraSink.addSink(analyticsActivityEventInfoResult).setClusterBuilder(cb).build().name("AnalyticsActivityEventInfo");
// Write to analyticActivityTemp1V1 table
DataStream<AnalyticsActivityTemp1V1> analyticActivityTemp1V1Result = stream
.map(new AnalyticActivityTemp1V1MapFunction());
CassandraSink.addSink(analyticActivityTemp1V1Result).setClusterBuilder(cb).build().name("AnalyticsActivityTemp1V1");
// Write to analyticActivityTemp2V1 table
DataStream<AnalyticsActivityTemp2V1> analyticActivityTemp2V1Result = stream
.map(new AnalyticActivityTemp2V1MapFunction());
CassandraSink.addSink(analyticActivityTemp2V1Result).setClusterBuilder(cb).build().name("AnalyticsActivityTemp2V1");
// Write to analyticActivityTemp1V1 table
DataStream<AnalyticsReportTemp1V1> AnalyticsReportTemp1V1Result = stream
.map(new AnalyticsReportTemp1V1MapFunction());
CassandraSink.addSink(AnalyticsReportTemp1V1Result).setClusterBuilder(cb).build().name("AnalyticsReportTemp1V1");
// Write to analyticActivityTemp2V1 table
DataStream<AnalyticsReportTemp2V1> AnalyticsReportTemp2V1Result = stream
.map(new AnalyticsReportTemp2V1MapFunction());
CassandraSink.addSink(AnalyticsReportTemp2V1Result).setClusterBuilder(cb).build().name("AnalyticsReportTemp2V1");
env.execute();