искровые потоковые фреймы и аккумуляторы на java - PullRequest
0 голосов
/ 17 июня 2020

Я обрабатываю поток kafka JSON в Spark Structured Streaming. Обработка как микропакеты, могу ли я использовать аккумуляторы с потоковыми фреймами данных?

LongAccumulator longAccum = new LongAccumulator("my accum");

Dataset<Row> df2 = df.filter(output.col("Called number").equalTo("0860"))
            .groupBy("Calling number").count();
// put row counter to accumulator for example
df2.javaRDD().foreach(row -> {longAccumulator.add(1);})

throws

Exception in thread "main" org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;

. Также меня смущает использование аккумуляторов таким образом. Преобразование фрейма данных в RDD выглядит странно и излишне. Могу ли я сделать это без c RDD и foreach ()?

Согласно исключению, я удалил foreach из исходного фрейма данных и сделал это в writeStream (). ForeachBatch ()

        StreamingQuery ds = df2
            .writeStream().foreachBatch( (rowDataset, aLong) -> {
                longAccum.add(1);
                log.info("accum : " + longAccum.value());
            })
            .outputMode("complete")
            .format("console").start();

Он работает, но у меня нет значений в журналах, и я не вижу аккумулятор в GUI.

1 Ответ

1 голос
/ 17 июня 2020

Нет, вы можете получить доступ напрямую, используя набор данных, как показано ниже -

 LongAccumulator longAccum = spark.sparkContext().longAccumulator("my accum");


        Dataset<Row> df = spark.range(100).withColumn("x", lit("x"));

        //access in map
        df.map((MapFunction<Row, Row>) row -> {
            longAccum.add(1);
            return  row;
        }, RowEncoder.apply(df.schema()))
                .count();

        // accumulator value
        System.out.println(longAccum.value()); // 100

        longAccum.reset();
        // access in for each
        df.foreach((ForeachFunction<Row>) row -> longAccum.add(1));

        // accumulator value
        System.out.println(longAccum.value()); // 100

Обратите внимание, что значение аккумулятора обновляется только при выполнении action.

Использование Streaming dataframe

 longAccum.reset();
        /**
         * streaming dataframe from csv dir
         * test.csv
         * --------
         * csv
         * id,name
         * 1,bob
         * 2,smith
         * 3,jam
         * 4,dwayne
         * 5,mike
         */
        String fileDir = getClass().getResource("/" + "csv").getPath();
        StructType schema = new StructType()
                .add(new StructField("id", DataTypes.LongType, true, Metadata.empty()))
                .add(new StructField("name", DataTypes.StringType, true, Metadata.empty()));
        Dataset<Row> json = spark.readStream().schema(schema).option("header", true).csv(fileDir);

        StreamingQuery streamingQuery = json
                .map((MapFunction<Row, Row>) row -> {
                    longAccum.add(1);
                    return row;
                }, RowEncoder.apply(df.schema()))
                .writeStream()
                .format("console").start();
        streamingQuery.processAllAvailable();

        // accumulator value
        System.out.println(longAccum.value()); // 5
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...