Я пытаюсь реализовать обработку ошибок для SCS (Spring Cloud Stream) с Kafka Binder, и сейчас у меня не получается добиться того, чтобы сообщения об ошибках попадали в тему ошибок.
1) Нужно ли указывать в .yml-файле что-то конкретное для ошибок, например group или content-type?
2) Как выполнить повторную попытку обработки, когда сообщение попадает в тему Kafka?
Спасибо.
Подробности приведены ниже:-
1) Производитель, который генерирует JSON каждые несколько секунд: -
/**
 * Source-bound producer that hands one {@code LoggingObject} to {@code Source.OUTPUT} per poll.
 *
 * <p>The poller on {@link #pumpSource()} is declared with {@code fixedDelay = "7000"} and
 * {@code maxMessagesPerPoll = "1"}.
 */
@EnableBinding(Source.class)
public class LoggingProducer {

    /**
     * Builds a logging record with a random id and a pseudo-randomly chosen status.
     *
     * <p>The id is an upper-cased random UUID with its dashes removed. The status is
     * {@code "SENT"} when the random draw is divisible by 3, otherwise {@code "REVIEW"} when it
     * is divisible by 4, otherwise {@code "ERROR"}. The id and then the finished object are
     * printed to standard output.
     *
     * @return the populated record
     */
    @InboundChannelAdapter(value = Source.OUTPUT, poller = @Poller(fixedDelay = "7000", maxMessagesPerPoll = "1"))
    public LoggingObject pumpSource() {
        // Literal replacement of "-" is equivalent to the regex form for this pattern.
        final String id = UUID.randomUUID().toString().toUpperCase().replace("-", "");
        System.out.println(id);

        final LoggingObject entry = new LoggingObject();
        entry.setLoggingId(id);

        // Draw in [0, 100000); divisibility by 3 is checked before divisibility by 4.
        final int draw = new Random().nextInt(100000);
        final String status = (draw % 3 == 0)
                ? "SENT"
                : (draw % 4 == 0) ? "REVIEW" : "ERROR";
        entry.setLoggingMessageStatus(status);

        System.out.println(entry);
        return entry;
    }
}
2) application.yml
# Producer-side Spring Cloud Stream settings as pasted; the nesting indentation is missing from this copy.
spring:
cloud:
stream:
bindings:
# Settings for the binding named "output", which targets destination Processortopic.
output:
destination: Processortopic
# NOTE(review): 'group' is normally a consumer-side setting; confirm it has any effect on an output binding.
group: myGroup
producer:
header-mode: embeddedHeaders
content-type: application/json
3) Потребительское приложение: -
/**
 * Sink-bound consumer used to exercise error handling: every inbound record fails on purpose.
 */
@EnableBinding(Sink.class)
@Configuration
public class LoggingObjectProcessor {

    /**
     * Listener on {@code Sink.INPUT}; prints a marker line and then always throws.
     *
     * <p>NOTE(review): an earlier inline comment called the destination 'input.myGroup'; the
     * consumer configuration shown with this class sets destination {@code Processortopic} and
     * group {@code myGroup}, so confirm which name applies.
     *
     * @param loggingObject the converted inbound payload (unused)
     * @throws RuntimeException always, with message {@code "BOOM!"}
     */
    @StreamListener(Sink.INPUT)
    public void handle(LoggingObject loggingObject) {
        final String marker = "In the Consumer---->>>>><<<<<<";
        System.out.println(marker);
        throw new RuntimeException("BOOM!");
    }

    /*
     * Disabled earlier attempt at a destination-specific error handler, kept for reference:
     *
     * @ServiceActivator(inputChannel = "Sourcetopic.myGroup.errors")
     * public void error(Message<?> message) {
     *     System.out.println("Handling ERROR: " + message);
     * }
     */

    /**
     * Listener on the channel named {@code "errorChannel"}; prints each message it receives.
     *
     * @param message the message delivered on that channel
     */
    @StreamListener("errorChannel")
    public void errorGlobal(Message<?> message) {
        final String line = "Handling ERROR: " + message;
        System.out.println(line);
    }
}
4) Consumer Application.yml
# Consumer-side Spring Cloud Stream settings as pasted; the nesting indentation is missing from this copy.
spring:
cloud:
stream:
bindings:
# Settings for the binding named "input": destination Processortopic, consumer group myGroup.
input:
destination: Processortopic
group: myGroup
consumer:
header-mode: embeddedHeaders
content-type: application/json
# NOTE(review): presumably a separate binding named "error" targeting destination myErrors; confirm where this key is meant to nest.
error:
destination: myErrors
content-type: application/json
5) LoggingObject POJO
/**
 * Mutable payload bean carrying a logging id and a message status.
 *
 * <p>Exposes a public no-argument constructor and getter/setter pairs for both fields.
 * NOTE(review): presumably converted to and from JSON by the binder (the configs declare
 * {@code content-type: application/json}); confirm before changing the accessor names.
 */
