У меня есть один брокер Kafka с одним разделом. Требовалось сделать следующее:
- Читать из этого раздела
- Преобразовать сообщение, вызвав REST API
- Publi sh преобразованное сообщение в другой REST API
- Pu sh ответное сообщение на другую топи c
Я использую Kafka Streams для достижения этой цели с помощью следующего кода
StreamsBuilder builder = new StreamsBuilder();`
KStream<Object, Object> consumerStream = builder.stream(kafkaConfiguration.getConsumerTopic());
consumerStream = consumerStream.map(getKeyValueMapper(keyValueMapperClassName));
consumerStream.to(kafkaConfiguration.getProducerTopic(), Produced.with(lStringKeySerde, lAvroValueSerde));
return builder.build();
FOllowing моя конфигурация:
streamsConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, String.join(",", bootstrapServers));
if (schemaRegistry != null && schemaRegistry.length > 0) {
streamsConfig.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, String.join(",", schemaRegistry));
}
streamsConfig.put(this.keySerializerKeyName, keyStringSerializerClassName);
streamsConfig.put(this.valueSerialzerKeyName, valueAVROSerializerClassName);
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
streamsConfig.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1000);
streamsConfig.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, FailOnInvalidTimestamp.class);
streamsConfig.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, "exactly_once");
streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 30000);
streamsConfig.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
streamsConfig.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 1);
streamsConfig.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, DeserializationExceptionHandler.class);
streamsConfig.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG, ProductionExceptionHandler.class);
streamsConfig.put(StreamsConfig.TOPOLOGY_OPTIMIZATION,StreamsConfig.OPTIMIZE);
streamsConfig.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, compressionMode);
streamsConfig.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1000);
Я искал механизм для выполнения следующих действий в моем KeyValueMapper:
- Если какой-либо из REST API не работает, то я ловлю исключение
- Я бы хотел, чтобы одно и то же смещение сохранялось в цикле до тех пор, пока система не будет восстановлена, ИЛИ не приостановил потребление до тех пор, пока система не будет восстановлена
Я проверил следующие ссылки, но они не похоже на помощь.
Как эффективно запустить потоки kafka с одним экземпляром приложения и с одним разделом topi c?
Следующая ссылка говорит о KafkaTransactionManager, но это будет не ж ork Полагаю, способ инициализации KStream выше
Транзакция Kafka завершилась неудачно, но в любом случае фиксирует смещение
Любая помощь / указатели в этом направлении приветствуются.