StreamsException on Spring Kafka Streams
0 votes
/ 29 April 2020

I'm using Spring Boot with Kafka to manipulate files on the filesystem using two streams. The first detects duplicate files, and the second deletes them.

@StreamListener("input1")
@SendTo("output1")
public KStream<Long, String> detectDuplicateImages(KStream<String, ImageFile> imageFileStream) {
    return imageFileStream
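            // re-key each record by its file size so that files of equal size land in the same group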
            .map((key,imageFile) -> {
                try {
                    logger.info("Reading file.. {}",imageFile.getFileName());
                    return new KeyValue<>(Files.size(Paths.get(imageFile.getFileName())),imageFile.getFileName());
                } catch (IOException e) {
                    return new KeyValue<>(0L, imageFile.getFileName());
                }
            })
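            // group same-size files and join their names with "|"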
            .groupByKey(Grouped.with(Serdes.Long(),Serdes.String()).withName("g1"))
            .<Long,String>reduce((value1, value2) -> {
                    Set<String> s1 =  new HashSet<String>(Arrays.asList(value1.split("\\|")));
                    s1.addAll(new HashSet<String>(Arrays.asList(value2.split("\\|"))));
                    return String.join("|", s1);
            }, Materialized.with(Serdes.Long(),Serdes.String()))
            .toStream()
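            // keep only sizes that collected more than one file name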
            .filter((key, value) -> Strings.isNotEmpty(value) && value.contains("|"))
            .flatMapValues((key, value) -> Arrays.asList(value.split("\\|")))
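            // compare same-size files byte by byte; only truly identical files stay joined with "|"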
            .<Long,String>groupByKey(Grouped.with(Serdes.Long(),Serdes.String()).withName("g2"))
            .<Long,String>reduce((file1, file2) -> {
                try {
                    String comp1 = file1;
                    if (file1.contains("|")) {
                        comp1 = file1.split("\\|")[0];
                    }
                    if (!com.google.common.io.Files.equal(new File(comp1), new File(file2))) {
                        return null;
                    } else {
                        return String.join("|",file1,file2);
                    }
                } catch (IOException e) {
                    return null;
                }
            }, Materialized.with(Serdes.Long(),Serdes.String()))
            .<Long,String>toStream()
            .filter((key, value) -> Strings.isNotEmpty(value) && value.contains("|"))
            .peek((key,value) -> {
                        List<String> originalFiles = Arrays.asList(value.split("\\|"));
                        logger.info("size [{}], files:[{}]", key, originalFiles);
                    })
            ;
}
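
The first stream therefore emits records on output1 keyed by file size (Long), with a "|"-joined list of identical file names as the value (String).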

The second stream consumes output1 and processes the messages:

@StreamListener("input2")
@SendTo("output2")
public KStream<Integer, Integer> removeDuplicateImages(KStream<Long, String> imageFileStream) {
    return imageFileStream
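            // merge duplicate lists arriving for the same file-size key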
            .groupByKey(Grouped.<Long,String>as("g1").withKeySerde(Serdes.Long()).withValueSerde(Serdes.String()))
            .<Long,String>reduce((value1, value2) -> String.join("|",value1,value2),
                    Materialized.with(Serdes.Long(),Serdes.String()))
            .<Long,String>toStream()
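            // decide what to delete: keep one file when all duplicates share a class, delete all of them otherwise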
            .<Integer,Integer>map((key, identical_files) -> {
                Set<String> classes = new HashSet<>();
                Set<String> files = new HashSet<>(Arrays.asList(identical_files.split("\\|")));
                for (String fileName : files) {
                    classes.add(DatasetUtils.getClassName(fileName));
                }
                if (classes.size() == 1) {
                    logger.info("Mark for delete [{}] same class files",files.size()-1);
                    files.remove(files.iterator().next());
                } else if (classes.size() > 1) {
                    logger.info("Mark for delete all [{}] classes files",files.size());
                } else {
                    logger.info("No class marked for deletion");
                    return new KeyValue<Integer,Integer>(0,0);
                }
                for (String file : files) {
                    if (!simulate) {
                        logger.info("Deleting [{}]", file);
                        try {
                            Files.delete(Paths.get(file));
                        } catch (IOException e) {
                            logger.error("Error deleting file [{}] - skip", file);
                        }
                    }
                }
                return new KeyValue<Integer,Integer>(0, files.size());
            })
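            // sum per-group deletion counts under a single key so output2 carries a running total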
            .<Integer,Integer>groupByKey(Grouped.<Integer,Integer>as("g3").withKeySerde(Serdes.Integer()).withValueSerde(Serdes.Integer()))
            .<Integer,Integer>reduce(Integer::sum, Materialized.with(Serdes.Integer(),Serdes.Integer()))
            .<Integer,Integer>toStream()
            .peek((key, value) -> logger.info("[{}] files deleted", value))
    ;
}
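
For reference, both listeners hang off a KStream bindings interface; it is not shown here, so the declaration below is only a sketch assuming the binding names used above, registered via @EnableBinding:

import org.apache.kafka.streams.kstream.KStream;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;

public interface DuplicationBindings {

    // consumed by detectDuplicateImages
    @Input("input1")
    KStream<String, ImageFile> input1();

    // produced by detectDuplicateImages: file size -> "|"-joined duplicate file names
    @Output("output1")
    KStream<Long, String> output1();

    // consumed by removeDuplicateImages (bound to the same topic as output1)
    @Input("input2")
    KStream<Long, String> input2();

    // produced by removeDuplicateImages: running total of deleted files
    @Output("output2")
    KStream<Integer, Integer> output2();
}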

I am using the following configuration:

spring.cloud.stream.kstream.bindings.output1.producer.keySerde=org.apache.kafka.common.serialization.Serdes$LongSerde
spring.cloud.stream.kstream.bindings.output1.producer.valueSerde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kstream.bindings.input2.consumer.keySerde=org.apache.kafka.common.serialization.Serdes$LongSerde
spring.cloud.stream.kstream.bindings.input2.consumer.valueSerde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kstream.bindings.output2.producer.keySerde=org.apache.kafka.common.serialization.Serdes$IntegerSerde
spring.cloud.stream.kstream.bindings.output2.producer.valueSerde=org.apache.kafka.common.serialization.Serdes$IntegerSerde
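
(I am not certain the spring.cloud.stream.kstream prefix is still honored by my binder version; newer releases of the Kafka Streams binder document the per-binding serdes under the spring.cloud.stream.kafka.streams prefix instead, i.e.:)

spring.cloud.stream.kafka.streams.bindings.output1.producer.keySerde=org.apache.kafka.common.serialization.Serdes$LongSerde
spring.cloud.stream.kafka.streams.bindings.output1.producer.valueSerde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.bindings.input2.consumer.keySerde=org.apache.kafka.common.serialization.Serdes$LongSerde
spring.cloud.stream.kafka.streams.bindings.input2.consumer.valueSerde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.bindings.output2.producer.keySerde=org.apache.kafka.common.serialization.Serdes$IntegerSerde
spring.cloud.stream.kafka.streams.bindings.output2.producer.valueSerde=org.apache.kafka.common.serialization.Serdes$IntegerSerde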

Running this throws the following exception:

    Exception in thread "DuplicationProcessor-detectDuplicateImages-applicationId-f55a46cc-a729-4979-b518-11ce098d40bf-StreamThread-1" org.apache.kafka.streams.errors.ProcessorStateException: task [1_0] Failed to flush state store KSTREAM-REDUCE-STATE-STORE-0000000010
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:280)
    at org.apache.kafka.streams.processor.internals.AbstractTask.flushState(AbstractTask.java:204)
    at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:519)
    at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:470)
    at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:458)
    at org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:286)
    at org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:417)
    at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:1079)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:933)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:819)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:788)
Caused by: org.apache.kafka.streams.errors.StreamsException: A serializer (key: org.apache.kafka.common.serialization.IntegerSerializer / value: org.apache.kafka.common.serialization.IntegerSerializer) is not compatible to the actual key or value type (key type: java.lang.Long / value type: java.lang.String). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.
    at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:94)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
    at org.apache.kafka.streams.kstream.internals.KStreamPeek$KStreamPeekProcessor.process(KStreamPeek.java:44)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:117)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
    at org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(KStreamFilter.java:43)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:117)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
    at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:117)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
    at org.apache.kafka.streams.kstream.internals.TimestampedCacheFlushListener.apply(TimestampedCacheFlushListener.java:45)
    at org.apache.kafka.streams.kstream.internals.TimestampedCacheFlushListener.apply(TimestampedCacheFlushListener.java:28)
    at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$setFlushListener$1(MeteredKeyValueStore.java:131)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.putAndMaybeForward(CachingKeyValueStore.java:92)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.lambda$initInternal$0(CachingKeyValueStore.java:72)
    at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:142)
    at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:100)
    at org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:124)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.flush(CachingKeyValueStore.java:272)
    at org.apache.kafka.streams.state.internals.WrappedStateStore.flush(WrappedStateStore.java:84)
    at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.flush(MeteredKeyValueStore.java:237)
    at 

Please advise what is wrong here. I would like to get a single message on output2 once processing is done.
