Исключение потоков kafka Не удалось найти общедоступный конструктор без аргументов для org.apache.kafka.common.serialization.Serdes $ WrapperSerde - PullRequest
0 голосов
/ 27 июня 2018

получение приведенной ниже трассировки стека ошибок при работе с потоками kafka

ОБНОВЛЕНИЕ: согласно @ matthias-j-sax, я реализовал свой собственный Serdes с конструктором по умолчанию для WrapperSerde, но все еще получаю следующие исключения

org.apache.kafka.streams.errors.StreamsException: stream-thread [streams-request-count-4c239508-6abe-4901-bd56-d53987494770-StreamThread-1] Failed to rebalance.
    at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests (StreamThread.java:836)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce (StreamThread.java:784)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop (StreamThread.java:750)
    at org.apache.kafka.streams.processor.internals.StreamThread.run (StreamThread.java:720)
Caused by: org.apache.kafka.streams.errors.StreamsException: Failed to configure value serde class myapps.serializer.Serdes$WrapperSerde
    at org.apache.kafka.streams.StreamsConfig.defaultValueSerde (StreamsConfig.java:972)
    at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.<init> (AbstractProcessorContext.java:59)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.<init> (ProcessorContextImpl.java:42)
    at org.apache.kafka.streams.processor.internals.StreamTask.<init> (StreamTask.java:136)
    at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask (StreamThread.java:405)
    at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask (StreamThread.java:369)
    at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.createTasks (StreamThread.java:354)
    at org.apache.kafka.streams.processor.internals.TaskManager.addStreamTasks (TaskManager.java:148)
    at org.apache.kafka.streams.processor.internals.TaskManager.createTasks (TaskManager.java:107)
    at org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsAssigned (StreamThread.java:260)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete (ConsumerCoordinator.java:259)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded (AbstractCoordinator.java:367)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup (AbstractCoordinator.java:316)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll (ConsumerCoordinator.java:290)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce (KafkaConsumer.java:1149)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll (KafkaConsumer.java:1115)
    at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests (StreamThread.java:827)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce (StreamThread.java:784)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop (StreamThread.java:750)
    at org.apache.kafka.streams.processor.internals.StreamThread.run (StreamThread.java:720)
Caused by: java.lang.NullPointerException
    at myapps.serializer.Serdes$WrapperSerde.configure (Serdes.java:30)
    at org.apache.kafka.streams.StreamsConfig.defaultValueSerde (StreamsConfig.java:968)
    at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.<init> (AbstractProcessorContext.java:59)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.<init> (ProcessorContextImpl.java:42)
    at org.apache.kafka.streams.processor.internals.StreamTask.<init> (StreamTask.java:136)
    at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask (StreamThread.java:405)
    at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask (StreamThread.java:369)
    at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.createTasks (StreamThread.java:354)
    at org.apache.kafka.streams.processor.internals.TaskManager.addStreamTasks (TaskManager.java:148)
    at org.apache.kafka.streams.processor.internals.TaskManager.createTasks (TaskManager.java:107)
    at org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsAssigned (StreamThread.java:260)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete (ConsumerCoordinator.java:259)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded (AbstractCoordinator.java:367)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup (AbstractCoordinator.java:316)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll (ConsumerCoordinator.java:290)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce (KafkaConsumer.java:1149)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll (KafkaConsumer.java:1115)
    at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests (StreamThread.java:827)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce (StreamThread.java:784)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop (StreamThread.java:750)
    at org.apache.kafka.streams.processor.internals.StreamThread.run (StreamThread.java:720)

Вот мой пример использования:

Я буду получать ответы json в качестве входных данных для потока, я хочу подсчитывать запросы, чьи коды состояния не равны 200. Сначала я изучил документацию по потокам kafka в официальной документации, а также по слиянию, затем реализовал WordCountDemo который работает очень хорошо, затем я попытался написать этот код, но, получив это исключение, я очень плохо знаком с потоками kafka, я прошел трассировку стека, но не смог понять контекст, поэтому пришел сюда за помощью !!!

Вот мой код

LogCount.java

package myapps;

import java.util.Properties;
import java.util.concurrent.CountDownLatch;

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import myapps.serializer.JsonDeserializer;
import myapps.serializer.JsonSerializer;
import myapps.Request;


