I am trying to integrate my Spring Boot application with a remote Kafka broker, but I run into an error shortly after the application starts. The error occurs once the consumer starts listening to the remote Kafka topic; see the log below:
2020-04-02 08:28:58.795 INFO 17760 --- [ restartedMain] o.s.s.c.ThreadPoolTaskScheduler : Initializing ExecutorService
2020-04-02 08:28:58.855 INFO 17760 --- [ restartedMain] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port(s): 8081 (http) with context path ''
2020-04-02 08:28:58.858 INFO 17760 --- [ restartedMain] kafka.topic.queue.Application : Started Application in 16.273 seconds (JVM running for 18.555)
2020-04-02 08:28:59.874 INFO 17760 --- [ntainer#0-0-C-1] org.apache.kafka.clients.Metadata : [Consumer clientId=consumer-1, groupId=json] Cluster ID: l6gelyg5RtKbGqghgTYnAA
2020-04-02 08:28:59.876 INFO 17760 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-1, groupId=json] Discovered group coordinator xx.xxx.5.xxx:9092 (id: 2147483646 rack: null)
2020-04-02 08:28:59.884 INFO 17760 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-1, groupId=json] Revoking previously assigned partitions []
2020-04-02 08:28:59.886 INFO 17760 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : json: partitions revoked: []
2020-04-02 08:28:59.887 INFO 17760 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-1, groupId=json] (Re-)joining group
2020-04-02 08:29:00.805 INFO 17760 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-1, groupId=json] (Re-)joining group
2020-04-02 08:29:01.617 INFO 17760 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-1, groupId=json] Successfully joined group with generation 37
2020-04-02 08:29:01.635 INFO 17760 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-1, groupId=json] Setting newly assigned partitions: devtopic-2, devtopic-1, devtopic-0
2020-04-02 08:29:02.026 INFO 17760 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-1, groupId=json] Setting offset for partition devtopic-2 to the committed offset FetchPosition{offset=5, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=xx.xxx.5.xxx:9092 (id: 1 rack: null), epoch=0}}
2020-04-02 08:29:02.028 INFO 17760 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-1, groupId=json] Setting offset for partition devtopic-1 to the committed offset FetchPosition{offset=7, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=xx.xxx.5.xxx:9092 (id: 1 rack: null), epoch=0}}
2020-04-02 08:29:02.028 INFO 17760 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-1, groupId=json] Setting offset for partition devtopic-0 to the committed offset FetchPosition{offset=7, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=xx.xxx.5.xxx:9092 (id: 1 rack: null), epoch=0}}
2020-04-02 08:29:02.640 INFO 17760 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : json: partitions assigned: [devtopic-2, devtopic-1, devtopic-0]
**2020-04-02 08:29:03.891 INFO 17760 --- [ntainer#0-0-C-1] k.topic.queue.consumer.MessageConsumer : Logger 1 [JSON] received key null: Type [N/A] | Payload: Name::toString() -> {id=1,fname=test,mname=test,lname=test} | Record: ConsumerRecord(topic = devtopic, partition = 1, leaderEpoch = 0, offset = 7, CreateTime = 1585747622488, serialized key size = -1, serialized value size = 53, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = Name::toString() -> {id=1,fname=test,mname=test,lname=test})**
2020-04-02 08:29:04.300 ERROR 17760 --- [ntainer#0-0-C-1] o.s.kafka.listener.LoggingErrorHandler : Error while processing: null
org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition devtopic-1 at offset 8. If needed, please seek past the record to continue consumption.
Caused by: org.apache.kafka.common.errors.SerializationException: Can't deserialize data [[108]] from topic [devtopic]
Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'l': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
at [Source: (byte[])"l"; line: 1, column: 2]
at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1840) ~[jackson-core-2.10.2.jar:2.10.2]
at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:722) ~[jackson-core-2.10.2.jar:2.10.2]
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._reportInvalidToken(UTF8StreamJsonParser.java:3556) ~[jackson-core-2.10.2.jar:2.10.2]
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleUnexpectedValue(UTF8StreamJsonParser.java:2651) ~[jackson-core-2.10.2.jar:2.10.2]
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:856) ~[jackson-core-2.10.2.jar:2.10.2]
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:753) ~[jackson-core-2.10.2.jar:2.10.2]
at com.fasterxml.jackson.databind.ObjectReader._initForReading(ObjectReader.java:357) ~[jackson-databind-2.10.2.jar:2.10.2]
at com.fasterxml.jackson.databind.ObjectReader._bindAndClose(ObjectReader.java:1704) ~[jackson-databind-2.10.2.jar:2.10.2]
at com.fasterxml.jackson.databind.ObjectReader.readValue(ObjectReader.java:1282) ~[jackson-databind-2.10.2.jar:2.10.2]
at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:438) ~[spring-kafka-2.3.5.RELEASE.jar:2.3.5.RELEASE]
at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1268) ~[kafka-clients-2.3.1.jar:na]
at org.apache.kafka.clients.consumer.internals.Fetcher.access$3600(Fetcher.java:124) ~[kafka-clients-2.3.1.jar:na]
at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:1492) ~[kafka-clients-2.3.1.jar:na]
at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access$1600(Fetcher.java:1332) ~[kafka-clients-2.3.1.jar:na]
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:645) ~[kafka-clients-2.3.1.jar:na]
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:606) ~[kafka-clients-2.3.1.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1263) ~[kafka-clients-2.3.1.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1225) ~[kafka-clients-2.3.1.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1201) ~[kafka-clients-2.3.1.jar:na]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1012) ~[spring-kafka-2.3.5.RELEASE.jar:2.3.5.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:968) ~[spring-kafka-2.3.5.RELEASE.jar:2.3.5.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:905) ~[spring-kafka-2.3.5.RELEASE.jar:2.3.5.RELEASE]
at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) [na:1.8.0_241]
at java.util.concurrent.FutureTask.run(Unknown Source) [na:1.8.0_241]
at java.lang.Thread.run(Unknown Source) [na:1.8.0_241]
The same error keeps repeating until I stop the application. Also, you can see that right before the error occurred, the consumer did successfully receive one message from the topic (the bolded log line above).
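Judging by the exception, the record at devtopic-1 offset 8 is not valid JSON at all: the value is the single byte [108], i.e. the character 'l', so JsonDeserializer cannot parse it. My suspicion is that some earlier test wrote a plain string to the topic. To double-check, the raw record at that offset can be dumped with the console consumer that ships with Kafka, with something like the command below (broker address masked as in the logs):

kafka-console-consumer.sh --bootstrap-server xx.xxx.5.xxx:9092 \
  --topic devtopic --partition 1 --offset 8 --max-messages 1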
My configuration is below:
application.yml
server:
  port: 8081
spring:
  kafka:
    bootstrap-servers: xx.xxx.5.xxx:9092
    consumer:
      #bootstrap-servers: xx.xxx.5.xxx:9092
      group-id: json
      auto-offset-reset: latest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        spring:
          json:
            trusted:
              packages: kafka.topic.queue.entity
            use:
              type:
                headers: false
            value:
              default:
                type: kafka.topic.queue.entity.Name
    producer:
      #bootstrap-servers: xx.xxx.5.xxx:9092
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      properties:
        spring:
          json:
            add:
              type:
                headers: false
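One mitigation I am considering (not tried yet, so this is only a sketch for spring-kafka 2.3.x) is to wrap the JSON deserializer in ErrorHandlingDeserializer2, so that a record that cannot be deserialized is handed to the error handler instead of failing every subsequent poll. The existing spring.json.* properties would stay as they are:

spring:
  kafka:
    consumer:
      # wrapper that catches deserialization failures instead of throwing from poll()
      value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2
      properties:
        spring:
          deserializer:
            value:
              delegate:
                # the real deserializer the wrapper delegates to for valid records
                class: org.springframework.kafka.support.serializer.JsonDeserializer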
MessageConsumer.java
@Service
public class MessageConsumer {

    private static final Logger logger = LoggerFactory.getLogger(MessageConsumer.class);

    @KafkaListener(topics = TopicQueueConstant.TOPIC_NAME, groupId = "json",
            containerFactory = "kafkaListenerContainerFactory")
    public void listenAsObject(ConsumerRecord<String, Name> cr,
                               @Payload Name payload) {
        logger.info("Logger 1 [JSON] received key {}: Type [{}] | Payload: {} | Record: {}", cr.key(),
                typeIdHeader(cr.headers()), payload, cr);
    }

    // returns the __TypeId__ header value, or "N/A" if the producer did not add type headers
    private static String typeIdHeader(Headers headers) {
        return StreamSupport.stream(headers.spliterator(), false)
                .filter(header -> header.key().equals("__TypeId__"))
                .findFirst().map(header -> new String(header.value())).orElse("N/A");
    }
}
MessageProducer.java
@Service
public class MessageProducer {

    private static final Logger logger = LoggerFactory.getLogger(MessageProducer.class);

    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    public void sendMessageToTopic(String message) {
        logger.info("sendMessageToTopic service invoked");
        logger.info("sending to topic queue");
        // the "message" parameter is currently ignored; a fixed Name object is sent instead
        Name name = new Name();
        name.setFname("test");
        name.setLname("test");
        name.setMname("test");
        name.setId(1L);
        // Message<Name> msg = MessageBuilder
        //         .withPayload(name)
        //         .setHeader(KafkaHeaders.TOPIC, TopicQueueConstant.TOPIC_NAME)
        //         .build();
        this.kafkaTemplate.send(TopicQueueConstant.TOPIC_NAME, "1", name);
        logger.info("message has been sent successfully to topic queue");
    }
}
Name.java (POJO)
public class Name implements Serializable {

    private static final long serialVersionUID = 5172012720819652286L;

    private long id;
    private String fname;
    private String lname;
    private String mname;

    public Name() {}

    public Name(@JsonProperty("id") final long id,
                @JsonProperty("fname") final String fname,
                @JsonProperty("lname") final String lname,
                @JsonProperty("mname") final String mname) {
        super();
        this.id = id;
        this.fname = fname;
        this.lname = lname;
        this.mname = mname;
    }

    public long getId() {
        return id;
    }

    public void setId(long id) {
        this.id = id;
    }

    public String getFname() {
        return fname;
    }

    public void setFname(String fname) {
        this.fname = fname;
    }

    public String getLname() {
        return lname;
    }

    public void setLname(String lname) {
        this.lname = lname;
    }

    public String getMname() {
        return mname;
    }

    public void setMname(String mname) {
        this.mname = mname;
    }

    @Override
    public String toString() {
        return "Name::toString() -> {"
                + "id=" + this.id
                + ",fname=" + this.fname
                + ",mname=" + this.mname
                + ",lname=" + this.lname + "}";
    }
}
And this is my controller method:
@Autowired
private MessageProducer producer;

@PostMapping(value = "/publish")
@ResponseBody
public ResponseEntity<Object> publishMessage(@RequestParam String message) {
    logger.info("publishMessage endpoint invoked");
    logger.info("parameter received = {}", message);
    this.producer.sendMessageToTopic(message);
    return ResponseEntity.ok().build();
}
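For reference, this is how I invoke the endpoint while testing (assuming the controller has no class-level @RequestMapping prefix; port 8081 comes from application.yml):

curl -X POST "http://localhost:8081/publish?message=test"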
I have been stuck on this problem for almost two days.