Потеря данных, когда я выполнил timeWindow со стандартным потоком в Flink - PullRequest
0 голосов
/ 11 февраля 2019

Я пытаюсь вставить данные датчика в базу данных притока.Я успешно выполнил описанный выше сценарий и вижу, что мои данные, поступающие с моей консоли производителя kafka, хранятся в базе данных притока.

Следующим шагом является анализ определенных данных в окне.Для этого я просто добавил функцию timeWindow и преобразовал необработанные данные в кортеж, чтобы увидеть, правильно ли сгруппированы данные в окне, основанном на времени.

Когда я распечатывал значения для каждого окна,это сработало хорошо.Тем не менее, некоторые необработанные данные отсутствуют, когда я вижу приток дБ по сравнению с моим первым сценарием.

Ниже мой код.Входная строка исходит от производителя kafka, и когда я быстро набрал значение, пропавшее значение уже существует.Однако, когда я набираю значение очень медленно (например, 1 значение в 1 секунду), пропущенное значение отсутствует.

Является ли эта характеристика мерцания просто моей ошибкой на уровне кода?

Вся среда тестируется на локальной машине.

package org.apache.flink.streaming.connectors.influxdb;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.functions.windowing.*;
import org.apache.flink.util.Collector;


import java.text.SimpleDateFormat;
import java.util.*;
import java.util.concurrent.TimeUnit;

public class ReadFromKafka {
    public static void main(String[] args) throws Exception{
        // create execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("group.id", "flink");

        DataStream<String> stream = env
                .addSource(new FlinkKafkaConsumer09<>("flinkStreaming3", new SimpleStringSchema(), properties));

//        DataStream<String> stream2 = env
//                .addSource(new FlinkKafkaConsumer09<>("flinkStreaming3", new SimpleStringSchema(), properties));

        env.enableCheckpointing(1000);

        DataStream<Tuple2<String,Integer>> process = stream.timeWindowAll(Time.seconds(5)).process(new ProcessAllWindowFunction<String, Tuple2<String,Integer>, TimeWindow>() {
            @Override
            public void process(Context context, Iterable<String> iterable, Collector<Tuple2<String, Integer>> out) throws Exception {
                int cnt = 0;
                for (String in : iterable){
                    out.collect(new Tuple2<String,Integer>(in,cnt++));
                }
            }
        });


        process.print();

        DataStream<InfluxDBPoint> dataStream = stream.map(new MapFunction<String, InfluxDBPoint>() {
                    @Override
                    public InfluxDBPoint map(String s) throws Exception {
                        String sensorVal = s;
                        String measurement = "data";
                        long timestamp = System.currentTimeMillis();
                        HashMap<String, String> tags = new HashMap<>();
                        tags.put("host", String.valueOf(measurement.hashCode() % 20));
                        HashMap<String, Object> fields = new HashMap<>();
                        fields.put("value", Double.parseDouble(sensorVal));
                        return new InfluxDBPoint(measurement, timestamp, tags, fields);
                    }
                });


        InfluxDBConfig influxDBConfig = InfluxDBConfig.builder("http://localhost:8086", "root", "root", "db_flink_test")
                .batchActions(1000)
                .flushDuration(10, TimeUnit.MILLISECONDS)
                .enableGzip(true)
                .build();

//        processStream.addSink(new InfluxDBSink(influxDBConfig));
        dataStream.addSink(new InfluxDBSink(influxDBConfig));

        env.execute("InfluxDB Sink Example");

        env.execute();
    }
}

Спасибо.

...