Как установить ключ сообщения? - PullRequest
0 голосов
/ 20 июня 2019

Я использую Spring Cloud с Apache Kafka. Сообщения, отправляемые в тему кафки, обычно также содержат ключ. Как я могу установить этот ключ?

Допустим, я создаю такие сообщения

@StreamListener(CashflowSink.T1_CASHFLOW_IN)
@SendTo(CashflowSink.T2_CASHFLOW_OUT)
public synchronized Cashflow receive1(String message) {
    System.out.println("******************");
    System.out.println("At Sink1");
    System.out.println("******************");
    System.out.println("Received message " + message);

    String arr[] = message.split(";");
    if (arr[0].equalsIgnoreCase("Cashflow")) {
        Cashflow cf = new Cashflow();
        cf.setContractId(Integer.parseInt(arr[1]));
        cf.setDate(arr[2]);
        cf.setAmount(Float.parseFloat(arr[3]));
        return cf;
    }

return null;

}

Позже я хочу объединить денежные потоки с другими темами. Для соединений мне нужен ключ. Допустим, ключ - это идентификатор контракта денежного потока. Как я могу установить этот ключ в сообщении?

РЕДАКТИРОВАТЬ 1

Пытаясь включить код от Гэри, я придумал:

    //@Bean //--> leads to: Parameter 0 of method runner in tki.bigdata.steams.CashflowService required a single bean, but 10 were found:
    @Qualifier("t2_cashflow_in")  // is this a correct way to get a handle to the channel?
    public ApplicationRunner runner(MessageChannel output) {
        ((AbstractMessageChannel) output).addInterceptor(0, new ChannelInterceptor() {

            @Override
            public Message<?> preSend(Message<?> message, MessageChannel channel) {
                return MessageBuilder.fromMessage(message)
                        .setHeader(KafkaHeaders.MESSAGE_KEY, ((Cashflow) message.getPayload()).getContractId())
                        .build();
            }

        });
        return args -> {            
        };
    }

    @StreamListener(CashflowSink.T2_CASHFLOW_IN)
    public synchronized void receive2(Cashflow cashflow, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) byte[] key) {
        System.out.println("******************");
        System.out.println("At Sink2");
        System.out.println("******************");
        System.out.println("Received cashflow " + cashflow);
        System.out.println(cashflow + ", key:" + new String(key));
    }

Аннотация @Bean дала мне ошибку, возможно, из-за нескольких «каналов». Я следовал подсказке в сообщении об ошибке и использовал аннотацию @Channel. Не уверен, что это то, чего я хочу достичь.

При выполнении этого кода я получаю ошибки, подобные следующим. Итак, я полагаю, перехват не сработал?

2019-06-20 20:56:02,557 ERROR [Ljava.lang.String;@4195105b.container-0-C-1 o.s.k.l.LoggingErrorHandler:37 - Error while processing: ConsumerRecord(topic = tier2.cashflow, partition = 0, offset = 11, CreateTime = 1561056959426, serialized key size = -1, serialized value size = 63, headers = RecordHeaders(headers = [RecordHeader(key = deliveryAttempt, value = [49]), RecordHeader(key = contentType, value = [34, 97, 112, 112, 108, 105, 99, 97, 116, 105, 111, 110, 47, 106, 115, 111, 110, 34]), RecordHeader(key = spring_json_header_types, value = [123, 34, 100, 101, 108, 105, 118, 101, 114, 121, 65, 116, 116, 101, 109, 112, 116, 34, 58, 34, 106, 97, 118, 97, 46, 117, 116, 105, 108, 46, 99, 111, 110, 99, 117, 114, 114, 101, 110, 116, 46, 97, 116, 111, 109, 105, 99, 46, 65, 116, 111, 109, 105, 99, 73, 110, 116, 101, 103, 101, 114, 34, 44, 34, 99, 111, 110, 116, 101, 110, 116, 84, 121, 112, 101, 34, 58, 34, 106, 97, 118, 97, 46, 108, 97, 110, 103, 46, 83, 116, 114, 105, 110, 103, 34, 125])], isReadOnly = false), key = null, value = [B@5884f4e2)
org.springframework.messaging.MessageHandlingException: Missing header 'kafka_receivedMessageKey' for method parameter type [class [B]
    at org.springframework.messaging.handler.annotation.support.HeaderMethodArgumentResolver.handleMissingValue(HeaderMethodArgumentResolver.java:105)
    at org.springframework.messaging.handler.annotation.support.AbstractNamedValueMethodArgumentResolver.resolveArgument(AbstractNamedValueMethodArgumentResolver.java:106)
    at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:117)
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:148)
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:116)
    at org.springframework.cloud.stream.binding.StreamListenerMessageHandler.handleRequestMessage(StreamListenerMessageHandler.java:55)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:123)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:176)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:401)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
    at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:205)
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.sendMessageIfAny(KafkaMessageDrivenChannelAdapter.java:369)
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.access$400(KafkaMessageDrivenChannelAdapter.java:74)
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:431)
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:402)
    at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.lambda$onMessage$0(RetryingMessageListenerAdapter.java:120)
    at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:287)
    at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:211)
    at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:114)
    at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:40)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:1263)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:1256)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1217)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:1198)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:1118)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:933)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:749)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:698)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.lang.Thread.run(Thread.java:745)

