Несовместимое ограничение равенства при использовании Akka Kafka Streams - PullRequest
0 голосов
/ 10 июня 2018

Я пытаюсь использовать Akka Kafka Streams, следуя документации Akka Kafka Streams .Вот код, который у меня есть:

ConsumerSettings<byte[], ETLProcessMessage> consumerSettings = ConsumerSettings
                .create(actorSystem, new ByteArrayDeserializer(), new KafkaJacksonSerializer<>(ETLProcessMessage.class))
                .withBootstrapServers(kafkaServers)
                .withGroupId(consumerGroupId)
                .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

Consumer.committableSource(consumerSettings, Subscriptions.topics(topicName))
                .mapAsync(3, msg -> CompletableFuture.supplyAsync(() -> {
                    handlePartitionedRequest(msg.record().value());
                    return Done.getInstance();
                }))
                .runWith(Sink.ignore(), materializer);

Но приведенный выше код показывает ошибку компилятора при runwith (): enter image description here

Вот код для KafkaJacksonSerializer:

import com.adaequare.mapro.common.exception.AppException;
import com.adaequare.mapro.config.jackson.PostConstructDeserializer;
import com.adaequare.mapro.model.transformer.JSONTransformer;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.databind.*;
import com.fasterxml.jackson.databind.deser.BeanDeserializerModifier;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.common.io.CharStreams;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.Map;

public class KafkaJacksonSerializer<T> implements Serializer<T>, Deserializer<T>{    
    private ObjectReader objectReader;
    private ObjectWriter objectWriter;
    private ObjectMapper objectMapper;

    public KafkaJacksonSerializer(){   
    }

    public KafkaJacksonSerializer(Class<T> persistentClass) {
        objectMapper = new ObjectMapper();

        SimpleModule module = new SimpleModule();
        module.setDeserializerModifier(new BeanDeserializerModifier() {
            @Override
            public JsonDeserializer<?> modifyDeserializer(DeserializationConfig config,
                                                          BeanDescription beanDesc, final JsonDeserializer<?> deserializer) {
                return new PostConstructDeserializer(deserializer);
            }
        });
        objectMapper.registerModule(module);

        objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
        objectMapper.setVisibility(objectMapper.getSerializationConfig().getDefaultVisibilityChecker()
                .withFieldVisibility(JsonAutoDetect.Visibility.ANY)
                .withGetterVisibility(JsonAutoDetect.Visibility.NONE)
                .withIsGetterVisibility(JsonAutoDetect.Visibility.NONE)
                .withSetterVisibility(JsonAutoDetect.Visibility.NONE)
                .withCreatorVisibility(JsonAutoDetect.Visibility.NONE));

        objectReader = objectMapper.readerFor(persistentClass);
        objectWriter = objectMapper.writer();
    }

    @Override
    public T deserialize(String topic, byte[] data) {
        InputStream stream = new ByteArrayInputStream(data);
        if(stream == null){
            return null;
        }

        try {
            String json = CharStreams.toString(new InputStreamReader(stream));
            return objectReader.readValue(json);
        } catch (IOException e) {
            throw AppException.forException("Error while unmarshalling AssetData JSON: "+e.getMessage(), e);
        }
    }

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
    }

    @Override
    public byte[] serialize(String topic, T data) {
        if(data == null){
            return null;
        }

        try {
            return objectWriter.writeValueAsBytes(data);
        } catch (IOException e) {
            throw AppException.forException("Error while marshalling JSON: "+e.getMessage(), e);
        }
    }

    @Override
    public void close() {
    }
}

Я не уверен, в чем именно проблема.Но приведенный ниже код не отображает никаких ошибок:

ConsumerSettings newconsumerSettings = ConsumerSettings
                .create(actorSystem, new ByteArrayDeserializer(), new StringDeserializer())
                .withBootstrapServers(kafkaServers)
                .withGroupId(consumerGroupId)
                .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        Consumer.committableSource(newconsumerSettings, Subscriptions.topics("topic2"))
                .mapAsync(3, msg -> CompletableFuture.supplyAsync(() -> Done.getInstance()))
                .runWith(Sink.ignore(), materializer);

Может кто-нибудь помочь мне определить, что здесь происходит не так?

1 Ответ

0 голосов
/ 11 июня 2018

Существует несоответствие версии Акки между добавленными зависимостями.После того, как я исправил их так же, я больше не вижу ошибки компиляции.

Вот зависимости, которые я использовал:

compile 'com.typesafe.akka:akka-actor_2.12:2.5.4'
compile 'com.typesafe.akka:akka-cluster_2.12:2.5.4'
compile 'com.typesafe.akka:akka-cluster-tools_2.12:2.5.4'
compile 'com.typesafe.akka:akka-slf4j_2.12:2.5.4'

Это то, что я недавно добавил для реактивнойkafka:

compile 'com.typesafe.akka:akka-stream-kafka_2.12:0.21'

После обновления зависимостей akka (связанных с субъектом / кластером) до 2.5.9 ошибка компиляции исчезла.

...