public class LoggingObject {

    private String loggingId;
    private String loggingMessageStatus;

    /** Creates an empty instance; both fields start as {@code null}. */
    public LoggingObject() {
    }

    /**
     * @return the logging id, or {@code null} if it has not been set
     */
    public String getLoggingId() {
        return loggingId;
    }

    /**
     * @param loggingId the logging id to store
     */
    public void setLoggingId(String loggingId) {
        this.loggingId = loggingId;
    }

    /**
     * @return the message status, or {@code null} if it has not been set
     */
    public String getLoggingMessageStatus() {
        return loggingMessageStatus;
    }

    /**
     * @param loggingMessageStatus the message status to store
     */
    public void setLoggingMessageStatus(String loggingMessageStatus) {
        this.loggingMessageStatus = loggingMessageStatus;
    }

    /**
     * Renders both fields so that printing the object shows its contents instead of the
     * default class-name-and-hash form.
     *
     * @return a string of the form {@code LoggingObject{loggingId=..., loggingMessageStatus=...}}
     */
    @Override
    public String toString() {
        return "LoggingObject{loggingId=" + loggingId
                + ", loggingMessageStatus=" + loggingMessageStatus + "}";
    }
}
6) Вот POM
<!-- Parent POM: spring-boot-starter-parent at version 2.0.2.RELEASE. -->
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.2.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<!-- Build properties: UTF-8 source and reporting encodings, Java 1.8, and the Spring Cloud version used by the dependencyManagement import. -->
<!-- NOTE(review): a BUILD-SNAPSHOT version is a moving target; confirm a snapshot is intended rather than a fixed release. -->
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
<spring-cloud.version>Finchley.BUILD-SNAPSHOT</spring-cloud.version>
</properties>
<!-- Dependencies: Spring Cloud Stream, its Kafka binder, spring-kafka, plus two test-scoped libraries. -->
<!-- No versions are declared here; presumably they come from the Boot parent and the imported Spring Cloud BOM. -->
<!-- NOTE(review): spring-kafka may already arrive transitively through the Kafka binder; confirm the explicit entry is needed. -->
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-test-support</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<!-- Imports the spring-cloud-dependencies POM (scope import) at the version given by the spring-cloud.version property. -->
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<!-- Build section: registers the spring-boot-maven-plugin with no extra configuration. -->
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
<!-- Extra repositories: Spring snapshots (snapshot artifacts enabled) and Spring milestones (snapshot artifacts disabled). -->
<repositories>
<repository>
<id>spring-snapshots</id>
<name>Spring Snapshots</name>
<url>https://repo.spring.io/snapshot</url>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
<repository>
<id>spring-milestones</id>
<name>Spring Milestones</name>
<url>https://repo.spring.io/milestone</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
</repositories>
Обновлённое приложение-потребитель и журналы
/**
 * Service activator on the {@code Processortopic.myGroup.errors} channel; prints each message
 * it receives to standard output.
 *
 * @param message the message delivered on that channel
 */
@ServiceActivator(inputChannel = "Processortopic.myGroup.errors")
public void error(Message<?> message) {
    final String line = "Handling ERROR: " + message;
    System.out.println(line);
}
Журналы: -
2018-05-23 10:21:36.178 INFO 76939 --- [container-0-C-1] o.a.kafka.common.utils.AppInfoParser : Kafka commitId : c0518aa65f25317e
Handling ERROR: ErrorMessage [payload=org.springframework.messaging.MessagingException: Exception thrown while invoking com.example.LoggingObjectProcessor#handle[1 args]; nested exception is java.lang.RuntimeException: BOOM!, failedMessage=GenericMessage [payload=byte[191], headers={kafka_offset=4721, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@29a9cb9d, deliveryAttempt=3, kafka_timestampType=CREATE_TIME, id=feda8595-5ef6-35dd-b43f-4940a90017ba, kafka_receivedPartitionId=0, contentType=application/json;charset=UTF-8, kafka_receivedTopic=Processortopic, kafka_receivedTimestamp=1526991171269, timestamp=1527088896173}], headers={kafka_data=ConsumerRecord(topic = Processortopic, partition = 0, offset = 4721, CreateTime = 1526991171269, serialized key size = -1, serialized value size = 277, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = [B@538f1f04), id=37e72560-5e63-db90-27f3-2ff2e04e1778, timestamp=1527088896174}] for original GenericMessage [payload=byte[277], headers={kafka_offset=4721, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@29a9cb9d, deliveryAttempt=3, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=Processortopic, kafka_receivedTimestamp=1526991171269}]
In the Consumer---->>>>><<<<<<
In the Consumer---->>>>><<<<<<
In the Consumer---->>>>><<<<<<