Как написать ValueJoiner при объединении двух потоков Kafka, определенных с помощью схем Avro? - PullRequest
0 голосов
/ 18 мая 2018

Я создаю приложение для электронной коммерции, в котором в настоящее время я работаю с двумя потоками данных: выполнением заказов и прерыванием продаж. Сломанная продажа была бы неправильным исполнением по ряду причин. Сломанная продажа будет иметь тот же номер ссылки заказа, что и заказ, поэтому объединение выполняется по номеру заказа и позиции #.

В настоящее время у меня есть две темы - orders и broken. Оба были определены с использованием Avro Schemas и построены с использованием SpecificRecord. Ключ OrderReferenceNumber.

Поля для orders: OrderReferenceNumber, Timestamp, OrderLine, ItemNumber, Quantity

Поля для broken: OrderReferenceNumber, OrderLine, Timestamp

Соответствующие классы Java были сгенерированы с помощью команды

mvn clean package

Мне нужно присоединиться слева orders с broken и включить в вывод следующие поля: OrderReferenceNumber, Timestamp, BrokenSaleTimestamp, OrderLine, ItemNumber, Quantity

Вот мой код:

public static void main(String[] args) {
    // Declare variables
    final Map<String, String> avroSerdeConfig = Collections.singletonMap(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");

    // Add Kafka Streams Properties
    Properties streamsProperties = new Properties();
    streamsProperties.put(StreamsConfig.APPLICATION_ID_CONFIG, "orderProcessor");
    streamsProperties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    streamsProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    streamsProperties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    streamsProperties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);
    streamsProperties.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "localhost:8081");

    // Specify Kafka Topic Names
    String orderTopic = "com.ecomapp.input.OrderExecuted";
    String brokenTopic = "com.ecomapp.input.BrokenSale";

    // Specify Serializer-Deserializer or Serdes for each Message Type
    Serdes.StringSerde stringSerde = new Serdes.StringSerde();
    Serdes.LongSerde longSerde = new Serdes.LongSerde();
    // For the Order Executed Message
    SpecificAvroSerde<OrderExecuted> ordersSpecificAvroSerde = new SpecificAvroSerde<OrderExecuted>();
    ordersSpecificAvroSerde.configure(avroSerdeConfig, false);
    // For the Broken Sale Message
    SpecificAvroSerde<BrokenSale> brokenSpecificAvroSerde = new SpecificAvroSerde<BrokenSale>();
    brokenSpecificAvroSerde.configure(avroSerdeConfig, false);


    StreamsBuilder streamBuilder = new StreamsBuilder();

    KStream<String, OrderExecuted> orders = streamBuilder
            .stream(orderTopic, Consumed.with(stringSerde, ordersSpecificAvroSerde))
            .selectKey((key, orderExec) -> orderExec.getMatchNumber().toString());
    KStream<String, BrokenSale> broken = streamBuilder
            .stream(brokenTopic, Consumed.with(stringSerde, brokenSpecificAvroSerde))
            .selectKey((key, brokenS) -> brokenS.getMatchNumber().toString());

    KStream<String, JoinOrdersExecutedNonBroken> joinOrdersNonBroken = orders
        .leftJoin(broken,
                (orderExec, brokenS) -> JoinOrdersExecutedNonBroken.newBuilder()
                        .setOrderReferenceNumber((Long) orderExec.get("OrderReferenceNumber"))
                        .setTimestamp((Long) orderExec.get("Timestamp"))
                        .setBrokenSaleTimestamp((Long) brokenS.get("Timestamp"))
                        .setOrderLine((Long) orderExec.get("OrderLine"))
                        .setItemNumber((String) orderExec.get("ItemNumber"))
                        .setQuantity((Long) orderExec.get("Quantity"))
                        .build(),
                JoinWindows.of(TimeUnit.MILLISECONDS.toMillis(1))
                Joined.with(stringSerde, ordersSpecificAvroSerde, brokenSpecificAvroSerde))
        .peek((key, value) -> System.out.println("key = " + key + ", value = " + value));


    KafkaStreams orderStreams = new KafkaStreams(streamBuilder.build(), streamsProperties);
    orderStreams.start();

    // print the topology
    System.out.println(orderStreams.localThreadsMetadata());

    // shutdown hook to correctly close the streams application
    Runtime.getRuntime().addShutdownHook(new Thread(orderStreams::close));

}

Когда я запускаю это, я получаю следующую ошибку maven:

[ERROR] /Tech/Projects/jCom/src/main/java/com/ecomapp/kafka/orderProcessor.java:[96,26] incompatible types: cannot infer type-variable(s) VO,VR,K,V,VO
    (argument mismatch; org.apache.kafka.streams.kstream.Joined<K,V,com.ecomapp.input.BrokenSale> cannot be converted to org.apache.kafka.streams.kstream.Joined<java.lang.String,com.ecomapp.OrderExecuted,com.ecomapp.input.BrokenSale>)

Проблема действительно в определении моего ValueJoiner. В документации Confluent не очень ясно, как это сделать, когда задействованы схемы Avro (я также не могу найти примеры). Как правильно определить это?

1 Ответ

0 голосов
/ 18 мая 2018

Не знаю, почему Java не может разрешить тип.

Попробуйте:

Joined.<String,OrderExecuted,BrokenSale>with(stringSerde, ordersSpecificAvroSerde, brokenSpecificAvroSerde))

Чтобы явно указать типы.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...