Spring Cloud Sleuth с Reactor Kafka - PullRequest
1 голос
/ 06 мая 2020

Я использую Reactor Kafka в приложении Spring Boot Reactive с Spring Cloud Sleuth для распределенной трассировки. Я настроил Sleuth для использования специального ключа распространения из заголовка с именем "traceId". Я также настроил формат журнала для печати заголовка в моих журналах, поэтому запрос типа

curl -H "traceId: 123456" -X POST http://localhost:8084/parallel

будет печатать 123456 в каждом журнале в любом месте ниже по потоку, начиная с контроллера.

Теперь я бы хотел, чтобы этот заголовок распространялся и через Kafka. Я понимаю, что Sleuth также имеет встроенные инструменты для Kafka, поэтому заголовок должен распространяться автоматически, однако я не могу заставить это работать.

С моего контроллера я отправляю сообщение на Kafka topi c, а затем попросите другого потребителя Kafka забрать его для обработки.

Вот мой контроллер:

@RestController
@RequestMapping("/parallel")
public class BasicController {

    private Logger logger = Loggers.getLogger(BasicController.class);

    KafkaProducerLoadGenerator generator = new KafkaProducerLoadGenerator();    

    @PostMapping
    public Mono<ResponseEntity> createMessage() {
        int data = (int)(Math.random()*100000);
        return Flux.just(data)
                .doOnNext(num -> logger.info("Generating document for {}", num))
                .map(generator::generateDocument)
                .flatMap(generator::sendMessage)
                .doOnNext(result ->
                        logger.info("Sent message {}, offset is {} to partition {}",
                                result.getT2().correlationMetadata(),
                                result.getT2().recordMetadata().offset(),
                                result.getT2().recordMetadata().partition()))
                .doOnError(error -> logger.error("Error in subscribe while sending message", error))
                .single()
                .map(tuple -> ResponseEntity.status(HttpStatus.OK).body(tuple.getT1()));

    }
}

Вот код, который отправляет сообщения в Kafka topi c

@Component
public class KafkaProducerLoadGenerator {

    private static final Logger logger = Loggers.getLogger(KafkaProducerLoadGenerator.class);

    private static final String bootstrapServers = "localhost:9092";
    private static final String TOPIC = "load-topic";

    private KafkaSender<Integer, String> sender;

    private static int documentIndex = 0;

    public KafkaProducerLoadGenerator() {
        this(bootstrapServers);
    }

    public KafkaProducerLoadGenerator(String bootstrapServers) {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "load-generator");
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        SenderOptions<Integer, String> senderOptions = SenderOptions.create(props);
        sender = KafkaSender.create(senderOptions);
    }

    @NewSpan("generator.sendMessage")
    public Flux<Tuple2<DataDocument, SenderResult<Integer>>> sendMessage(DataDocument document) {
        return sendMessage(TOPIC, document)
                .map(result -> Tuples.of(document, result));
    }

    public Flux<SenderResult<Integer>> sendMessage(String topic, DataDocument document) {
        ProducerRecord<Integer, String> producerRecord = new ProducerRecord<>(topic, document.getData(), document.toString());
        return sender.send(Mono.just(SenderRecord.create(producerRecord, document.getData())))
                .doOnNext(record -> logger.info("Sent message to partition={}, offset={} ", record.recordMetadata().partition(), record.recordMetadata().offset()))
                .doOnError(e -> logger.error("Error sending message " + documentIndex, e));
    }

    public DataDocument generateDocument(int data) {
        return DataDocument.builder()
                .header("Load Data")
                .data(data)
                .traceId("trace"+data)
                .timestamp(Instant.now())
                .build();
    }
}

Мой потребитель выглядит так:

