Этот код основан на примерах Confluent с GlobalKTable и выполняет два соединения (join) между GlobalKTable и KStream. Обратите внимание, что первый поток проходит через стандартную привязку «input» ...
/**
 * Spring component that hosts the order-enrichment stream listener together
 * with the binding interface that listener is enabled for.
 */
@Component
public class TableStreamListener {

    /**
     * Stream listener that enriches incoming orders by joining the order stream
     * against the customer and product global tables.
     *
     * <p>NOTE(review): this is a non-static inner class, so it keeps a reference
     * to the enclosing {@code TableStreamListener}; confirm that is intended.
     */
    @EnableBinding(DataGen.class)
    public class DataAnalyticsProcessorApplication {

        /**
         * Builds the enrichment pipeline: orders are joined with customers first,
         * and the result is then joined with products.
         *
         * <p>Developer note carried over from the original: compilation fails
         * unless this method returns a {@code KStream}.
         *
         * @param ordersStream stream of orders bound to "input"
         * @param customers    global table of customers bound to "customers"
         * @param products     global table of products bound to "products"
         * @return stream of enriched orders, sent to "output"
         */
        @StreamListener
        @SendTo("output")
        public KStream<Object, EnrichedOrder> process(@Input("input") KStream<Object, Order> ordersStream,
                @Input("customers") GlobalKTable<Object, Customer> customers,
                @Input("products") GlobalKTable<Object, Product> products) {
            return attachProduct(attachCustomer(ordersStream, customers), products);
        }

        /**
         * Joins each order with its customer, looked up by the order's customer id.
         * Because the right-hand side is a global table, the lookup key can be
         * derived from the value without repartitioning the order stream.
         */
        private KStream<Object, CustomerOrder> attachCustomer(KStream<Object, Order> orders,
                GlobalKTable<Object, Customer> customerTable) {
            return orders.join(
                    customerTable,
                    (orderKey, order) -> order.getCustomerId(),
                    (order, customer) -> new CustomerOrder(customer, order));
        }

        /**
         * Joins each customer order with its product, looked up by the order's
         * product id. As with the customer join, the global table allows a
         * non-key based join without repartitioning the input stream.
         */
        private KStream<Object, EnrichedOrder> attachProduct(KStream<Object, CustomerOrder> customerOrders,
                GlobalKTable<Object, Product> productTable) {
            return customerOrders.join(
                    productTable,
                    (orderKey, pair) -> pair.getOrder().getProductId(),
                    (pair, product) -> new EnrichedOrder(product, pair.getCustomer(), pair.getOrder()));
        }
    }

    /**
     * Binding interface that declares the two global-table inputs used by the
     * listener, in addition to whatever {@code KafkaStreamsProcessor} declares
     * (presumably the "input" and "output" bindings; confirm against the binder).
     */
    interface DataGen extends KafkaStreamsProcessor {

        /** Global table bound to "customers". */
        @Input("customers")
        GlobalKTable<?, ?> customers();

        /** Global table bound to "products". */
        @Input("products")
        GlobalKTable<?, ?> products();
    }
}