Как сохранить временное окно в Flink в текстовый файл? - PullRequest
0 голосов
/ 08 ноября 2019

Я начинаю работать в ApacheFlink в Java.

Моя цель - использовать тему ApacheKafka в одноминутном временном окне, которая будет применять основную информацию и записывать результаты каждого окна в файл. .

Пока что мне удалось применить упрощение преобразования текста к тому, что я получаю, я должен использовать apply или process, чтобы записать файл, результат окна я несколько потерян.

Этомой код до сих пор

package myflink;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import java.time.ZoneId;
import java.util.Date;
import java.util.Properties;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.shaded.akka.org.jboss.netty.channel.ExceptionEvent;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import scala.util.parsing.json.JSONObject;
public class BatchJob {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment  env = StreamExecutionEnvironment.getExecutionEnvironment();
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("zookeeper.connect", "localhost:2181");
        properties.setProperty("group.id", "test");
        properties.setProperty("auto.offset.reset", "latest");
        FlinkKafkaConsumer consumer = new FlinkKafkaConsumer("topic-basic-test", new SimpleStringSchema(), properties);
        DataStream<String> data = env.addSource(consumer);
        data.flatMap(new JSONparse()).timeWindowAll(Time.minutes(1))."NEXT ??" .print()
        System.out.println("Hola usuario 2");
        env.execute("Flink Batch Java API Skeleton");
    }
    public static class JSONparse implements FlatMapFunction<String, Tuple2<String, String>> {
        @Override
        public void flatMap(String s, Collector<Tuple2<String, String>> collector) throws Exception {
            System.out.println(s);
            s = s + "ACA PODES JUGAR NDEAH";
            collector.collect(new Tuple2<String,String>("M",s));
        }
    }
}

1 Ответ

0 голосов
/ 11 ноября 2019

Если вы хотите, чтобы результат каждого окна продолжительностью в одну минуту переходил в свой собственный файл, вы можете посмотреть на использование StreamingFileSink с одной минутой - что должно делать то, что вы ищете,или очень близко.

Я думаю, что на самом деле вы получите каталог для каждого окна, содержащий файл из каждого параллельного экземпляра окна - но, как вы используете timeWindowAll, который не работает впараллельно, в каждом сегменте будет только один файл, если только результаты не настолько велики, что файл переворачивается.

Кстати, выполнение анализа JSON в FlatMap будет работать довольно плохо, потому что это приведет к созданию экземплярановый парсер для каждого события, что в свою очередь вызовет значительную активность GC. Было бы лучше использовать RichFlatMap и создать один синтаксический анализатор в методе open (), который вы можете использовать для каждого события. И что еще лучше, используйте JSONKeyValueDeserializationSchema, а не SimpleStringSchema, и пусть разъем kafka будет обрабатывать для вас разбор json.

...