Kafka Streams - магазин не создан - PullRequest
0 голосов
/ 27 ноября 2018

Я собираюсь рассчитать среднее с потоками кафки.Поэтому я делаю операцию с состоянием, агрегат, который нужно создать в хранилище состояний, но эта доза не происходит.

Здесь функция для среднего:

private void average () {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<GenericRecord, GenericRecord> source = 
                builder.stream(this.topicSrc);

        KStream <String, Double> average = source
                .mapValues(value -> createJson(value.toString()))
                .map((key, value) -> KeyValue.pair(this.variable, value.getNumberValue(this.pathVariable, this.variable)))
                .groupByKey( Serialized.with(
                        Serdes.String(),
                        Serdes.String()))
                .aggregate (
                        () -> new Tuple(0, 0),
                        (aggKey, newValue, aggValue) ->  new Tuple (aggValue.occ + 1, aggValue.sum + Integer.parseInt(newValue)),
                        Materialized.with(Serdes.String(), new MySerde()))
                .mapValues(v -> v.getAverage())
                .toStream();

        average.to(this.topicDest, Produced.with(Serdes.String(), Serdes.Double()));
        KafkaStreams stream = new KafkaStreams(builder.build(), props);
        stream.start();

    }

вот исключение:

 Exception in thread "Thread-0" org.apache.kafka.streams.errors.StreamsException: org.apache.kafka.streams.errors.ProcessorStateException: base state directory [/tmp/kafka-streams] doesn't exist and couldn't be created
    at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:658)
    at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:628)
    at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:538)
    at it.imolinfo.sacmi.processor.Streamer.average(Streamer.java:167)
    at it.imolinfo.sacmi.processor.Streamer.run(Streamer.java:180)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.streams.errors.ProcessorStateException: base state directory [/tmp/kafka-streams] doesn't exist and couldn't be created
    at org.apache.kafka.streams.processor.internals.StateDirectory.<init>(StateDirectory.java:80)
    at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:656)
    ... 5 more

Проблема в том, что доза не существует в базовом каталоге, но я хочу, чтобы поток kafka создал каталог в случае необходимости.

--- EDIT ----- Я заметил, чтоесли у меня есть 1 процессор, который составляет среднее значение для переменной, проблем нет, но если у меня есть 2 процессора, то да.

Файл конфигурации для 1 процессора:

 type->streamer
 number->1
 subtype->average
 variabli->payload:T_DUR_CICLO
 topicSrc->m0-tempi
 topicDest->average
 application.id->stream0
 bootstrap.servers->localhost:9092
 schema.registry.url->http://localhost:8081
 default.key.serde->io.confluent.kafka.streams.serdes.avro.GenericAvroSerde
 default.value.serde->io.confluent.kafka.streams.serdes.avro.GenericAvroSerde

файл конфигурации для 2Процессоры:

type->streamer
number->1
 subtype->average
 variabli->payload:T_DUR_CICLO
 topicSrc->m0-tempi
 topicDest->average
 application.id->stream0
 bootstrap.servers->localhost:9092
 schema.registry.url->http://localhost:8081
 default.key.serde->io.confluent.kafka.streams.serdes.avro.GenericAvroSerde
 default.value.serde->io.confluent.kafka.streams.serdes.avro.GenericAvroSerde

type->streamer
number->1
 subtype->average
 variabli->payload:HMI_TEMP_E1
 topicSrc->m0-temperature
 topicDest->average
 application.id->stream1
 bootstrap.servers->localhost:9092
 schema.registry.url->http://localhost:8081
 default.key.serde->io.confluent.kafka.streams.serdes.avro.GenericAvroSerde
 default.value.serde->io.confluent.kafka.streams.serdes.avro.GenericAvroSerde

Теперь я запускаю процессор:

private void loadStreamer (Tuple t){
    int number = Integer.parseInt(t.getNumber());
    for (int i = 0; i < number; i ++) {
        String[] splitted = t.getVariables()[0].split(":");
        Streamer s = new Streamer (t.getSubType(), t.getTopicSrc(), t.getTopicDest(), splitted[0], splitted[1], t.getProp());
        Thread th = new Thread (s);
        th.start();
    }
}

Тип Tuple содержит информацию о файле конфигурации.число в for - это число, присутствующее в файле конфигурации.в этом случае это 1, но я могу сделать больше экземпляра того же процесса для допуска толчков.

Streamer:

    public class Streamer implements Runnable{


    private final String topicSrc;
    private final String topicDest;
    private final String variable;
    private final String pathVariable;
    private final String type;
    private final Properties props;

 public Streamer (String type, String topicSrc, String topicDest, String pathVariable, String variable, Properties props) {
        this.type = type;
        this.topicSrc = topicSrc;
        this.topicDest = topicDest;
        this.variable = variable;
        this.pathVariable = pathVariable;
        this.props = props;
    }

private void average () {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<GenericRecord, GenericRecord> source = 
                builder.stream(this.topicSrc);

        KStream <String, Double> average = source
                .mapValues(value -> createJson(value.toString()))
                .map((key, value) -> KeyValue.pair(this.variable, value.getNumberValue(this.pathVariable, this.variable) + ":" + value.getStringValue("timestamp")))
                .groupByKey( Serialized.with(
                        Serdes.String(),
                        Serdes.String()))
                .aggregate (
                        () -> new Tuple(0, 0, ""),
                        (aggKey, newValue, aggValue) ->  new Tuple (aggValue.occ + 1, aggValue.sum + Integer.parseInt(newValue.split(":")[0]), newValue.split(":")[1]),
                        Materialized.with(Serdes.String(), new MySerde()))
                .mapValues((key, value) -> new AverageRecord (key, value.getDate(), value.getAverage()))
                .mapValues(v -> v.getAverage())
                .toStream();

        average.to(this.topicDest, Produced.with(Serdes.String(), Serdes.Double()));
        KafkaStreams stream = new KafkaStreams(builder.build(), props);
        stream.start();
    }

 public void run()  {
        switch (this.type) {
            case "average":
                average();
                break;
            case "filter":
                filter();
                break;
            default:
                System.out.println("type not valid " + this.type);
            break;
    }

Так что у меня есть 2 потока с 2 объектами Streamer, которые исполняютсредняя функция.единственное отличие от 2 streamer - это переменная для вычисления среднего.

Я неправильно создаю процесс?

Ответы [ 3 ]

0 голосов
/ 05 декабря 2018

Добавить отдельную state.dir конфигурацию для каждого потока, а не конфигурацию по умолчанию.Что-то вроде

# stream1
...
state.dir=/tmp/stream1/kafka-stream

# stream2
...
state.dir=/tmp/stream2/kafka-stream
0 голосов
/ 10 июня 2019

Все, что вам нужно, это выполнить new File("/tmp/kafka-streams").mkdirs() перед запуском ваших потоков.На стартере KafkaStreams есть условия гонки.

0 голосов
/ 27 ноября 2018

Похоже, проблема с разрешением.Приложение Kafka Stream создаст каталог состояния, если у него есть разрешение на запись по заданному пути. Каталог

/tmp должен иметь разрешение на запись для пользователя, запускающего приложение.

...