IlegalStateException при создании агрегатов для отдельных тем в потоках Кафки - PullRequest
0 голосов
/ 25 февраля 2020

Итак, я создаю конвейер CD C, используя kafka connect, потоки kafka и эластичный поиск . Я довольно новичок во всех трех технологиях. Поэтому я использую Kafka connect для получения данных из базы данных mysql и создания тем на основе типа элемента (это прекрасно работает). Это для 2 предметов, но в итоге у меня будет несколько типов предметов.

Например. Созданные темы Kafka

  • test.catalog.catalog_air_cooler
  • test.catalog.catalog_air_cooler_eav
  • test.catalog.catalog_air_purifier
  • test.catalog.catier_airepu .

Моя конечная цель - построить сводное представление всех тем и сохранить их как индекс в поиске elasti c. Я использую kafka streams .

private void constructReport() {
        Properties props = kafkaConfig.populateKafkConfigMap("DwCatrgoryReport");
        List<String> itemTypes = new ArrayList<String>();
// the item types for which topics are created
        itemTypes.add("air_cooler");
        itemTypes.add("air_purifier");
        for (int i = 0; i < itemTypes.size(); i++) {
            String itemType = itemTypes.get(i);
            // I am dynamically constructing topic names.
            String topicName = "test.catalog.catalog_"+itemType;
            StreamsBuilder stremBuilder = new StreamsBuilder();
           //am constructing different streams for different topics
            KTable<String, String> catalogStream = stremBuilder.table(topicName);
            KTable<String, String> catalogEavStream = stremBuilder.table(topicName+"_eav");

           //changing the key of table for join.(omitting code)

            //joining catalog and catlogeav table
            KStream<String, String> stream = catalogStream.join(catalogEavStream, (leftValue, rightValue) -> kafkaStreamsListenerOperationsHelperService
                    .createAggregateStream(leftValue, rightValue, ReqType.CATALOG)).toStream();
            callEsAndCreateIndex(stream, ElasticSearchTopics.DW_CATEGORY_GROUP_TOPIC);
            final KafkaStreams streams = new KafkaStreams(stremBuilder.build(), props);
                streams.start();
                Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        }

В приведенном выше коде я создаю поток, объединяю их и сохраняю их вasticsearch.

Так что происходит, индекс создается для агрегации air_cooler (обе таблицы), но не для air_purifier в поиске elasti c. Я получаю следующее исключение.

java.lang.IllegalStateException: Consumer was assigned partitions [ktm.catalog.catalog_air_cooler_eav-0] which didn't correspond to subscription request [ktm.catalog.catalog_air_purifier, ktm.catalog.catalog_air_purifier_eav, DwCatrgoryReport-KTABLE-AGGREGATE-STATE-STORE-0000000012-repartition, DwCatrgoryReport-KTABLE-AGGREGATE-STATE-STORE-0000000007-repartition]
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.handleAssignmentMismatch(ConsumerCoordinator.java:218)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:264)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:424)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:358)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:353)
    at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1251)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1201)
    at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:963)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:859)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:819)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:788)

Потребуется помощь или направление с точки зрения того, где и как возможно устранить это исключение. Заранее спасибо.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...