Поток Kafka groupByKey не работает для count () - PullRequest
0 голосов
/ 23 марта 2019

Я пытаюсь сгенерировать счет на основе ключей, используя приведенный ниже код, этот код основан на примере подсчета слов. Странно, если функция mapValues ​​возвращает String, тогда groupBy работает, как указано в закомментированной строке, но когда я отправляю пару ключей String в качестве ключа и GenericRecord в качестве значения.

final Serde<String> stringSerde = Serdes.String();
final Serde<Long> longSerde = Serdes.Long();

final Map<String, String> serdeConfig = Collections.singletonMap("schema.registry.url","http://localhost:8081");

stringSerde.configure(serdeConfig, true); // `true` for record keys
final Serde<GenericRecord> valueGenericAvroSerde = new GenericAvroSerde();
                valueGenericAvroSerde.configure(serdeConfig, false); // `false` for record values

StreamsBuilder builder = new StreamsBuilder();
KStream<String, GenericRecord> textLines =
                  builder.stream("ora-query-in",Consumed.with(stringSerde, valueGenericAvroSerde));


final KTable<String, Long> wordCounts = textLines       
                        .mapValues(new ValueMapperWithKey<String, GenericRecord, KeyValue<String, GenericRecord>>() {

                                    @Override
                                    public KeyValue<String, GenericRecord> apply(String arg0, GenericRecord arg1) {

                                        return new KeyValue<String, GenericRecord>(arg1.get("KEY_FIELD").toString(),arg1);
                                        }
                                    })

            //                      .groupBy((key, value) -> value) //THIS WORKS if value is STRING
            //                      .groupBy((key, value) -> key) //DOES NOT WORK EITHER
                                    .groupByKey() //THIS does nothing
                                    .count();
wordCounts.toStream().to("test.topic.out",Produced.with(stringSerde, longSerde));

Я что-то упустил в конфигурации

streamsConfiguration.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");

streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

1 Ответ

0 голосов
/ 24 марта 2019

Вы еще не написали, что именно не так, но кажется, что это проблема с Serialization

Вы можете использовать:

  • KStream::groupBy(final KeyValueMapper<? super K, ? super V, KR> selector, final Grouped<KR, V> grouped).

someStream.groupByKey((key, value) -> value, Grouped.with(newKeySerdes, valueSerdes)

  • KGroupedStream::count(final Materialized<K, Long, KeyValueStore<Bytes, byte[]>> materialized)

someGroupedStream.count(Materialized.with(newKeySerdes, valueSerdes)

Может быть такой же причиной, как:

...