получение приведенной ниже трассировки стека ошибок при работе с потоками kafka
ОБНОВЛЕНИЕ: согласно @ matthias-j-sax, я реализовал свой собственный Serdes
с конструктором по умолчанию для WrapperSerde
, но все еще получаю следующие исключения
org.apache.kafka.streams.errors.StreamsException: stream-thread [streams-request-count-4c239508-6abe-4901-bd56-d53987494770-StreamThread-1] Failed to rebalance.
at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests (StreamThread.java:836)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce (StreamThread.java:784)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop (StreamThread.java:750)
at org.apache.kafka.streams.processor.internals.StreamThread.run (StreamThread.java:720)
Caused by: org.apache.kafka.streams.errors.StreamsException: Failed to configure value serde class myapps.serializer.Serdes$WrapperSerde
at org.apache.kafka.streams.StreamsConfig.defaultValueSerde (StreamsConfig.java:972)
at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.<init> (AbstractProcessorContext.java:59)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.<init> (ProcessorContextImpl.java:42)
at org.apache.kafka.streams.processor.internals.StreamTask.<init> (StreamTask.java:136)
at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask (StreamThread.java:405)
at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask (StreamThread.java:369)
at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.createTasks (StreamThread.java:354)
at org.apache.kafka.streams.processor.internals.TaskManager.addStreamTasks (TaskManager.java:148)
at org.apache.kafka.streams.processor.internals.TaskManager.createTasks (TaskManager.java:107)
at org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsAssigned (StreamThread.java:260)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete (ConsumerCoordinator.java:259)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded (AbstractCoordinator.java:367)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup (AbstractCoordinator.java:316)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll (ConsumerCoordinator.java:290)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce (KafkaConsumer.java:1149)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll (KafkaConsumer.java:1115)
at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests (StreamThread.java:827)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce (StreamThread.java:784)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop (StreamThread.java:750)
at org.apache.kafka.streams.processor.internals.StreamThread.run (StreamThread.java:720)
Caused by: java.lang.NullPointerException
at myapps.serializer.Serdes$WrapperSerde.configure (Serdes.java:30)
at org.apache.kafka.streams.StreamsConfig.defaultValueSerde (StreamsConfig.java:968)
at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.<init> (AbstractProcessorContext.java:59)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.<init> (ProcessorContextImpl.java:42)
at org.apache.kafka.streams.processor.internals.StreamTask.<init> (StreamTask.java:136)
at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask (StreamThread.java:405)
at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask (StreamThread.java:369)
at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.createTasks (StreamThread.java:354)
at org.apache.kafka.streams.processor.internals.TaskManager.addStreamTasks (TaskManager.java:148)
at org.apache.kafka.streams.processor.internals.TaskManager.createTasks (TaskManager.java:107)
at org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsAssigned (StreamThread.java:260)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete (ConsumerCoordinator.java:259)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded (AbstractCoordinator.java:367)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup (AbstractCoordinator.java:316)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll (ConsumerCoordinator.java:290)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce (KafkaConsumer.java:1149)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll (KafkaConsumer.java:1115)
at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests (StreamThread.java:827)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce (StreamThread.java:784)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop (StreamThread.java:750)
at org.apache.kafka.streams.processor.internals.StreamThread.run (StreamThread.java:720)
Вот мой пример использования:
Я буду получать ответы json в качестве входных данных для потока, я хочу подсчитывать запросы, чьи коды состояния не равны 200. Сначала я изучил документацию по потокам kafka в официальной документации, а также по слиянию, затем реализовал WordCountDemo
который работает очень хорошо, затем я попытался написать этот код, но, получив это исключение, я очень плохо знаком с потоками kafka, я прошел трассировку стека, но не смог понять контекст, поэтому пришел сюда за помощью !!!
Вот мой код
LogCount.java
package myapps;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import myapps.serializer.JsonDeserializer;
import myapps.serializer.JsonSerializer;
import myapps.Request;
public class LogCount {
public static void main(String[] args) {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-request-count");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
JsonSerializer<Request> requestJsonSerializer = new JsonSerializer<>();
JsonDeserializer<Request> requestJsonDeserializer = new JsonDeserializer<>(Request.class);
Serde<Request> requestSerde = Serdes.serdeFrom(requestJsonSerializer, requestJsonDeserializer);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, requestSerde.getClass().getName());
final StreamsBuilder builder = new StreamsBuilder();
KStream<String, Request> source = builder.stream("streams-requests-input");
source.filter((k, v) -> v.getHttpStatusCode() != 200)
.groupByKey()
.count()
.toStream()
.to("streams-requests-output", Produced.with(Serdes.String(), Serdes.Long()));
final Topology topology = builder.build();
final KafkaStreams streams = new KafkaStreams(topology, props);
final CountDownLatch latch = new CountDownLatch(1);
System.out.println(topology.describe());
// attach shutdown handler to catch control-c
Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
@Override
public void run() {
streams.close();
latch.countDown();
}
});
try {
streams.cleanUp();
streams.start();
latch.await();
} catch (Throwable e) {
System.exit(1);
}
System.exit(0);
}
}
JsonDeserializer.java
package myapps.serializer;
import com.google.gson.Gson;
import org.apache.kafka.common.serialization.Deserializer;
import java.util.Map;
public class JsonDeserializer<T> implements Deserializer<T> {
private Gson gson = new Gson();
private Class<T> deserializedClass;
public JsonDeserializer(Class<T> deserializedClass) {
this.deserializedClass = deserializedClass;
}
public JsonDeserializer() {
}
@Override
@SuppressWarnings("unchecked")
public void configure(Map<String, ?> map, boolean b) {
if(deserializedClass == null) {
deserializedClass = (Class<T>) map.get("serializedClass");
}
}
@Override
public T deserialize(String s, byte[] bytes) {
if(bytes == null){
return null;
}
return gson.fromJson(new String(bytes),deserializedClass);
}
@Override
public void close() {
}
}
JsonSerializer.java
package myapps.serializer;
import com.google.gson.Gson;
import org.apache.kafka.common.serialization.Serializer;
import java.nio.charset.Charset;
import java.util.Map;
public class JsonSerializer<T> implements Serializer<T> {
private Gson gson = new Gson();
@Override
public void configure(Map<String, ?> map, boolean b) {
}
@Override
public byte[] serialize(String topic, T t) {
return gson.toJson(t).getBytes(Charset.forName("UTF-8"));
}
@Override
public void close() {
}
}
Как я уже говорил, я буду получать JSON в качестве входных данных, структура будет такой,
{
"RequestID": "1f6b2409",
"Протокол": "HTTP",
"Host": "abc.com",
"Метод": "GET",
"HTTPStatusCode": "200",
"User-Agent": "свернуться% 2f7.54.0",
}
Соответствующий файл Request.java
выглядит следующим образом
package myapps;
public final class Request {
private String requestID;
private String protocol;
private String host;
private String method;
private int httpStatusCode;
private String userAgent;
public String getRequestID() {
return requestID;
}
public void setRequestID(String requestID) {
this.requestID = requestID;
}
public String getProtocol() {
return protocol;
}
public void setProtocol(String protocol) {
this.protocol = protocol;
}
public String getHost() {
return host;
}
public void setHost(String host) {
this.host = host;
}
public String getMethod() {
return method;
}
public void setMethod(String method) {
this.method = method;
}
public int getHttpStatusCode() {
return httpStatusCode;
}
public void setHttpStatusCode(int httpStatusCode) {
this.httpStatusCode = httpStatusCode;
}
public String getUserAgent() {
return userAgent;
}
public void setUserAgent(String userAgent) {
this.userAgent = userAgent;
}
}
РЕДАКТИРОВАТЬ: когда я выхожу из kafka-console-consumer.sh
, он говорит Processed a total of 0 messages
.