Я пытаюсь создать самый простой мир привет с помощью Spring Cloud + Kafka Streams + Spring Boot 2.
Я понимаю, что скучаю по основным понятиям.В принципе, я понимаю, что:
1 - мне нужно определить исходящий поток для записи сообщений в тему Кафки и входящий поток для чтения сообщений из темы Кафки
public interface LoansStreams {
String INPUT = "loans-in";
String OUTPUT = "loans-out";
@Input(INPUT)
SubscribableChannel inboundLoans();
@Output(OUTPUT)
MessageChannel outboundLoans();
}
2- настроить Spring Cloud Stream для привязки к моим потокам
@EnableBinding(LoansStreams.class)
public class StreamsConfig {
}
3 - настроить свойства Kafka
spring:
cloud:
stream:
kafka:
binder:
brokers: localhost:9092
bindings:
loans-in:
destination: loans
contentType: application/json
loans-out:
destination: loans
contentType: application/json
4 - создать модель для обмена сообщениями
@Getter @Setter @ToString @Builder
public class Loans {
private long timestamp;
private String result;
}
5- написать Кафке
@Service
@Slf4j
public class LoansService {
private final LoansStreams loansStreams;
public LoansService(LoansStreams loansStreams) {
this.loansStreams = loansStreams;
}
public void sendLoan(final Loans loans) {
log.info("Sending loans {}", loans);
MessageChannel messageChannel = loansStreams.outboundLoans();
messageChannel.send(MessageBuilder
.withPayload(loans)
.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
.build());
}
}
6 - прослушать тему Кафки
@Component
@Slf4j
public class LoansListener {
@StreamListener(LoansStreams.INPUT)
public void handleLoans(@Payload Loans loans) {
log.info("Received results: {}", loans);
}
}
Я провел целый день, читая несколько блогов, и я предполагаю, что приведенный выше код по крайней мере выполним.Я не уверен, что действительно кодирую лучший подход, насколько это возможно.Кстати, я получаю сообщение об ошибке, упомянутое в теме:
Error starting ApplicationContext. To display the conditions report re-run your application with 'debug' enabled.
2019-04-26 18:33:05.619 ERROR 14784 --- [ restartedMain] o.s.boot.SpringApplication : Application run failed
org.springframework.context.ApplicationContextException: Failed to start bean 'outputBindingLifecycle'; nested exception is java.lang.IllegalStateException: A default binder has been requested, but there are no binders available for 'org.springframework.cloud.stream.messaging.DirectWithAttributesChannel' : , and no default binder has been set.
Поиск в поисках решения, я нашел, что кто-то говорит, чтобы код StreamListe возвращал модель, поэтому я заменил ее на:
@StreamListener(LoansStreams.INPUT)
@SendTo("loans-out")
public KStream<?, Loans> process(KStream<?, Loans> l) {
log.info("Received: {}", l);
return l;
}
и тогда я получаю ошибку, еще менее понятную, по крайней мере, для меня (предыдущая ошибка явно упоминала некоторую проблему связующего):
Error starting ApplicationContext. To display the conditions report re-run your application with 'debug' enabled.
2019-04-26 19:01:06.016 ERROR 13276 --- [ restartedMain] o.s.boot.SpringApplication : Application run failed
java.lang.IllegalArgumentException: Method must be declarative
at org.springframework.util.Assert.isTrue(Assert.java:118) ~[spring-core-5.1.6.RELEASE.jar:5.1.6.RELEASE]
at org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsStreamListenerSetupMethodOrchestrator.validateStreamListenerMethod(KafkaStreamsStreamListenerSetupMethodOrchestrator.java:510) ~[spring-cloud-stream-binder-kafka-streams-2.1.2.RELEASE.jar:2.1.2.RELEASE]
at org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsStreamListenerSetupMethodOrchestrator.orchestrateStreamListenerSetupMethod(KafkaStreamsStreamListenerSetupMethodOrchestrator.java:168) ~[spring-cloud-stream-binder-kafka-streams-2.1.2.RELEASE.jar:2.1.2.RELEASE]
at org.springframework.cloud.stream.binding.StreamListenerAnnotationBeanPostProcessor.doPostProcess(StreamListenerAnnotationBeanPostProcessor.java:226) ~[spring-cloud-stream-2.1.2.RELEASE.jar:2.1.2.RELEASE]
В случае, если это как-то помогает, я хочу развить эту идею, чтобы применить SAGAS, но этоне в центре внимания этого вопроса.Во-первых, мне нужно получить базовую версию.
* отредактировано
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.4.RELEASE</version>
<relativePath /> <!-- lookup parent from repository -->
</parent>
<groupId>com.mybank</groupId>
<artifactId>kafka-cloud-stream</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>kafka-cloud-stream</name>
<description>Spring Cloud Stream With Kafka</description>
<properties>
<java.version>11</java.version>
<spring-cloud.version>Greenwich.SR1</spring-cloud.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-test-support</artifactId>
<scope>test</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.springframework/spring-web -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-web</artifactId>
<!-- version>5.1.5.RELEASE</version-->
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>