Задание не сериализуемо. Исключение. При использовании искры KeyValueGroupedDataset и каждой группы объединяются в окне. - PullRequest
0 голосов
/ 05 июля 2018

Процесс детализации следующим образом:

  1. загрузка потоковой передачи данных с Kafka
  2. используйте KeyValueGroupedDataset для группировки потоков по атрибуту rule_id
  3. для каждого rule_id, значение KeyValueGroupedDataset mapGroups для набора данных и агрегирование набора данных с использованием SQL и вывод результата на консоль.

При отправке задания возникает исключение «Задача не сериализуема» Метод выполнения задания:

spark = SparkSession.builder().appName("SparkSQLBenchmark").getOrCreate();
StructType schema = new StructType()
    .add("rule_id", "string")
    .add("metric", "string")
    .add("m_value", "double")
    .add("event_time", "timestamp");
// Create DataSet representing the stream of input lines from Kafka
Dataset<Row> metrics = spark.readStream()
    .format("kafka")
    .load()
    .select(from_json(col("value").cast("string"), schema).as("metrics"))
    .select("metrics.*")
    .withWatermark("event_time", "10 seconds");

String[] ruleIds = new String[] {"1", "2"};
// use KeyValueGroupedDataset group stream by rule_id
KeyValueGroupedDataset kvGroupedDataSet = metrics.groupByKey
            (new MapFunction<Row, String>() {
                @Override
                public String call(Row row) throws Exception {
                    return row.getAs("rule_id");
                }
            }, Encoders.STRING());
ExpressionEncoder<Row> encoder = RowEncoder.apply(schema);
StreamingQuery[] queries = new StreamingQuery[ruleIds.length];
int i = 0;
// for each group, aggregate value use SQL
for(final String id : ruleIds) {
    Dataset<Row> query = kvGroupedDataSet.mapGroups(new MapGroupsFunction<String, Row, Row>() {
            @Override
            public Row call(String key, Iterator<Row> iterator) throws Exception {

                if (id.equals(key)) {
                    while (iterator.hasNext()) {
                        return iterator.next();
                    }
                }
                return RowFactory.create();
            }
        }, encoder);

        query.createOrReplaceTempView("grouped_metrics_" + id);
        final Dataset<Row> result = spark.sql("SELECT WINDOW(event_time, 10 seconds).start AS time_window, " +
            "SUM(m_value) AS sum_value " +
            "FROM grouped_metrics_" + "_" + id + " " +
            "GROUP BY WINDOW(event_time, 10 seconds), metric");

        // output result to console
        StreamingQuery queryResult = result.writeStream()
                .outputMode(OutputMode.Update())
                .format("console")
                .start();

        queries[i] = queryResult;
        i++;
    }

    for (StreamingQuery q : queries) {
        q.awaitTermination();
    }

}

Деталь Исключение: enter image description here

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...