Пакетная запись в несколько таблиц Cassandra от Apache Flink - PullRequest
0 голосов
/ 26 марта 2019

У нас есть требование извлекать записи из темы kafka, сопоставлять ее с 5 различными типами POJO и записывать каждый тип POJO в таблицу Cassandra. Я использовал Кассандру Синк, чтобы сделать это.

Есть ли способ, которым я могу сделать это как пакет? Вместо того, чтобы использовать 5 разных приемников для кассандры, я хочу использовать один приемник и одну операцию записи с использованием пакетной записи.

Я посмотрел в CassandraOutputFormat, но я думаю, что для сохранения в одной таблице с несколькими кортежами. Это верно? Можем ли мы использовать сам POJO вместо Tuples?

Пожалуйста, дайте мне знать, если поддерживается пакетная запись в таблицы cassandra.

    // create execution environment
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    properties.setProperty("zookeeper.connect", "localhost:2181"); // Zookeeper default host:port
    properties.setProperty("bootstrap.servers", "localhost:9092"); // Broker default host:port
    properties.setProperty("group.id", "cassandra_events"); // Consumer group ID
    properties.setProperty("auto.offset.reset", "earliest"); // Always read topic
    // from start

    DataStream<ParsedPayload> stream = env.addSource(
            new FlinkKafkaConsumer010<>("cassandra_events", new ParsedPayloadSchema<ParsedPayload>(), properties));

    ClusterBuilder cb = new CassandraClusterBuilder();

    // Write to AnalyticsActivityEventInfo table
    DataStream<AnalyticsActivityEventInfo> analyticsActivityEventInfoResult = stream
            .map(new AnalyticsActivityEventInfoMapFunction());
    CassandraSink.addSink(analyticsActivityEventInfoResult).setClusterBuilder(cb).build().name("AnalyticsActivityEventInfo");

    // Write to analyticActivityTemp1V1 table
    DataStream<AnalyticsActivityTemp1V1> analyticActivityTemp1V1Result = stream
            .map(new AnalyticActivityTemp1V1MapFunction());
    CassandraSink.addSink(analyticActivityTemp1V1Result).setClusterBuilder(cb).build().name("AnalyticsActivityTemp1V1");

    // Write to analyticActivityTemp2V1 table
    DataStream<AnalyticsActivityTemp2V1> analyticActivityTemp2V1Result = stream
            .map(new AnalyticActivityTemp2V1MapFunction());
    CassandraSink.addSink(analyticActivityTemp2V1Result).setClusterBuilder(cb).build().name("AnalyticsActivityTemp2V1");

    // Write to analyticActivityTemp1V1 table
    DataStream<AnalyticsReportTemp1V1> AnalyticsReportTemp1V1Result = stream
            .map(new AnalyticsReportTemp1V1MapFunction());
    CassandraSink.addSink(AnalyticsReportTemp1V1Result).setClusterBuilder(cb).build().name("AnalyticsReportTemp1V1");

    // Write to analyticActivityTemp2V1 table
    DataStream<AnalyticsReportTemp2V1> AnalyticsReportTemp2V1Result = stream
            .map(new AnalyticsReportTemp2V1MapFunction());
    CassandraSink.addSink(AnalyticsReportTemp2V1Result).setClusterBuilder(cb).build().name("AnalyticsReportTemp2V1");

    env.execute();
...