Какой-нибудь пример, доступный для создания GlobalKTable с использованием облачного потока Spring kafka-streams? - PullRequest
0 голосов
/ 27 января 2019

Пожалуйста, поделитесь, если есть пример создания потоков облака Кафки в Spring и объединения в несколько GlobalKTable.

1 Ответ

0 голосов
/ 30 января 2019

Это получено из примеров Confluent GlobalKTables с двумя объединениями между GlobalKTables и KStreams. Обратите внимание, что первый поток проходит через стандартный «вход» ...

@Component
public class TableStreamListener {

//    private final StreamsBuilder builder = new StreamsBuilder();

    @EnableBinding(DataGen.class)
    public class DataAnalyticsProcessorApplication {

        /**
         * DevNotes: compilation fails unless method returns a KStream
         *
         * @param ordersStream
         * @param customers
         * @param products
         * @return
         */
        @StreamListener
        @SendTo("output")
        public KStream<Object, EnrichedOrder> process(@Input("input") KStream<Object, Order> ordersStream,
                @Input("customers") GlobalKTable<Object, Customer> customers,
                @Input("products") GlobalKTable<Object, Product> products) {

            // Join the orders stream to the customer global table. As this is global table
            // we can use a non-key based join with out needing to repartition the input
            // stream
            KStream<Object, CustomerOrder> customerOrdersStream = ordersStream
                    // .peek((key, value) -> System.out.println("ordersStream -- key: " + key + " --
                    // value: " + value))
                    .join(customers, (key, value) -> value.getCustomerId(),
                            (order, customer) -> new CustomerOrder(customer, order));

            // Join the enriched customer order stream with the product global table. As
            // this is global table
            // we can use a non-key based join without needing to repartition the input
            // stream
            KStream<Object, EnrichedOrder> enrichedOrdersStream = customerOrdersStream
                    // .peek((key, value) -> System.out.println("customerOrdersStream2 -- key: " +
                    // key + " -- value: " + value.toString()))
                    .join(products, (key, value) -> value.getOrder().getProductId(),
                            (customerOrder, product) -> new EnrichedOrder(product, customerOrder.getCustomer(),
                                    customerOrder.getOrder()));

            return enrichedOrdersStream;
        }

    }

    interface DataGen extends KafkaStreamsProcessor {

        @Input("customers")
        GlobalKTable<?, ?> customers();

        @Input("products")
        GlobalKTable<?, ?> products();

    }
}
...