Итак, я создаю конвейер CD C, используя kafka connect, потоки kafka и эластичный поиск . Я довольно новичок во всех трех технологиях. Поэтому я использую Kafka connect для получения данных из базы данных mysql и создания тем на основе типа элемента (это прекрасно работает). Это для 2 предметов, но в итоге у меня будет несколько типов предметов.
Например. Созданные темы Kafka
- test.catalog.catalog_air_cooler
- test.catalog.catalog_air_cooler_eav
- test.catalog.catalog_air_purifier
- test.catalog.catier_airepu .
Моя конечная цель - построить сводное представление всех тем и сохранить их как индекс в поиске elasti c. Я использую kafka streams .
private void constructReport() {
Properties props = kafkaConfig.populateKafkConfigMap("DwCatrgoryReport");
List<String> itemTypes = new ArrayList<String>();
// the item types for which topics are created
itemTypes.add("air_cooler");
itemTypes.add("air_purifier");
for (int i = 0; i < itemTypes.size(); i++) {
String itemType = itemTypes.get(i);
// I am dynamically constructing topic names.
String topicName = "test.catalog.catalog_"+itemType;
StreamsBuilder stremBuilder = new StreamsBuilder();
//am constructing different streams for different topics
KTable<String, String> catalogStream = stremBuilder.table(topicName);
KTable<String, String> catalogEavStream = stremBuilder.table(topicName+"_eav");
//changing the key of table for join.(omitting code)
//joining catalog and catlogeav table
KStream<String, String> stream = catalogStream.join(catalogEavStream, (leftValue, rightValue) -> kafkaStreamsListenerOperationsHelperService
.createAggregateStream(leftValue, rightValue, ReqType.CATALOG)).toStream();
callEsAndCreateIndex(stream, ElasticSearchTopics.DW_CATEGORY_GROUP_TOPIC);
final KafkaStreams streams = new KafkaStreams(stremBuilder.build(), props);
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
}
В приведенном выше коде я создаю поток, объединяю их и сохраняю их вasticsearch.
Так что происходит, индекс создается для агрегации air_cooler (обе таблицы), но не для air_purifier в поиске elasti c. Я получаю следующее исключение.
java.lang.IllegalStateException: Consumer was assigned partitions [ktm.catalog.catalog_air_cooler_eav-0] which didn't correspond to subscription request [ktm.catalog.catalog_air_purifier, ktm.catalog.catalog_air_purifier_eav, DwCatrgoryReport-KTABLE-AGGREGATE-STATE-STORE-0000000012-repartition, DwCatrgoryReport-KTABLE-AGGREGATE-STATE-STORE-0000000007-repartition]
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.handleAssignmentMismatch(ConsumerCoordinator.java:218)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:264)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:424)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:358)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:353)
at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1251)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1201)
at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:963)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:859)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:819)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:788)
Потребуется помощь или направление с точки зрения того, где и как возможно устранить это исключение. Заранее спасибо.