Spark Структурированное потоковое управление Kafka Offset - PullRequest
0 голосов
/ 16 мая 2019

Я ищу хранение смещений kafka внутри kafka для Spark Structured Streaming, как будто это работает для DStreams stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges), то же самое, что я ищу, но для структурированной потоковой передачи.Это поддерживает структурированную потоковую передачу?Если да, то как мне этого добиться?

Я знаю о контрольных точках hdfs, используя .option("checkpointLocation", checkpointLocation), но меня интересует именно встроенное управление смещением.

Я ожидаю, что Кафкахранить смещения только внутри без контрольной точки spark hdfs.

1 Ответ

0 голосов
/ 13 июня 2019

Я использую этот фрагмент кода, найденный где-то.

public class OffsetManager {

    private String storagePrefix;

    public OffsetManager(String storagePrefix) {
        this.storagePrefix = storagePrefix;
    }

    /**
     * Overwrite the offset for the topic in an external storage.
     *
     * @param topic     - Topic name.
     * @param partition - Partition of the topic.
     * @param offset    - offset to be stored.
     */
    void saveOffsetInExternalStore(String topic, int partition, long offset) {

        try {

            FileWriter writer = new FileWriter(storageName(topic, partition), false);

            BufferedWriter bufferedWriter = new BufferedWriter(writer);
            bufferedWriter.write(offset + "");
            bufferedWriter.flush();
            bufferedWriter.close();

        } catch (Exception e) {
            e.printStackTrace();
            throw new RuntimeException(e);
        }
    }

    /**
     * @return he last offset + 1 for the provided topic and partition.
     */
    long readOffsetFromExternalStore(String topic, int partition) {

        try {

            Stream<String> stream = Files.lines(Paths.get(storageName(topic, partition)));

            return Long.parseLong(stream.collect(Collectors.toList()).get(0)) + 1;

        } catch (Exception e) {
            e.printStackTrace();
        }

        return 0;
    }

    private String storageName(String topic, int partition) {
        return "Offsets\\" + storagePrefix + "-" + topic + "-" + partition;
    }

}

SaveOffset ... вызывается после успешной обработки записи, иначе смещение не сохраняется. и я использую темы Кафки в качестве источника, поэтому я указываю начальные смещения в качестве полученных смещений из ReadOffsets ...

...