Как переслать Kafka заголовки агрегированных бирж? - PullRequest
0 голосов
/ 29 мая 2019

Я пытаюсь использовать верблюжий агрегатор перед KafkaProducer.В настоящее время кажется невозможным, что KafkaProducer преобразует каждый элемент типа Exchange в ProducerRecord.Кроме того, ProducerRecord устанавливает одинаковый заголовок для каждого сообщения (который является заголовком глобального агрегата Exchange).См. Camel 2.23 KafkaProducer:

 @Override
                public ProducerRecord next() {
                    // must convert each entry of the iterator into the value according to the serializer
                    Object next = msgList.next();
                    Object value = tryConvertToSerializedType(exchange, next, endpoint.getConfiguration().getSerializerClass());

                    if (hasPartitionKey && hasMessageKey) {
                        return new ProducerRecord(msgTopic, partitionKey, null, key, value, propagatedHeaders);
                    } else if (hasMessageKey) {
                        return new ProducerRecord(msgTopic, null, null, key, value, propagatedHeaders);
                    } else {
                        return new ProducerRecord(msgTopic, null, null, null, value, propagatedHeaders);
                    }
                }

Для моего сценария использования мне нужно было бы объединять обмены и пересылать каждый отдельный обмен с заголовком тела AND.Это означает, что в приведенном выше коде мне нужно что-то вроде value конвертирует next.getBody, а заголовок установлен на next.getHeaders.Есть ли возможность сделать это и НЕ писать собственный верблюжий компонент для адаптации текущего?

...