Spring boot app для интеграции кафки с активным mq - PullRequest
0 голосов
/ 07 апреля 2020

Я пытаюсь создать приложение с загрузочной пружиной, которое читает сообщения от kafka и помещает их в activeMQ и наоборот (чтение из activeMQ и запись в kafka). Я не нашел никакого полезного учебника для быстрого запуска моего проекта

1 Ответ

1 голос
/ 07 апреля 2020

См. Spring Integration и расширение Spring Integration для Apache Kafka .

Использование адаптеров входящего и исходящего канала

jms -> kafka

kafka -> jms

Kafka Connect также имеет некоторые возможности в этом пространстве, но я не знаком с ним.

EDIT

Это простое приложение Spring Boot показывает передачу данных из Kafka в RabbitMQ и наоборот:

package com.example.demo;

import org.apache.kafka.clients.admin.NewTopic;

import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.KafkaTemplate;

@SpringBootApplication
public class So61069735Application {

    public static void main(String[] args) {
        SpringApplication.run(So61069735Application.class, args);
    }

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Bean
    public ApplicationRunner toKafka() {
        return args -> this.kafkaTemplate.send("so61069735-1", "foo");
    }

    @KafkaListener(id = "so61069735-1", topics = "so61069735-1")
    public void listen1(String in) {
        System.out.println("From Kafka: " + in);
        this.rabbitTemplate.convertAndSend("so61069735-2", in.toUpperCase());
    }

    @RabbitListener(queues = "so61069735-2")
    public void listen2(String in) {
        System.out.println("From Rabbit: " + in);
        this.kafkaTemplate.send("so61069735-3", in + in);
    }

    @KafkaListener(id = "so61069735-3", topics = "so61069735-3")
    public void listen(String in) {
        System.out.println("Final: " + in);
    }

    @Bean
    public NewTopic topic1() {
        return TopicBuilder.name("so61069735-1").partitions(1).replicas(1).build();
    }

    @Bean
    public Queue queue() {
        return QueueBuilder.durable("so61069735-2").build();
    }

    @Bean
    public NewTopic topic2() {
        return TopicBuilder.name("so61069735-3").partitions(1).replicas(1).build();
    }

}
spring.kafka.consumer.auto-offset-reset=earliest

Результат

From Kafka: foo
From Rabbit: FOO
Final: FOOFOO
...