спарк рдд: группировка и фильтрация - PullRequest
0 голосов
/ 22 февраля 2019

У меня есть Rdd "labResults" объектов:

case class LabResult(patientID: String, date: Long, labName: String, value: String)

Я хочу преобразовать этот rdd так, чтобы он включал только одну строку для каждой комбинации PatientID и LabName.Эта строка должна быть самой последней строкой для этой комбинации PatientID и labName (меня интересует только последняя дата, когда у пациента была эта лаборатория).Я делаю это следующим образом:

//group rows by patient and lab and take only the last one
val cleanLab = labResults.groupBy(x => (x.patientID, x.labName)).map(_._2).map { events =>
  val latest_date = events.maxBy(_.date)
  val lab = events.filter(x=> x.date == latest_date)
  lab.take(1)
}

Поздно я хочу создать ребра из этого СДР:

val edgePatientLab: RDD[Edge[EdgeProperty]] = cleanLab
  .map({ lab =>
    Edge(lab.patientID.toLong, lab2VertexId(lab.labName), PatientLabEdgeProperty(lab).asInstanceOf[EdgeProperty])
  })

и получаю ошибку:

value patientID is not a member of Iterable[edu.gatech.cse6250.model.LabResult]

[ошибка] Edge (lab.patientID.toLong, lab2VertexId (lab.labName), PatientLabEdgeProperty (lab) .asInstanceOf [EdgeProperty]) [ошибка] ^ [ошибка] / hw4 / stu_code / src / main / scala / edu /gatech / cse6250 / graphconstruct / GraphLoader.scala: 94: 53: значение labName не является членом Iterable [edu.gatech.cse6250.model.LabResult] [ошибка] Edge (lab.patientID.toLong, lab2VertexId (lab.labName), PatientLabEdgeProperty (lab) .asInstanceOf [EdgeProperty]) [ошибка] ^ [ошибка] /hw4/stu_code/src/main/scala/edu/gatech/cse6250/graphconstruct/GraphLoader.scala:94:86: несоответствие типов;[ошибка] найдена: Iterable [edu.gatech.cse6250.model.LabResult] [ошибка] требуется: edu.gatech.cse6250.model.LabResult [ошибка] Edge (lab.patientID.toLong, lab2VertexId (lab.labName), PatientLabEdgeProperty(lab) .asInstanceOf [EdgeProperty])

Итак, похоже, проблема в том, что «cleanLab» не является ни СДР LabResult, как я ожидал, но СДР Iterable [edu.gatech.cse6250.model.LabResult]

Как я могу это исправить?

1 Ответ

0 голосов
/ 22 февраля 2019

Вот мой подход к первой части.Материал о Edge и тех других классах, которым я не могу помочь, поскольку я не знаю, откуда они (это из здесь ?)

scala> val ds = List(("1", 1, "A", "value 1"), ("1", 3, "A", "value 3"), ("1", 3, "B", "value 3"), ("1", 2, "A", "value 2"), ("1", 3, "B", "value 3"), ("1", 5, "B", "value 5") ).toDF("patientID", "date", "labName", "value").as[LabResult]
ds: org.apache.spark.sql.Dataset[LabResult] = [patientID: string, date: int ... 2 more fields]

scala> ds.show
+---------+----+-------+-------+
|patientID|date|labName|  value|
+---------+----+-------+-------+
|        1|   1|      A|value 1|
|        1|   3|      A|value 3|
|        1|   3|      B|value 3|
|        1|   2|      A|value 2|
|        1|   3|      B|value 3|
|        1|   5|      B|value 5|
+---------+----+-------+-------+


scala> val grouped = ds.groupBy("patientID", "labName").agg(max("date") as "date")
grouped: org.apache.spark.sql.DataFrame = [patientID: string, labName: string ... 1 more field]

scala> grouped.show
+---------+-------+----+
|patientID|labName|date|
+---------+-------+----+
|        1|      A|   3|
|        1|      B|   5|
+---------+-------+----+


scala> val cleanLab = ds.join(grouped, Seq("patientID", "labName", "date")).as[LabResult]
cleanLab: org.apache.spark.sql.Dataset[LabResult] = [patientID: string, labName: string ... 2 more fields]

scala> cleanLab.show
+---------+-------+----+-------+
|patientID|labName|date|  value|
+---------+-------+----+-------+
|        1|      A|   3|value 3|
|        1|      B|   5|value 5|
+---------+-------+----+-------+


scala> cleanLab.head
res45: LabResult = LabResult(1,3,A,value 3)

scala>
...