Я использую Spring Cloud Stream kafka для обмена сообщениями.Когда я помещаю потребителя и производителя в два модуля, производитель может создать сообщение для темы, но потребитель не может получить сообщение.Но он может работать в одном модуле, и код не меняется.
- Версия Springboot: 2.1.3 RELEASE
- Версия Spring Cloud: Greenwich.RELEASE
- Кафка версия: 2.0.1
Мой pom.xml
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
</dependencies>
Производитель
spring:
kafka:
bootstrap-servers: localhost:9092
cloud:
stream:
bindings:
output:
destination: org_change
content-type: application/json
OrganizationServiceApplication.java
@SpringBootApplication
@RefreshScope
@EnableEurekaClient
@EnableCircuitBreaker
@EnableBinding(Source.class)
public class OrganizationServiceApplication {
public static void main(String[] args) {
SpringApplication.run(OrganizationServiceApplication.class, args);
}
}
SimpleSourceBean.java
@Component
@Slf4j
public class SimpleSourceBean {
private final Source organizationStreams;
@Autowired
public SimpleSourceBean(Source organizationStreams) {
this.organizationStreams = organizationStreams;
}
public void publishOrgChange(String action, String orgId) {
log.debug("Send Kafka message {} for Organization Id: {}", action, orgId);
// OrganizationChangeModel is a model(JOPO).
OrganizationChangeModel model = new OrganizationChangeModel(
OrganizationChangeModel.class.getTypeName(),
action,
orgId,
UserContext.CORRELATION_ID
);
organizationStreams
.output()
.send(MessageBuilder
.withPayload(model)
.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
.build());
}
Потребитель
config:
spring:
kafka:
bootstrap-servers: localhost:9092
cloud:
stream:
bindings:
input:
destination: org_change
content-type: application/json
group: licensingGroup
LicensesServiceApplication.java
/**
* @author ming
*/
@SpringBootApplication
@RefreshScope
@EnableDiscoveryClient
@EnableFeignClients("com.ming.licenses.client")
@EnableCircuitBreaker
@EnableBinding(Sink.class)
@Slf4j
public class LicensesServiceApplication {
@Autowired
private OAuth2ClientContext oAuth2ClientContext;
public static void main(String[] args) {
SpringApplication.run(LicensesServiceApplication.class, args);
}
@StreamListener(Sink.INPUT)
public void loggerSink(OrganizationChangeModel changeModel) {
log.debug("Received an event for organization id {}.", changeModel.getOrganizationId());
}
}