Как проверить, являются ли пакеты пустыми в потоковой передаче Spark (wordcount с socketTextStream) - PullRequest
0 голосов
/ 09 мая 2019

Я работал над простым примером подсчета слов SparkStreaming, чтобы подсчитать количество слов в текстовых данных, полученных от сервера данных, прослушивающего сокет TCP. Я хотел бы проверить, является ли пакет из потокового источника пустым или нет, прежде чем сохранять содержимое каждого преобразования в текстовые файлы. В настоящее время я использую Spark Shell. Это мой код

Я попробовал этот код, и он отлично работает, не проверяя, пусто ли пакет:

import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.storage.StorageLevel
import org.apache.log4j.{Level, Logger}
Logger.getRootLogger.setLevel(Level.WARN)
val ssc = new StreamingContext(sc, Seconds(2))
val lines = ssc.socketTextStream("localhost", 9999, StorageLevel.MEMORY_AND_DISK_SER)


lines.saveAsTextFiles("/stream_test/testLine.txt")
val words = lines.flatMap(_.split(" "))
words.saveAsTextFiles("/stream_test/testWords.txt")
val pairs = words.map((_, 1))
pairs.saveAsTextFiles("/stream_test/testPairs.txt")
val wordCounts = pairs.reduceByKey(_ + _)
wordCounts.saveAsTextFiles("/stream_test/testWordsCounts.txt")
wordCounts.print()
ssc.start()

Я пытался использовать foreachRDD, но выдает ошибку error: value saveAsTextFiles is not a member of org.apache.spark.rdd.RDD[String]

Это мой код

import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.storage.StorageLevel
import org.apache.log4j.{Level, Logger}
Logger.getRootLogger.setLevel(Level.WARN)
val ssc = new StreamingContext(sc, Seconds(3))
val lines = ssc.socketTextStream("localhost", 9999, StorageLevel.MEMORY_AND_DISK_SER)


lines.foreachRDD(rdd => {
 if(!rdd.partitions.isEmpty)
{
lines.saveAsTextFiles("/stream_test/testLine.txt")

val words = lines.flatMap(_.split(" "))
words.saveAsTextFiles("/stream_test/testWords.txt")

val pairs = words.map((_, 1))
pairs.saveAsTextFiles("/stream_test/testPairs.txt")

val wordCounts = pairs.reduceByKey(_ + _)
wordCounts.saveAsTextFiles("/stream_test/testWordsCounts.txt")
wordCounts.print()

}
})


ssc.start()

I need to to check if the batch from streaming source is empty or not before I save the content text files. I appreciate your help

1 Ответ

0 голосов
/ 09 мая 2019

Я делал это, используя следующий код.Я зациклю каждый rdd в потоке, а затем использую rdd.count (), чтобы определить, является ли rdd пустым.если все пустые пустые ничего не произошло, надеюсь, это поможет вам.

kafkaStream.foreachRDD(rdd -> {
    if(rdd.count() > 0) {
        // do something
    }
})
...