Я собираюсь рассчитать среднее с потоками кафки.Поэтому я делаю операцию с состоянием, агрегат, который нужно создать в хранилище состояний, но эта доза не происходит.
Здесь функция для среднего:
private void average () {
StreamsBuilder builder = new StreamsBuilder();
KStream<GenericRecord, GenericRecord> source =
builder.stream(this.topicSrc);
KStream <String, Double> average = source
.mapValues(value -> createJson(value.toString()))
.map((key, value) -> KeyValue.pair(this.variable, value.getNumberValue(this.pathVariable, this.variable)))
.groupByKey( Serialized.with(
Serdes.String(),
Serdes.String()))
.aggregate (
() -> new Tuple(0, 0),
(aggKey, newValue, aggValue) -> new Tuple (aggValue.occ + 1, aggValue.sum + Integer.parseInt(newValue)),
Materialized.with(Serdes.String(), new MySerde()))
.mapValues(v -> v.getAverage())
.toStream();
average.to(this.topicDest, Produced.with(Serdes.String(), Serdes.Double()));
KafkaStreams stream = new KafkaStreams(builder.build(), props);
stream.start();
}
вот исключение:
Exception in thread "Thread-0" org.apache.kafka.streams.errors.StreamsException: org.apache.kafka.streams.errors.ProcessorStateException: base state directory [/tmp/kafka-streams] doesn't exist and couldn't be created
at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:658)
at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:628)
at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:538)
at it.imolinfo.sacmi.processor.Streamer.average(Streamer.java:167)
at it.imolinfo.sacmi.processor.Streamer.run(Streamer.java:180)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.streams.errors.ProcessorStateException: base state directory [/tmp/kafka-streams] doesn't exist and couldn't be created
at org.apache.kafka.streams.processor.internals.StateDirectory.<init>(StateDirectory.java:80)
at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:656)
... 5 more
Проблема в том, что доза не существует в базовом каталоге, но я хочу, чтобы поток kafka создал каталог в случае необходимости.
--- EDIT ----- Я заметил, чтоесли у меня есть 1 процессор, который составляет среднее значение для переменной, проблем нет, но если у меня есть 2 процессора, то да.
Файл конфигурации для 1 процессора:
type->streamer
number->1
subtype->average
variabli->payload:T_DUR_CICLO
topicSrc->m0-tempi
topicDest->average
application.id->stream0
bootstrap.servers->localhost:9092
schema.registry.url->http://localhost:8081
default.key.serde->io.confluent.kafka.streams.serdes.avro.GenericAvroSerde
default.value.serde->io.confluent.kafka.streams.serdes.avro.GenericAvroSerde
файл конфигурации для 2Процессоры:
type->streamer
number->1
subtype->average
variabli->payload:T_DUR_CICLO
topicSrc->m0-tempi
topicDest->average
application.id->stream0
bootstrap.servers->localhost:9092
schema.registry.url->http://localhost:8081
default.key.serde->io.confluent.kafka.streams.serdes.avro.GenericAvroSerde
default.value.serde->io.confluent.kafka.streams.serdes.avro.GenericAvroSerde
type->streamer
number->1
subtype->average
variabli->payload:HMI_TEMP_E1
topicSrc->m0-temperature
topicDest->average
application.id->stream1
bootstrap.servers->localhost:9092
schema.registry.url->http://localhost:8081
default.key.serde->io.confluent.kafka.streams.serdes.avro.GenericAvroSerde
default.value.serde->io.confluent.kafka.streams.serdes.avro.GenericAvroSerde
Теперь я запускаю процессор:
private void loadStreamer (Tuple t){
int number = Integer.parseInt(t.getNumber());
for (int i = 0; i < number; i ++) {
String[] splitted = t.getVariables()[0].split(":");
Streamer s = new Streamer (t.getSubType(), t.getTopicSrc(), t.getTopicDest(), splitted[0], splitted[1], t.getProp());
Thread th = new Thread (s);
th.start();
}
}
Тип Tuple содержит информацию о файле конфигурации.число в for - это число, присутствующее в файле конфигурации.в этом случае это 1, но я могу сделать больше экземпляра того же процесса для допуска толчков.
Streamer:
public class Streamer implements Runnable{
private final String topicSrc;
private final String topicDest;
private final String variable;
private final String pathVariable;
private final String type;
private final Properties props;
public Streamer (String type, String topicSrc, String topicDest, String pathVariable, String variable, Properties props) {
this.type = type;
this.topicSrc = topicSrc;
this.topicDest = topicDest;
this.variable = variable;
this.pathVariable = pathVariable;
this.props = props;
}
private void average () {
StreamsBuilder builder = new StreamsBuilder();
KStream<GenericRecord, GenericRecord> source =
builder.stream(this.topicSrc);
KStream <String, Double> average = source
.mapValues(value -> createJson(value.toString()))
.map((key, value) -> KeyValue.pair(this.variable, value.getNumberValue(this.pathVariable, this.variable) + ":" + value.getStringValue("timestamp")))
.groupByKey( Serialized.with(
Serdes.String(),
Serdes.String()))
.aggregate (
() -> new Tuple(0, 0, ""),
(aggKey, newValue, aggValue) -> new Tuple (aggValue.occ + 1, aggValue.sum + Integer.parseInt(newValue.split(":")[0]), newValue.split(":")[1]),
Materialized.with(Serdes.String(), new MySerde()))
.mapValues((key, value) -> new AverageRecord (key, value.getDate(), value.getAverage()))
.mapValues(v -> v.getAverage())
.toStream();
average.to(this.topicDest, Produced.with(Serdes.String(), Serdes.Double()));
KafkaStreams stream = new KafkaStreams(builder.build(), props);
stream.start();
}
public void run() {
switch (this.type) {
case "average":
average();
break;
case "filter":
filter();
break;
default:
System.out.println("type not valid " + this.type);
break;
}
Так что у меня есть 2 потока с 2 объектами Streamer, которые исполняютсредняя функция.единственное отличие от 2 streamer - это переменная для вычисления среднего.
Я неправильно создаю процесс?