Kafka Streams с одним разделом, чтобы сделать паузу при ошибке - PullRequest
0 голосов
/ 06 февраля 2020

У меня есть один брокер Kafka с одним разделом. Требовалось сделать следующее:

  1. Читать из этого раздела
  2. Преобразовать сообщение, вызвав REST API
  3. Publi sh преобразованное сообщение в другой REST API
  4. Pu sh ответное сообщение на другую топи c

Я использую Kafka Streams для достижения этой цели с помощью следующего кода

StreamsBuilder builder = new StreamsBuilder();`
KStream<Object, Object> consumerStream = builder.stream(kafkaConfiguration.getConsumerTopic());
consumerStream = consumerStream.map(getKeyValueMapper(keyValueMapperClassName));
consumerStream.to(kafkaConfiguration.getProducerTopic(), Produced.with(lStringKeySerde, lAvroValueSerde));
return builder.build();

FOllowing моя конфигурация:

        streamsConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, String.join(",", bootstrapServers));
        if (schemaRegistry != null && schemaRegistry.length > 0) {
            streamsConfig.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, String.join(",", schemaRegistry));          
        }
        streamsConfig.put(this.keySerializerKeyName, keyStringSerializerClassName);
        streamsConfig.put(this.valueSerialzerKeyName, valueAVROSerializerClassName);
        streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
        streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        streamsConfig.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1000);
        streamsConfig.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, FailOnInvalidTimestamp.class);
        streamsConfig.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, "exactly_once");
        streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 30000);
        streamsConfig.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
        streamsConfig.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 1);
        streamsConfig.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, DeserializationExceptionHandler.class);
        streamsConfig.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG, ProductionExceptionHandler.class);
        streamsConfig.put(StreamsConfig.TOPOLOGY_OPTIMIZATION,StreamsConfig.OPTIMIZE);
        streamsConfig.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, compressionMode);
        streamsConfig.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1000);

Я искал механизм для выполнения следующих действий в моем KeyValueMapper:

  1. Если какой-либо из REST API не работает, то я ловлю исключение
  2. Я бы хотел, чтобы одно и то же смещение сохранялось в цикле до тех пор, пока система не будет восстановлена, ИЛИ не приостановил потребление до тех пор, пока система не будет восстановлена ​​

Я проверил следующие ссылки, но они не похоже на помощь.

Как эффективно запустить потоки kafka с одним экземпляром приложения и с одним разделом topi c?

Следующая ссылка говорит о KafkaTransactionManager, но это будет не ж ork Полагаю, способ инициализации KStream выше

Транзакция Kafka завершилась неудачно, но в любом случае фиксирует смещение

Любая помощь / указатели в этом направлении приветствуются.

1 Ответ

0 голосов
/ 10 февраля 2020

То, что вы хотите сделать, на самом деле не поддерживается. Приостановка потребителя невозможна в Kafka Streams.

Вы можете «остановить» только обработку, если вы oop удерживаете KeyValueMapper, однако в этом случае потребитель может выпасть из потребителя. группа. В вашем случае, с одним входным разделом topi c и в любом случае может иметь только один поток в одном экземпляре KafkaStreams, следовательно, это не повлияет на других членов группы (так как их нет). Однако проблема заключается в том, что фиксация смещения завершится неудачей после того, как поток выпадет из группы. Следовательно, после того, как поток повторно присоединится к группе, он извлечет более старое смещение и обработает некоторые данные (ie, вы получаете обработку дублированных данных). Чтобы избежать исключения из группы потребителей, вы можете установить для max.poll.interval.ms config высокое значение (возможно, даже Integer.MAX_VALUE) - учитывая, что у вас есть один член в группе потребителей, установка высокого значения должна быть в порядке.

Другой альтернативой может быть использование transform() с хранилищем состояний. Если вы не можете сделать вызовы REST, вы помещаете данные в хранилище и повторите попытку позже. Таким образом, потребитель не выпадет из группы. Однако чтение новых данных никогда не прекратится, и вам потребуется буферизовать все данные в хранилище, пока API REST не будет вызван снова. Вы должны быть в состоянии замедлить чтение новых данных (чтобы уменьшить объем данных, которые нужно буферизовать), «спя» в вашем Transformer - вам просто нужно убедиться, что вы не нарушаете max.poll.interval.ms config (по умолчанию) 30 секунд).

...