public class LogCount {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-request-count");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        JsonSerializer<Request> requestJsonSerializer = new JsonSerializer<>();
        JsonDeserializer<Request> requestJsonDeserializer = new JsonDeserializer<>(Request.class);
        Serde<Request> requestSerde = Serdes.serdeFrom(requestJsonSerializer, requestJsonDeserializer);
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, requestSerde.getClass().getName());
        final StreamsBuilder builder = new StreamsBuilder();

        KStream<String, Request> source = builder.stream("streams-requests-input");
        source.filter((k, v) -> v.getHttpStatusCode() != 200)
                .groupByKey()
                .count()
                .toStream()
                .to("streams-requests-output", Produced.with(Serdes.String(), Serdes.Long()));
        final Topology topology = builder.build();
        final KafkaStreams streams = new KafkaStreams(topology, props);
        final CountDownLatch latch = new CountDownLatch(1);

        System.out.println(topology.describe());
        // attach shutdown handler to catch control-c
        Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
            @Override
            public void run() {
                streams.close();
                latch.countDown();
            }
        });

        try {
            streams.cleanUp();
            streams.start();
            latch.await();
        } catch (Throwable e) {
            System.exit(1);
        }
        System.exit(0);
    }
}

JsonDeserializer.java

package myapps.serializer;

import com.google.gson.Gson;
import org.apache.kafka.common.serialization.Deserializer;
import java.util.Map;

public class JsonDeserializer<T> implements Deserializer<T> {

    private Gson gson = new Gson();
    private Class<T> deserializedClass;

    public JsonDeserializer(Class<T> deserializedClass) {
        this.deserializedClass = deserializedClass;
    }

    public JsonDeserializer() {
    }

    @Override
    @SuppressWarnings("unchecked")
    public void configure(Map<String, ?> map, boolean b) {
        if(deserializedClass == null) {
            deserializedClass = (Class<T>) map.get("serializedClass");
        }
    }

    @Override
    public T deserialize(String s, byte[] bytes) {
         if(bytes == null){
             return null;
         }

         return gson.fromJson(new String(bytes),deserializedClass);

    }

    @Override
    public void close() {

    }
}

JsonSerializer.java

package myapps.serializer;

import com.google.gson.Gson;
import org.apache.kafka.common.serialization.Serializer;

import java.nio.charset.Charset;
import java.util.Map;

public class JsonSerializer<T> implements Serializer<T> {

    private Gson gson = new Gson();

    @Override
    public void configure(Map<String, ?> map, boolean b) {

    }

    @Override
    public byte[] serialize(String topic, T t) {
        return gson.toJson(t).getBytes(Charset.forName("UTF-8"));
    }

    @Override
    public void close() {

    }
}

Как я уже говорил, я буду получать JSON в качестве входных данных, структура будет такой,

{
"RequestID": "1f6b2409", "Протокол": "HTTP", "Host": "abc.com", "Метод": "GET", "HTTPStatusCode": "200", "User-Agent": "свернуться% 2f7.54.0", }

Соответствующий файл Request.java выглядит следующим образом

package myapps;

public final class Request {
    private String requestID;
    private String protocol;
    private String host;
    private String method;
    private int httpStatusCode;
    private String userAgent;

    public String getRequestID() {
        return requestID;
    }
    public void setRequestID(String requestID) {
        this.requestID = requestID;
    }
    public String getProtocol() {
        return protocol;
    }
    public void setProtocol(String protocol) {
        this.protocol = protocol;
    }
    public String getHost() {
        return host;
    }
    public void setHost(String host) {
        this.host = host;
    }
    public String getMethod() {
        return method;
    }
    public void setMethod(String method) {
        this.method = method;
    }
    public int getHttpStatusCode() {
        return httpStatusCode;
    }
    public void setHttpStatusCode(int httpStatusCode) {
        this.httpStatusCode = httpStatusCode;
    }
    public String getUserAgent() {
        return userAgent;
    }
    public void setUserAgent(String userAgent) {
        this.userAgent = userAgent;
    }
}

РЕДАКТИРОВАТЬ: когда я выхожу из kafka-console-consumer.sh, он говорит Processed a total of 0 messages.

1 Ответ

0 голосов
/ 27 июня 2018

Как показывают ошибки, в классе отсутствует конструктор по умолчанию без аргументов для Serdes$WrapperSerde:

Could not find a public no-argument constructor 

Проблема заключается в следующем:

Serde<Request> requestSerde = Serdes.serdeFrom(requestJsonSerializer, requestJsonDeserializer);
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, requestSerde.getClass().getName());

Serdes.serdeFrom return WrapperSerde, у которого нет пустого конструктора по умолчанию. Таким образом, вы не можете передать его в StreamsConfig. Вы можете использовать Serdes генерировать таким образом, только если вы передаете объекты в соответствующие вызовы API (т. Е. Перезаписываете по умолчанию Serde для определенных операторов).

Чтобы это работало (т. Е. Чтобы можно было установить Serde в конфигурации), вам необходимо реализовать соответствующий класс, который реализует интерфейс Serde.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...