I'm using Spring Boot with Kafka to manipulate files on the file system, using two streams. The first one detects duplicate files and the second one deletes them.
@StreamListener("input1")
@SendTo("output1")
public KStream<Long, String> detectDuplicateImages(KStream<String, ImageFile> imageFileStream) {
return imageFileStream
.map((key,imageFile) -> {
try {
logger.info("Reading file.. {}",imageFile.getFileName());
return new KeyValue<>(Files.size(Paths.get(imageFile.getFileName())),imageFile.getFileName());
} catch (IOException e) {
return new KeyValue<>(0L, imageFile.getFileName());
}
})
.groupByKey(Grouped.with(Serdes.Long(),Serdes.String()).withName("g1"))
.<Long,String>reduce((value1, value2) -> {
Set<String> s1 = new HashSet<String>(Arrays.asList(value1.split("\\|")));
s1.addAll(new HashSet<String>(Arrays.asList(value2.split("\\|"))));
return String.join("|", s1);
}, Materialized.with(Serdes.Long(),Serdes.String()))
.toStream()
.filter((key, value) -> Strings.isNotEmpty(value) && value.contains("|"))
.flatMapValues((key, value) -> Arrays.asList(value.split("\\|")))
.<Long,String>groupByKey(Grouped.with(Serdes.Long(),Serdes.String()).withName("g2"))
.<Long,String>reduce((file1, file2) -> {
try {
String comp1 = file1;
if (file1.contains("|")) {
comp1 = file1.split("\\|")[0];
}
if (!com.google.common.io.Files.equal(new File(comp1), new File(file2))) {
return null;
} else {
return String.join("|",file1,file2);
}
} catch (IOException e) {
return null;
}
}, Materialized.with(Serdes.Long(),Serdes.String()))
.<Long,String>toStream()
.filter((key, value) -> Strings.isNotEmpty(value) && value.contains("|"))
.peek((key,value) -> {
List<String> originalFiles = Arrays.asList(value.split("\\|"));
logger.info("size [{}], files:[{}]", key, originalFiles);
})
;
}
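The duplicate confirmation in the second reduce relies on Guava's byte-for-byte file comparison. A minimal standalone sketch of just that call, using hypothetical temporary files purely for illustration:

import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

import com.google.common.io.Files;

public class GuavaFileCompareSketch {
    public static void main(String[] args) throws IOException {
        // Hypothetical files, created only to demonstrate the comparison.
        File a = File.createTempFile("image-a", ".bin");
        File b = File.createTempFile("image-b", ".bin");
        a.deleteOnExit();
        b.deleteOnExit();

        Files.asCharSink(a, StandardCharsets.UTF_8).write("same bytes");
        Files.asCharSink(b, StandardCharsets.UTF_8).write("same bytes");

        // Files.equal compares the two files byte for byte.
        System.out.println(Files.equal(a, b)); // true

        Files.asCharSink(b, StandardCharsets.UTF_8).write("different bytes");
        System.out.println(Files.equal(a, b)); // false
    }
}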
The second stream receives output1 and processes those messages:
@StreamListener("input2")
@SendTo("output2")
public KStream<Integer, Integer> removeDuplicateImages(KStream<Long, String> imageFileStream) {
return imageFileStream
.groupByKey(Grouped.<Long,String>as("g1").withKeySerde(Serdes.Long()).withValueSerde(Serdes.String()))
.<Long,String>reduce((value1, value2) -> String.join("|",value1,value2),
Materialized.with(Serdes.Long(),Serdes.String()))
.<Long,String>toStream()
.<Integer,Integer>map((key, identical_files) -> {
Set<String> classes = new HashSet<>();
Set<String> files = new HashSet<>(Arrays.asList(identical_files.split("\\|")));
for (String fileName : files) {
classes.add(DatasetUtils.getClassName(fileName));
}
if (classes.size() == 1) {
logger.info("Mark for delete [{}] same class files",files.size()-1);
files.remove(files.iterator().next());
} else if (classes.size() > 1) {
logger.info("Mark for delete all [{}] classes files",files.size());
} else {
logger.info("No class marked for deletion");
return new KeyValue<Integer,Integer>(0,0);
}
for (String file : files) {
if (!simulate) {
logger.info("Deleting [{}]", file);
try {
Files.delete(Paths.get(file));
} catch (IOException e) {
logger.error("Error deleting file [{}] - skip", file);
}
}
}
return new KeyValue<Integer,Integer>(0, files.size());
})
.<Integer,Integer>groupByKey(Grouped.<Integer,Integer>as("g3").withKeySerde(Serdes.Integer()).withKeySerde(Serdes.Integer()))
.<Integer,Integer>reduce(Integer::sum, Materialized.with(Serdes.Integer(),Serdes.Integer()))
.<Integer,Integer>toStream()
.peek((key, value) -> logger.info("[{}] files deleted", value))
;
}
I'm using the following configuration:
spring.cloud.stream.kstream.bindings.output1.producer.keySerde=org.apache.kafka.common.serialization.Serdes$LongSerde
spring.cloud.stream.kstream.bindings.output1.producer.valueSerde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kstream.bindings.input2.consumer.keySerde=org.apache.kafka.common.serialization.Serdes$LongSerde
spring.cloud.stream.kstream.bindings.input2.consumer.valueSerde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kstream.bindings.output2.producer.keySerde=org.apache.kafka.common.serialization.Serdes$IntegerSerde
spring.cloud.stream.kstream.bindings.output2.producer.valueSerde=org.apache.kafka.common.serialization.Serdes$IntegerSerde
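For reference, recent versions of the Spring Cloud Stream Kafka Streams binder document these per-binding Serde properties under the spring.cloud.stream.kafka.streams prefix rather than spring.cloud.stream.kstream; a sketch of the same bindings in that layout, assuming such a binder version is in use:

spring.cloud.stream.kafka.streams.bindings.output1.producer.keySerde=org.apache.kafka.common.serialization.Serdes$LongSerde
spring.cloud.stream.kafka.streams.bindings.output1.producer.valueSerde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.bindings.input2.consumer.keySerde=org.apache.kafka.common.serialization.Serdes$LongSerde
spring.cloud.stream.kafka.streams.bindings.input2.consumer.valueSerde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.bindings.output2.producer.keySerde=org.apache.kafka.common.serialization.Serdes$IntegerSerde
spring.cloud.stream.kafka.streams.bindings.output2.producer.valueSerde=org.apache.kafka.common.serialization.Serdes$IntegerSerde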
Running this throws the following exception:
Exception in thread "DuplicationProcessor-detectDuplicateImages-applicationId-f55a46cc-a729-4979-b518-11ce098d40bf-StreamThread-1" org.apache.kafka.streams.errors.ProcessorStateException: task [1_0] Failed to flush state store KSTREAM-REDUCE-STATE-STORE-0000000010
at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:280)
at org.apache.kafka.streams.processor.internals.AbstractTask.flushState(AbstractTask.java:204)
at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:519)
at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:470)
at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:458)
at org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:286)
at org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:417)
at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:1079)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:933)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:819)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:788)
Caused by: org.apache.kafka.streams.errors.StreamsException: A serializer (key: org.apache.kafka.common.serialization.IntegerSerializer / value: org.apache.kafka.common.serialization.IntegerSerializer) is not compatible to the actual key or value type (key type: java.lang.Long / value type: java.lang.String). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.
at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:94)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
at org.apache.kafka.streams.kstream.internals.KStreamPeek$KStreamPeekProcessor.process(KStreamPeek.java:44)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:117)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
at org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(KStreamFilter.java:43)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:117)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:117)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
at org.apache.kafka.streams.kstream.internals.TimestampedCacheFlushListener.apply(TimestampedCacheFlushListener.java:45)
at org.apache.kafka.streams.kstream.internals.TimestampedCacheFlushListener.apply(TimestampedCacheFlushListener.java:28)
at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$setFlushListener$1(MeteredKeyValueStore.java:131)
at org.apache.kafka.streams.state.internals.CachingKeyValueStore.putAndMaybeForward(CachingKeyValueStore.java:92)
at org.apache.kafka.streams.state.internals.CachingKeyValueStore.lambda$initInternal$0(CachingKeyValueStore.java:72)
at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:142)
at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:100)
at org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:124)
at org.apache.kafka.streams.state.internals.CachingKeyValueStore.flush(CachingKeyValueStore.java:272)
at org.apache.kafka.streams.state.internals.WrappedStateStore.flush(WrappedStateStore.java:84)
at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.flush(MeteredKeyValueStore.java:237)
...
Please advise what is wrong here. I would like to end up with a single message on output2 once the processing is done.