Spark Streaming: запись количества строк, прочитанных из темы Кафки - PullRequest
0 голосов
/ 09 ноября 2018

Работа Spark Streaming - чтение событий из занятой темы кафки. Чтобы понять, сколько данных поступает за триггерный интервал, я хочу просто вывести количество строк, прочитанных из темы. Я попробовал несколько способов сделать это, но не смог понять.

Dataset<Row> stream = sparkSession.readStream()
          .format("kafka")
          .option("kafka.bootstrap.servers", kafkaBootstrapServersString)
          .option("subscribe", topic)
          .option("startingOffsets", "latest")
          .option("enable.auto.commit", false)
//          .option("failOnDataLoss", false)
//          .option("maxOffsetsPerTrigger", 10000)
          .load();
      stream.selectExpr("topic").agg(count("topic")).as("count");
      //stream.selectExpr("topic").groupBy("topic").agg(count(col("topic")).as("count"));
      stream.writeStream()
            .format("console")
            .option("truncate", false)
            .trigger(Trigger.ProcessingTime("10 seconds"))
            .start();

1 Ответ

0 голосов
/ 09 ноября 2018

Похоже, вам нужно

stream = stream.selectExpr("topic").agg(count("topic")).as("count");

И тогда вы можете распечатать это

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...