Я создаю приложение для электронной коммерции, в котором в настоящее время я работаю с двумя потоками данных: выполнением заказов и прерыванием продаж. Сломанная продажа была бы неправильным исполнением по ряду причин. Сломанная продажа будет иметь тот же номер ссылки заказа, что и заказ, поэтому объединение выполняется по номеру заказа и позиции #.
В настоящее время у меня есть две темы - orders
и broken
. Оба были определены с использованием Avro Schemas и построены с использованием SpecificRecord. Ключ OrderReferenceNumber
.
Поля для orders
: OrderReferenceNumber, Timestamp, OrderLine, ItemNumber, Quantity
Поля для broken
: OrderReferenceNumber, OrderLine, Timestamp
Соответствующие классы Java были сгенерированы с помощью команды
mvn clean package
Мне нужно присоединиться слева orders
с broken
и включить в вывод следующие поля: OrderReferenceNumber, Timestamp, BrokenSaleTimestamp, OrderLine, ItemNumber, Quantity
Вот мой код:
public static void main(String[] args) {
// Declare variables
final Map<String, String> avroSerdeConfig = Collections.singletonMap(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
// Add Kafka Streams Properties
Properties streamsProperties = new Properties();
streamsProperties.put(StreamsConfig.APPLICATION_ID_CONFIG, "orderProcessor");
streamsProperties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
streamsProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
streamsProperties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
streamsProperties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);
streamsProperties.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "localhost:8081");
// Specify Kafka Topic Names
String orderTopic = "com.ecomapp.input.OrderExecuted";
String brokenTopic = "com.ecomapp.input.BrokenSale";
// Specify Serializer-Deserializer or Serdes for each Message Type
Serdes.StringSerde stringSerde = new Serdes.StringSerde();
Serdes.LongSerde longSerde = new Serdes.LongSerde();
// For the Order Executed Message
SpecificAvroSerde<OrderExecuted> ordersSpecificAvroSerde = new SpecificAvroSerde<OrderExecuted>();
ordersSpecificAvroSerde.configure(avroSerdeConfig, false);
// For the Broken Sale Message
SpecificAvroSerde<BrokenSale> brokenSpecificAvroSerde = new SpecificAvroSerde<BrokenSale>();
brokenSpecificAvroSerde.configure(avroSerdeConfig, false);
StreamsBuilder streamBuilder = new StreamsBuilder();
KStream<String, OrderExecuted> orders = streamBuilder
.stream(orderTopic, Consumed.with(stringSerde, ordersSpecificAvroSerde))
.selectKey((key, orderExec) -> orderExec.getMatchNumber().toString());
KStream<String, BrokenSale> broken = streamBuilder
.stream(brokenTopic, Consumed.with(stringSerde, brokenSpecificAvroSerde))
.selectKey((key, brokenS) -> brokenS.getMatchNumber().toString());
KStream<String, JoinOrdersExecutedNonBroken> joinOrdersNonBroken = orders
.leftJoin(broken,
(orderExec, brokenS) -> JoinOrdersExecutedNonBroken.newBuilder()
.setOrderReferenceNumber((Long) orderExec.get("OrderReferenceNumber"))
.setTimestamp((Long) orderExec.get("Timestamp"))
.setBrokenSaleTimestamp((Long) brokenS.get("Timestamp"))
.setOrderLine((Long) orderExec.get("OrderLine"))
.setItemNumber((String) orderExec.get("ItemNumber"))
.setQuantity((Long) orderExec.get("Quantity"))
.build(),
JoinWindows.of(TimeUnit.MILLISECONDS.toMillis(1))
Joined.with(stringSerde, ordersSpecificAvroSerde, brokenSpecificAvroSerde))
.peek((key, value) -> System.out.println("key = " + key + ", value = " + value));
KafkaStreams orderStreams = new KafkaStreams(streamBuilder.build(), streamsProperties);
orderStreams.start();
// print the topology
System.out.println(orderStreams.localThreadsMetadata());
// shutdown hook to correctly close the streams application
Runtime.getRuntime().addShutdownHook(new Thread(orderStreams::close));
}
Когда я запускаю это, я получаю следующую ошибку maven:
[ERROR] /Tech/Projects/jCom/src/main/java/com/ecomapp/kafka/orderProcessor.java:[96,26] incompatible types: cannot infer type-variable(s) VO,VR,K,V,VO
(argument mismatch; org.apache.kafka.streams.kstream.Joined<K,V,com.ecomapp.input.BrokenSale> cannot be converted to org.apache.kafka.streams.kstream.Joined<java.lang.String,com.ecomapp.OrderExecuted,com.ecomapp.input.BrokenSale>)
Проблема действительно в определении моего ValueJoiner
. В документации Confluent не очень ясно, как это сделать, когда задействованы схемы Avro (я также не могу найти примеры). Как правильно определить это?