Я пытаюсь вставить данные датчика в базу данных притока.Я успешно выполнил описанный выше сценарий и вижу, что мои данные, поступающие с моей консоли производителя kafka, хранятся в базе данных притока.
Следующим шагом является анализ определенных данных в окне.Для этого я просто добавил функцию timeWindow
и преобразовал необработанные данные в кортеж, чтобы увидеть, правильно ли сгруппированы данные в окне, основанном на времени.
Когда я распечатывал значения для каждого окна,это сработало хорошо.Тем не менее, некоторые необработанные данные отсутствуют, когда я вижу приток дБ по сравнению с моим первым сценарием.
Ниже мой код.Входная строка исходит от производителя kafka, и когда я быстро набрал значение, пропавшее значение уже существует.Однако, когда я набираю значение очень медленно (например, 1 значение в 1 секунду), пропущенное значение отсутствует.
Является ли эта характеристика мерцания просто моей ошибкой на уровне кода?
Вся среда тестируется на локальной машине.
package org.apache.flink.streaming.connectors.influxdb;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.functions.windowing.*;
import org.apache.flink.util.Collector;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.concurrent.TimeUnit;
public class ReadFromKafka {
public static void main(String[] args) throws Exception{
// create execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "flink");
DataStream<String> stream = env
.addSource(new FlinkKafkaConsumer09<>("flinkStreaming3", new SimpleStringSchema(), properties));
// DataStream<String> stream2 = env
// .addSource(new FlinkKafkaConsumer09<>("flinkStreaming3", new SimpleStringSchema(), properties));
env.enableCheckpointing(1000);
DataStream<Tuple2<String,Integer>> process = stream.timeWindowAll(Time.seconds(5)).process(new ProcessAllWindowFunction<String, Tuple2<String,Integer>, TimeWindow>() {
@Override
public void process(Context context, Iterable<String> iterable, Collector<Tuple2<String, Integer>> out) throws Exception {
int cnt = 0;
for (String in : iterable){
out.collect(new Tuple2<String,Integer>(in,cnt++));
}
}
});
process.print();
DataStream<InfluxDBPoint> dataStream = stream.map(new MapFunction<String, InfluxDBPoint>() {
@Override
public InfluxDBPoint map(String s) throws Exception {
String sensorVal = s;
String measurement = "data";
long timestamp = System.currentTimeMillis();
HashMap<String, String> tags = new HashMap<>();
tags.put("host", String.valueOf(measurement.hashCode() % 20));
HashMap<String, Object> fields = new HashMap<>();
fields.put("value", Double.parseDouble(sensorVal));
return new InfluxDBPoint(measurement, timestamp, tags, fields);
}
});
InfluxDBConfig influxDBConfig = InfluxDBConfig.builder("http://localhost:8086", "root", "root", "db_flink_test")
.batchActions(1000)
.flushDuration(10, TimeUnit.MILLISECONDS)
.enableGzip(true)
.build();
// processStream.addSink(new InfluxDBSink(influxDBConfig));
dataStream.addSink(new InfluxDBSink(influxDBConfig));
env.execute("InfluxDB Sink Example");
env.execute();
}
}
Спасибо.