Добавление пользовательской информации перед отправкой сообщения в DLQ в Spring Cloud Streams - PullRequest
0 голосов
/ 27 февраля 2020

Я использую Spring Cloud Streams и механизм Spring Retry по умолчанию только с использованием свойств. Это работает хорошо, сообщения повторяются, затем go в DLQ ... Пока все отлично. Теперь возникает вопрос ...

Мне нужно добавить некоторую пользовательскую информацию в сообщение, прежде чем оно отправится из моей службы в DLQ. Они достаточно просты и помогли бы мне идентифицировать ошибочные сообщения, не затрагивая полезную нагрузку c.

Возможно, я мог бы добавить собственные заголовки или обернуть их в известную модель, где я мог бы получить необходимую информацию - в любом случае мне нужно было бы перехватить / изменить сообщение.

Что такое Самый простой способ сделать это, без особых затрат? Я имею в виду, что мы используем простую конфигурацию для повторных попыток, поэтому под «стоимостью» я подразумеваю замену конфигурации чем-то другим. В любом случае, спасибо!

1 Ответ

0 голосов
/ 27 февраля 2020

С помощью подшивки Kafka вы можете добавить ProducerInterceptor в конфигурацию производителя kafka interceptor.classes.

/**
 * This is called from {@link org.apache.kafka.clients.producer.KafkaProducer#send(ProducerRecord)} and
 * {@link org.apache.kafka.clients.producer.KafkaProducer#send(ProducerRecord, Callback)} methods, before key and value
 * get serialized and partition is assigned (if partition is not specified in ProducerRecord).
 * <p>
 * This method is allowed to modify the record, in which case, the new record will be returned. The implication of modifying
 * key/value is that partition assignment (if not specified in ProducerRecord) will be done based on modified key/value,
 * not key/value from the client. Consequently, key and value transformation done in onSend() needs to be consistent:
 * same key and value should mutate to the same (modified) key and value. Otherwise, log compaction would not work
 * as expected.
 * <p>
 * Similarly, it is up to interceptor implementation to ensure that correct topic/partition is returned in ProducerRecord.
 * Most often, it should be the same topic/partition from 'record'.
 * <p>
 * Any exception thrown by this method will be caught by the caller and logged, but not propagated further.
 * <p>
 * Since the producer may run multiple interceptors, a particular interceptor's onSend() callback will be called in the order
 * specified by {@link org.apache.kafka.clients.producer.ProducerConfig#INTERCEPTOR_CLASSES_CONFIG}. The first interceptor
 * in the list gets the record passed from the client, the following interceptor will be passed the record returned by the
 * previous interceptor, and so on. Since interceptors are allowed to modify records, interceptors may potentially get
 * the record already modified by other interceptors. However, building a pipeline of mutable interceptors that depend on the output
 * of the previous interceptor is discouraged, because of potential side-effects caused by interceptors potentially failing to
 * modify the record and throwing an exception. If one of the interceptors in the list throws an exception from onSend(), the exception
 * is caught, logged, and the next interceptor is called with the record returned by the last successful interceptor in the list,
 * or otherwise the client.
 *
 * @param record the record from client or the record returned by the previous interceptor in the chain of interceptors.
 * @return producer record to send to topic/partition
 */
public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);

Запись производителя содержит имя topi c назначения; Вы можете добавлять / удалять заголовки там.

В настоящее время нет аналогичного хука для подшивки RabbitMQ. Если вы используете эту подшивку, откройте новый выпуск функций на GitHub для подшивки.

...