@Component
@Scope(scopeName = ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public class IndividualConsumer {

    private static final Logger logger = Loggers.getLogger(IndividualConsumer.class);

    private static final String bootstrapServers = "localhost:9092";
    private static final String TOPIC = "load-topic";

    private int consumerIndex = 0;

    public ReceiverOptions setupConfig(String bootstrapServers) {
        Map<String, Object> properties = new HashMap<>();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.put(ConsumerConfig.CLIENT_ID_CONFIG, "load-topic-consumer-"+consumerIndex);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "load-topic-multi-consumer-2");
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, DataDocumentDeserializer.class);
        return ReceiverOptions.create(properties);
    }

    public void setIndex(int i) {
        consumerIndex = i;
    }

    @EventListener(ApplicationReadyEvent.class)
    public Disposable consumeMessage() {
        ReceiverOptions<Integer, DataDocument> receiverOptions = setupConfig(bootstrapServers)
                .subscription(Collections.singleton(TOPIC))
                .addAssignListener(receiverPartitions -> logger.debug("onPartitionsAssigned {}", receiverPartitions))
                .addRevokeListener(receiverPartitions -> logger.debug("onPartitionsRevoked {}", receiverPartitions));

        Flux<ReceiverRecord<Integer, DataDocument>> messages = Flux.defer(() -> {
            KafkaReceiver<Integer, DataDocument> receiver = KafkaReceiver.create(receiverOptions);
            return receiver.receive();
        });
        Consumer<? super ReceiverRecord<Integer, DataDocument>> acknowledgeOffset = record -> record.receiverOffset().acknowledge();
        return messages
                .publishOn(Schedulers.newSingle("Parallel-Consumer"))
                .doOnError(error -> logger.error("Error in the reactive chain", error))
                .delayElements(Duration.ofMillis(100))
                .doOnNext(record -> {
                    logger.info("Consumer {}: Received from partition {}, offset {}, data with index {}",
                            consumerIndex,
                            record.receiverOffset().topicPartition(),
                            record.receiverOffset().offset(),
                            record.value().getData());
                })
                .doOnNext(acknowledgeOffset)
                .doOnError(error -> logger.error("Error receiving record", error))
                .retryBackoff(100, Duration.ofSeconds(5), Duration.ofMinutes(5))
                .subscribe();
    }
}

Я ожидал бы, что Sleuth автоматически перенесет встроенную трассировку Brave и пользовательские заголовки к потребителю, так что трассировка охватывает всю транзакцию.

Однако у меня две проблемы.

  1. Bean-компонент-генератор не получает такой же трассировки, как и в контроллере. Он использует другую (и новую) трассировку для каждого отправленного сообщения.
  2. Трассировка не передается от производителя Kafka потребителю Kafka.

Я могу решить # 1 выше, заменив bean-компонент генератора с простым классом Java и создание его экземпляра в контроллере. Однако это означает, что я не могу автоматически подключать другие зависимости, и в любом случае это не решает # 2.

Я могу загрузить экземпляр bean-компонента brave.kafka.clients.KafkaTracing, поэтому я знаю, что он загружается Spring . Однако не похоже, что приборы работают. Я проверил содержимое Kafka с помощью Kafka Tool, и ни в одном сообщении не было заголовков. На самом деле у потребителя вообще нет следов.

2020-05-06 23:57:32.898     INFO  parallel-consumer:local [123-21922,578c510e23567aec,578c510e23567aec] 8180 --- [reactor-http-nio-3] rja.parallelconsumers.BasicController    : Generating document for 23965
2020-05-06 23:57:32.907     INFO  parallel-consumer:local [52e02d36b59c5acd,52e02d36b59c5acd,52e02d36b59c5acd] 8180 --- [single-11] r.p.kafka.KafkaProducerLoadGenerator     : Sent message to partition=17, offset=0 
2020-05-06 23:57:32.908     INFO  parallel-consumer:local [123-21922,578c510e23567aec,578c510e23567aec] 8180 --- [single-11] rja.parallelconsumers.BasicController    : Sent message 23965, offset is 0 to partition 17
2020-05-06 23:57:33.012     INFO  parallel-consumer:local [-,-,-] 8180 --- [parallel-5] r.parallelconsumers.IndividualConsumer   : Consumer 8: Received from partition load-topic-17, offset 0, data with index 23965

В приведенном выше журнале [123-21922,578c510e23567aec,578c510e23567aec] это [custom-trace-header, brave traceId, brave spanId]

Что мне не хватает?

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...