Для более полезной информации выкладываю весь класс

@EnableBinding(CashflowService.CashflowSink.class)
public class CashflowService {

    @StreamListener(CashflowSink.T1_CASHFLOW_IN)
    @SendTo(CashflowSink.T2_CASHFLOW_OUT)
    public synchronized Cashflow receive1(String message) {
        System.out.println("******************");
        System.out.println("At Sink1");
        System.out.println("******************");
        System.out.println("Received message " + message);

        String arr[] = message.split(";");
        if (arr[0].equalsIgnoreCase("Cashflow")) {
            Cashflow cf = new Cashflow();
            cf.setContractId(Integer.parseInt(arr[1]));
            cf.setDate(arr[2]);
            cf.setAmount(Float.parseFloat(arr[3]));

            return cf;
        }

        return null;

    }

    // @Bean //--> leads to: Parameter 0 of method runner in
    // tki.bigdata.steams.CashflowService required a single bean, but 10 were
    // found:
    @Qualifier("t2_cashflow_in") // is this a correct way to get a handle to the
                                    // channel?
    public ApplicationRunner runner(MessageChannel output) {
        ((AbstractMessageChannel) output).addInterceptor(0, new ChannelInterceptor() {

            @Override
            public Message<?> preSend(Message<?> message, MessageChannel channel) {
                return MessageBuilder.fromMessage(message)
                        .setHeader(KafkaHeaders.MESSAGE_KEY, ((Cashflow) message.getPayload()).getContractId()).build();
            }

        });
        return args -> {
        };
    }

    @StreamListener(CashflowSink.T2_CASHFLOW_IN)
    public synchronized void receive2(Cashflow cashflow, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) byte[] key) {
        System.out.println("******************");
        System.out.println("At Sink2");
        System.out.println("******************");
        System.out.println("Received cashflow " + cashflow);
        System.out.println(cashflow + ", key:" + new String(key));
    }

    public interface CashflowSink {
        String T1_CASHFLOW_IN = "t1_cashflow_in";
        String T2_CASHFLOW_IN = "t2_cashflow_in";
        String T1_CASHFLOW_OUT = "t1_cashflow_out";
        String T2_CASHFLOW_OUT = "t2_cashflow_out";

        @Input(T1_CASHFLOW_IN)
        SubscribableChannel t1_cashflow_in();

        @Input(T2_CASHFLOW_IN)
        SubscribableChannel t2_cashflow_in();

        @Output(T1_CASHFLOW_OUT)
        SubscribableChannel t1_cashflow_out();

        @Output(T2_CASHFLOW_OUT)
        SubscribableChannel t2_cashflow_out();
    }
}

1 Ответ

0 голосов
/ 20 июня 2019

Существует свойство производителя расширения messageKeyExpression kafka;проблема в том, что выражение вычисляется после преобразования полезной нагрузки (если вы не используете nativeEncoding, в этом случае вы можете использовать это с payload.contractId).

Если вы не используете нативную кодировку, выМожно добавить перехватчик (до преобразования перехватчик) и выдвинуть ключ в заголовок KafkaHeaders.MESSAGE_KEY.

Вот пример:

@SpringBootApplication
@EnableBinding(Processor.class)
public class So56689257Application {

    public static void main(String[] args) {
        SpringApplication.run(So56689257Application.class, args);
    }

    @Bean
    public ApplicationRunner runner(MessageChannel output) {
        ((AbstractMessageChannel) output).addInterceptor(0, new ChannelInterceptor() {

            @Override
            public Message<?> preSend(Message<?> message, MessageChannel channel) {
                return MessageBuilder.fromMessage(message)
                        .setHeader(KafkaHeaders.MESSAGE_KEY, ((Foo) message.getPayload()).getContractId().getBytes())
                        .build();
            }

        });
        return args -> {
            output.send(MessageBuilder.withPayload(new Foo("someContractId")).build());
        };
    }

    @StreamListener(Processor.INPUT)
    public void listen(Foo foo, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) byte[] key) {
        System.out.println(foo + ", key:" + new String(key));
    }

    public static class Foo {

        private String contractId;

        public Foo() {
            super();
        }

        public Foo(String contractId) {
            this.contractId = contractId;
        }

        public String getContractId() {
            return this.contractId;
        }

        public void setContractId(String contractId) {
            this.contractId = contractId;
        }

        @Override
        public String toString() {
            return "Foo [contractId=" + this.contractId + "]";
        }

    }

}

и

Foo [contractId=someContractId], key:someContractId

Связыватель RabbitMQ уже имеет встроенный RabbitExpressionEvaluatingInterceptor (который оценивает выражения перед преобразованием полезной нагрузки), но в настоящее время в связывателе Kafka нет эквивалента, поэтому вы должны добавить свой собственный перехватчик.

Или, есливы используете json, вы можете использовать #jsonpath в выражении для извлечения ключа.

...