Spring Cloud Stream kafka не может принимать сообщения в двух модулях, но может работать в одном модуле - PullRequest
0 голосов
/ 20 апреля 2019

Я использую Spring Cloud Stream kafka для обмена сообщениями.Когда я помещаю потребителя и производителя в два модуля, производитель может создать сообщение для темы, но потребитель не может получить сообщение.Но он может работать в одном модуле, и код не меняется.

  • Версия Springboot: 2.1.3 RELEASE
  • Версия Spring Cloud: Greenwich.RELEASE
  • Кафка версия: 2.0.1

Мой pom.xml

<dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-kafka</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
    </dependencies>

Производитель

spring:
  kafka:
    bootstrap-servers: localhost:9092
  cloud:
    stream:
      bindings:
        output:
          destination: org_change
          content-type: application/json

OrganizationServiceApplication.java

@SpringBootApplication
@RefreshScope
@EnableEurekaClient
@EnableCircuitBreaker
@EnableBinding(Source.class)
public class OrganizationServiceApplication {
    public static void main(String[] args) {
        SpringApplication.run(OrganizationServiceApplication.class, args);
    }
}

SimpleSourceBean.java

@Component
@Slf4j
public class SimpleSourceBean {
    private final Source organizationStreams;

    @Autowired
    public SimpleSourceBean(Source organizationStreams) {
        this.organizationStreams = organizationStreams;
    }

    public void publishOrgChange(String action, String orgId) {
        log.debug("Send Kafka message {} for Organization Id: {}", action, orgId);
        // OrganizationChangeModel is a model(JOPO).
        OrganizationChangeModel model = new OrganizationChangeModel(
                OrganizationChangeModel.class.getTypeName(),
                action,
                orgId,
                UserContext.CORRELATION_ID
        );

        organizationStreams
                .output()
                .send(MessageBuilder
                        .withPayload(model)
                        .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
                        .build());
    }

Потребитель

config:

spring:
  kafka:
    bootstrap-servers: localhost:9092
  cloud:
    stream:
      bindings:
        input:
          destination: org_change
          content-type: application/json
          group: licensingGroup

LicensesServiceApplication.java


/**
 * @author ming
 */
@SpringBootApplication
@RefreshScope
@EnableDiscoveryClient
@EnableFeignClients("com.ming.licenses.client")
@EnableCircuitBreaker
@EnableBinding(Sink.class)
@Slf4j
public class LicensesServiceApplication {

    @Autowired
    private OAuth2ClientContext oAuth2ClientContext;

    public static void main(String[] args) {
        SpringApplication.run(LicensesServiceApplication.class, args);
    }

    @StreamListener(Sink.INPUT)
    public void loggerSink(OrganizationChangeModel changeModel) {
        log.debug("Received an event for organization id {}.", changeModel.getOrganizationId());
    }
}

...