С помощью подшивки Kafka вы можете добавить ProducerInterceptor
в конфигурацию производителя kafka interceptor.classes
.
/**
* This is called from {@link org.apache.kafka.clients.producer.KafkaProducer#send(ProducerRecord)} and
* {@link org.apache.kafka.clients.producer.KafkaProducer#send(ProducerRecord, Callback)} methods, before key and value
* get serialized and partition is assigned (if partition is not specified in ProducerRecord).
* <p>
* This method is allowed to modify the record, in which case, the new record will be returned. The implication of modifying
* key/value is that partition assignment (if not specified in ProducerRecord) will be done based on modified key/value,
* not key/value from the client. Consequently, key and value transformation done in onSend() needs to be consistent:
* same key and value should mutate to the same (modified) key and value. Otherwise, log compaction would not work
* as expected.
* <p>
* Similarly, it is up to interceptor implementation to ensure that correct topic/partition is returned in ProducerRecord.
* Most often, it should be the same topic/partition from 'record'.
* <p>
* Any exception thrown by this method will be caught by the caller and logged, but not propagated further.
* <p>
* Since the producer may run multiple interceptors, a particular interceptor's onSend() callback will be called in the order
* specified by {@link org.apache.kafka.clients.producer.ProducerConfig#INTERCEPTOR_CLASSES_CONFIG}. The first interceptor
* in the list gets the record passed from the client, the following interceptor will be passed the record returned by the
* previous interceptor, and so on. Since interceptors are allowed to modify records, interceptors may potentially get
* the record already modified by other interceptors. However, building a pipeline of mutable interceptors that depend on the output
* of the previous interceptor is discouraged, because of potential side-effects caused by interceptors potentially failing to
* modify the record and throwing an exception. If one of the interceptors in the list throws an exception from onSend(), the exception
* is caught, logged, and the next interceptor is called with the record returned by the last successful interceptor in the list,
* or otherwise the client.
*
* @param record the record from client or the record returned by the previous interceptor in the chain of interceptors.
* @return producer record to send to topic/partition
*/
public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);
Запись производителя содержит имя topi c назначения; Вы можете добавлять / удалять заголовки там.
В настоящее время нет аналогичного хука для подшивки RabbitMQ. Если вы используете эту подшивку, откройте новый выпуск функций на GitHub для подшивки.