Как получить самую раннюю дату из RDD [String, List [java.sql.date]], Scala - PullRequest
0 голосов
/ 30 сентября 2018

У меня есть приведенный ниже RDD, t1RDD2, присутствуют только первые пять строк:

(000471242-01,CompactBuffer(2012-05-07, 2006-11-15, 2014-10-08, 2010-05-20))
(996006688-01,CompactBuffer(2011-01-18, 2005-08-19, 2008-08-27, 2014-09-05, 2006-06-26, 2012-05-10, 2013-11-22, 2005-10-14, 2007-03-26, 2007-05-17, 2010-05-19, 2008-07-11, 2009-03-09))
(788000995-01,CompactBuffer(2006-01-06, 2013-05-01))
(525570000-01,CompactBuffer(2009-07-06, 2010-06-10, 2013-01-22, 2005-03-09, 2008-06-09, 2008-11-07))
(418500000-01,CompactBuffer(2007-07-09, 2011-02-16, 2012-10-16, 2005-10-18, 2009-05-11, 2008-01-22, 2014-07-08, 2010-01-04, 2009-03-23, 2013-08-16))

Я пытаюсь получить самую раннюю дату из буфера, но получаю ошибку из своего кода.

Код:

val t1RDD = t1RDD2.reduceByKey((date1, date2) => if (date1.before(date2)) date1 else date2)

Ошибка:

value before is not a member of Iterable[java.sql.Date]

Есть предложения?

1 Ответ

0 голосов
/ 30 сентября 2018

Очевидно, ваш t1RDD2 эквивалентен результату groupByKey на PairRDD следующим образом (с урезанными выборочными данными):

import java.sql.Date

val rdd = sc.parallelize(Seq(
  ("000471242-01", Date.valueOf("2012-05-07")),
  ("000471242-01", Date.valueOf("2006-11-15")),
  ("996006688-01", Date.valueOf("2011-01-18")),
  ("996006688-01", Date.valueOf("2005-08-19")),
  ("996006688-01", Date.valueOf("2008-08-27"))
))

val t1RDD2 = rdd.groupByKey
// t1RDD2: org.apache.spark.rdd.RDD[(String, Iterable[java.sql.Date])] = ...

t1RDD2.collect
// res1: Array[(String, Iterable[java.sql.Date])] = Array(
//   (996006688-01,CompactBuffer(2011-01-18, 2005-08-19, 2008-08-27)),
//   (000471242-01,CompactBuffer(2012-05-07, 2006-11-15))
// )

Если вы хотите получить самую раннюю датудля каждого ключа от t1RDD2 используйте map до reduce столбец значений для минимального значения:

t1RDD2.map{ case (k, v) => ( k, v.reduce((min, d) => if (min.before(d)) min else d) ) }.
  collect
// res2: Array[(String, java.sql.Date)] = Array((996006688-01,2005-08-19), (000471242-01,2006-11-15))

Но было бы лучше напрямую выполнить reduceByKey из предварительно сгруппированного СДР,если применимо:

rdd.reduceByKey( (min, d) => if (min.before(d)) min else d ).
  collect
// res3: Array[(String, java.sql.Date)] = Array((996006688-01,2005-08-19), (000471242-01,2006-11-15))
...