Как записать данные в hdfs используя flink без прерывания? - PullRequest
0 голосов
/ 31 января 2019

Я использую Flink и пытаюсь сохранить данные в HDFS.Сценарий таков: сначала данные поступают с кафки.Затем flink получает данные от kafka, а затем записывает данные в hdfs.Тем не менее, я могу идентифицировать данные только в формате hdf, когда я закрыл приложение flink.Что я хочу сделать, это идентифицировать данные, не прерывая приложение flink.

Ниже приведен мой код flink.Это очень просто.Есть ли какая-либо конфигурация, чтобы я мог сохранять данные в формате hdf на основе временного интервала, не прерывая приложение flink?

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.fs.DateTimeBucketer;
import org.apache.flink.streaming.connectors.fs.RollingSink;
import org.apache.flink.streaming.connectors.fs.SequenceFileWriter;
import org.apache.flink.streaming.connectors.fs.StringWriter;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

import java.util.Properties;
public class ReadFromKafka {

  public static void main(String[] args) throws Exception {
    // create execution environment
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", "localhost:9092");
    properties.setProperty("group.id", "flink_consumer");


    DataStream<String> stream = env
            .addSource(new FlinkKafkaConsumer09<>("flink-demo", new SimpleStringSchema(), properties));

    DataStream<String> output = stream.map(new MapFunction<String, String>() {
      private static final long serialVersionUID = -6867736771747690202L;

      @Override
      public String map(String value) throws Exception {
        return "Stream Value: " + value;
      }
    });

    RollingSink sink = new RollingSink<String>("/user/sclee/flink/stream");
    sink.setBucketer(new DateTimeBucketer("yyyy-MM-dd--HHmm"));
    sink.setWriter(new StringWriter());
    sink.setBatchSize(1024 * 1024 * 400); // this is 400 MB,
    output.addSink(sink);

    env.execute();
  }


}
...