Я передаю данные в потоковом режиме следующим образом:
d = [{"member_id": "123", "type": "a", "pv_id": "b"},
{"member_id": "123", "type": "a", "pv_id": "b"},
{"member_id": "123", "type": "c", "pv_id": "d"},
{"member_id": "123", "type": "c", "pv_id": "d"},
{"member_id": "234", "type": "a", "pv_id": "b"},
{"member_id": "234", "type": "a", "pv_id": "b"}]
Я хочу выполнять оконную операцию для подсчета частоты member_id каждые 5 секунд. Прежде чем считать частоту, я должен сделать отдельную операцию, потому что некоторые записи одинаковы. Например, d [0] идентичны d [1]. Ниже мой код.
object WordCount {
def processData(record: (String, String)): (String, String, String) = {
val value = record._2
val json = new JsonParser()
val obj = json.parse(value).asInstanceOf[JsonObject]
val memberId = obj.get("member_id").toString.replace("\"", "")
val _type = obj.get("type").toString.replace("\"", "")
val pvId = obj.get("pv_id").toString.replace("\"", "")
Tuple3(memberId, _type, pvId)
}
def flatData(record: (String, String, String), value: Int): Iterator[java.lang.String] = {
List(record._1.toString, record._2.toString, record._3.toString).iterator
}
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf()
.setAppName("DirectKafkaWordCount")
.set("spark.streaming.blockInterval", "200ms")
.set("spark.python.worker.memory", "1g")
.set("spark.master", "local[*]")
.set("spark.streaming.receiver.maxRate", "5000")
.set("redis.host", "127.0.0.1")
.set("redis.port", "6379")
val sc = new SparkContext(sparkConf)
val ssc = new StreamingContext(sc, Seconds(5))
ssc.sparkContext.setLogLevel("WARN")
val messages = KafkaUtils.createStream(
ssc,
"127.0.0.1:2181",
"hsc_spark",
Map("test" -> 1)
)
val lines = messages.map(processData)
val wordCounts = lines.map(x => (x, 1))
.reduceByKey((_: Int, _: Int) => 1)
.flatMap(flatData) // some thing went wrong here
.map(x => (x, 1)).reduceByKeyAndWindow(Seconds(5), Seconds(5))
wordCounts.print()
ssc.start()
ssc.awaitTermination()
}
}
Ошибка вставлена. Но разве Итератор не является подтипом TraversableOnce?
[ERROR] /Users/hushichang/SlidingWindow/src/main/java/Hello.scala:112: error: type mismatch;
[INFO] found : ((String, String, String), Int) => Iterator[String]
[INFO] required: (((String, String, String), Int)) => TraversableOnce[?]
[INFO] .flatMap(flatData) // some thing went wrong here
[INFO] ^
[ERROR] one error found