искры потоковой передачи отдельных данных перед операцией окна? - PullRequest
0 голосов
/ 10 апреля 2020

Я передаю данные в потоковом режиме следующим образом:

d = [{"member_id": "123", "type": "a", "pv_id": "b"},
     {"member_id": "123", "type": "a", "pv_id": "b"},
     {"member_id": "123", "type": "c", "pv_id": "d"},
     {"member_id": "123", "type": "c", "pv_id": "d"},
     {"member_id": "234", "type": "a", "pv_id": "b"},
     {"member_id": "234", "type": "a", "pv_id": "b"}]

Я хочу выполнять оконную операцию для подсчета частоты member_id каждые 5 секунд. Прежде чем считать частоту, я должен сделать отдельную операцию, потому что некоторые записи одинаковы. Например, d [0] идентичны d [1]. Ниже мой код.

object WordCount {

  def processData(record: (String, String)): (String, String, String) = {
    val value = record._2
    val json = new JsonParser()
    val obj = json.parse(value).asInstanceOf[JsonObject]
    val memberId = obj.get("member_id").toString.replace("\"", "")
    val _type = obj.get("type").toString.replace("\"", "")
    val pvId = obj.get("pv_id").toString.replace("\"", "")
    Tuple3(memberId, _type, pvId)

  }

  def flatData(record: (String, String, String), value: Int): Iterator[java.lang.String] = {
    List(record._1.toString, record._2.toString, record._3.toString).iterator
  }

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
      .setAppName("DirectKafkaWordCount")
      .set("spark.streaming.blockInterval", "200ms")
      .set("spark.python.worker.memory", "1g")
      .set("spark.master", "local[*]")
      .set("spark.streaming.receiver.maxRate", "5000")
      .set("redis.host", "127.0.0.1")
      .set("redis.port", "6379")
    val sc = new SparkContext(sparkConf)
    val ssc = new StreamingContext(sc, Seconds(5))
    ssc.sparkContext.setLogLevel("WARN")

    val messages = KafkaUtils.createStream(
      ssc,
      "127.0.0.1:2181",
      "hsc_spark",
      Map("test" -> 1)
    )

    val lines = messages.map(processData)
    val wordCounts = lines.map(x => (x, 1))
      .reduceByKey((_: Int, _: Int) => 1)
      .flatMap(flatData) // some thing went wrong here
      .map(x => (x, 1)).reduceByKeyAndWindow(Seconds(5), Seconds(5))

    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}

Ошибка вставлена. Но разве Итератор не является подтипом TraversableOnce?

[ERROR] /Users/hushichang/SlidingWindow/src/main/java/Hello.scala:112: error: type mismatch;
[INFO]  found   : ((String, String, String), Int) => Iterator[String]
[INFO]  required: (((String, String, String), Int)) => TraversableOnce[?]
[INFO]       .flatMap(flatData) // some thing went wrong here
[INFO]                ^
[ERROR] one error found
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...