Исключение RocksDB в потоках Кафки - PullRequest
1 голос
/ 25 марта 2019

В простой программе Kafka Stream, когда я использую приведенный ниже код, она работает без каких-либо ошибок:

      KTable<String, Long> result= source.mapValues(textLine
      ->textLine.toLowerCase()) .flatMapValues(lowercasedTextLine ->
      Arrays.asList(lowercasedTextLine.split(" "))) .selectKey((ignoredKey,word) ->
      word) .groupByKey() .count("Counts");

      result.to(Serdes.String(), Serdes.Long(), "wc-output");

Однако, когда я использую приведенный ниже код, я получаю сообщение об ошибке:

    KStream<String, String> source = builder.stream("wc-input");
    source.groupBy((key, word) -> word).windowedBy(TimeWindows.of(TimeUnit.SECONDS.toMillis(5000))).count()
            .toStream().map((key, value) -> new KeyValue<>(key.key(), value))
            .to("wc-output", Produced.with(Serdes.String(), Serdes.Long()));

Исключение в потоке "Потоки-WordCount-b160d715-f0e0-42ee-831e-0e4eed7e9424-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: исключение, пойманное в процесс. taskId = 1_0, процессор = KSTREAM-SOURCE-0000000006, тема = потоки-WordCount-KSTREAM-АГРЕГАТ-STATE-STORE-0000000002-передел, раздел = 0, смещение = 0 при org.apache.kafka.streams.processor.internals.StreamTask.process (StreamTask.java:232) в org.apache.kafka.streams.processor.internals.AssignedTasks.process (AssignedTasks.java:403) в org.apache.kafka.streams.processor.internals.TaskManager.process (TaskManager.java:317) в org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit (StreamThread.java:942) в org.apache.kafka.streams.processor.internals.StreamThread.runOnce (StreamThread.java:822) в org.apache.kafka.streams.processor.internals.StreamThread.runLoop (StreamThread.java:774) в org.apache.kafka.streams.processor.internals.StreamThread.run (StreamThread.java:744) Вызывается: org.apache.kafka.streams.errors.ProcessorStateException: Ошибка при открытии магазина KSTREAM-AGGREGATE-STATE-STORE-STORE-0000000002: 1553472000000 по месту нахождения \ TMP \ Кафка-потоки \ потоки-WordCount \ 1_0 \ KSTREAM-АГРЕГАТ-STATE-STORE-0000000002 \ KSTREAM-АГРЕГАТ-STATE-STORE-0000000002: 1553472000000 в org.apache.kafka.streams.state.internals.RocksDBStore.openDB (RocksDBStore.java:204) в org.apache.kafka.streams.state.internals.RocksDBStore.openDB (RocksDBStore.java:174) в org.apache.kafka.streams.state.internals.Segment.openDB (Segment.java:40) в org.apache.kafka.streams.state.internals.Segments.getOrCreateSegment (Segments.java:89) в org.apache.kafka.streams.state.internals.RocksDBSegmentedBytesStore.put (RocksDBSegmentedBytesStore.java:81) в org.apache.kafka.streams.state.internals.RocksDBWindowStore $ RocksDBWindowBytesStore.put (RocksDBWindowStore.java:43) в org.apache.kafka.streams.state.internals.RocksDBWindowStore $ RocksDBWindowBytesStore.put (RocksDBWindowStore.java:34) в org.apache.kafka.streams.state.internals.ChangeLoggingWindowBytesStore.put (ChangeLoggingWindowBytesStore.java:67) в org.apache.kafka.streams.state.internals.ChangeLoggingWindowBytesStore.put (ChangeLoggingWindowBytesStore.java:33) в org.apache.kafka.streams.state.internals.CachingWindowStore $ 1.Apply (CachingWindowStore.java:100) в org.apache.kafka.streams.state.internals.NamedCache.flush (NamedCache.java:141) в org.apache.kafka.streams.state.internals.NamedCache.evict (NamedCache.java:232) в org.apache.kafka.streams.state.internals.ThreadCache.maybeEvict (ThreadCache.java:245) в org.apache.kafka.streams.state.internals.ThreadCache.put (ThreadCache.java:153) в org.apache.kafka.streams.state.internals.CachingWindowStore.put (CachingWindowStore.java:157) в org.apache.kafka.streams.state.internals.CachingWindowStore.put (CachingWindowStore.java:36) в org.apache.kafka.streams.state.internals.MeteredWindowStore.put (MeteredWindowStore.java:96) в org.apache.kafka.streams.kstream.internals.KStreamWindowAggregate $ KStreamWindowAggregateProcessor.process (KStreamWindowAggregate.java:122) в org.apache.kafka.streams.processor.internals.ProcessorNode $ 1.run (ProcessorNode.java:46) в org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs (StreamsMetricsImpl.java:208) в org.apache.kafka.streams.processor.internals.ProcessorNode.process (ProcessorNode.java:124) в org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward (ProcessorContextImpl.java:85) в org.apache.kafka.streams.processor.internals.SourceNode.process (SourceNode.java:80) вorg.apache.kafka.streams.processor.internals.StreamTask.process (StreamTask.java:216) ... еще 6 причин: org.rocksdb.RocksDBException: не удалось создать реж: Н: \ TMP \ Кафка-потоки \ потоки-WordCount \ 1_0 \ KSTREAM-агрегатно-ШТАТ-ЗАП-0000000002 \ KSTREAM-агрегатно-ШТАТ-ЗАП-0000000002: 1553472000000: Неверный аргумент в org.rocksdb.RocksDB.open (собственный метод) в org.rocksdb.RocksDB.open (RocksDB.java:231) в org.apache.kafka.streams.state.internals.RocksDBStore.openDB (RocksDBStore.java:197)

1 Ответ

2 голосов
/ 26 марта 2019

Когда вы используете оконную агрегацию, сохраняете именованные имена по-разному, и в Kafka 1.0.0 возникает ошибка, влияющая на ОС Windows: имя для оконных хранилищ содержит :, что недопустимо в ОС Windows.Ошибка исправлена ​​в версии 1.0.1 и 1.1.0

Cf.https://issues.apache.org/jira/browse/KAFKA-6167

...