Как выполнить фильтрацию по значению (кортежам) в паре RDD с точки зрения ключа - PullRequest
0 голосов
/ 08 марта 2019

Образец СДР выглядит следующим образом:

(key1,(111,222,1)
(key1,(113,224,1)
(key1,(114,225,0)
(key1,(115,226,0)   
(key1,(113,226,0)    
(key1,(116,227,1)    
(key1,(117,228,1)   
(key2,(118,229,1)

Я сейчас работаю над проектом искры. Я хочу отфильтровать первый и последний элементы, где третья позиция в значениях кортежа равна '1' и '0' на основе ключей. Возможно ли это сделать с помощью ReduceByKey? Но после моих исследований я не нашел хорошей логики для достижения того, чего я хочу. Я хочу, чтобы мой результат был в том же порядке, что и результат, показанный ниже.

Ожидаемый результат:

(key1,(111,222,1)
(key1,(114,225,0)
(key1,(113,226,0)
(key1,(116,227,1)
(key2,(118,229,1)

Очень ценится.

1 Ответ

0 голосов
/ 08 марта 2019

Если я правильно понимаю, вы хотите первую «1», первую «0», последнюю «1» и последнюю «0» для каждого ключа и поддерживать порядок.На вашем месте я бы использовал SparkSQL API для этого.

Во-первых, давайте создадим ваш RDD (кстати, предоставить пример данных очень приятно, предоставив достаточно кода, чтобы мы могли лучше воспроизвести то, что вы сделали):

val seq = Seq(("key1",(111,222,1)),
    ("key1",(113,224,1)),
    ("key1",(114,225,0)),
    ("key1",(115,226,0)),   
    ("key1",(113,226,0)),    
    ("key1",(116,227,1)),    
    ("key1",(117,228,1)),   
    ("key2",(118,229,1)))

val rdd = sc.parallelize(seq)

// then I switch to dataframes, and add an id to be able to go back to 
// the previous order
val df = rdd.toDF("key", "value").withColumn("id", monotonicallyIncreasingId)
df.show()
+----+-----------+------------+
| key|      value|          id|
+----+-----------+------------+
|key1|[111,222,1]|  8589934592|
|key1|[113,224,1]| 25769803776|
|key1|[114,225,0]| 42949672960|
|key1|[115,226,0]| 60129542144|
|key1|[113,226,0]| 77309411328|
|key1|[116,227,1]| 94489280512|
|key1|[117,228,1]|111669149696|
|key2|[118,229,1]|128849018880|
+----+-----------+------------+

Теперь, мы можем сгруппировать по «ключу» и «значению ._3», сохранить min (id) и его максимум и взорвать данные обратно.С окном, однако, мы можем сделать это более простым способом.Давайте определим следующее окно:

val win = Window.partitionBy("key", "value._3").orderBy("id")
// now we compute the previous and next element of each id using resp. lag and lead
val big_df = df
    .withColumn("lag", lag('id, 1) over win)
    .withColumn("lead", lead('id, 1) over win)
big_df.show
+----+-----------+------------+-----------+------------+
| key|      value|          id|        lag|        lead|
+----+-----------+------------+-----------+------------+
|key1|[111,222,1]|  8589934592|       null| 25769803776|
|key1|[113,224,1]| 25769803776| 8589934592| 94489280512|
|key1|[116,227,1]| 94489280512|25769803776|111669149696|
|key1|[117,228,1]|111669149696|94489280512|        null|
|key1|[114,225,0]| 42949672960|       null| 60129542144|
|key1|[115,226,0]| 60129542144|42949672960| 77309411328|
|key1|[113,226,0]| 77309411328|60129542144|        null|
|key2|[118,229,1]|128849018880|       null|        null|
+----+-----------+------------+-----------+------------+

Теперь мы видим, что строки, за которыми вы работаете, это те, у которых задержка равна нулю (первый элемент) или опережение равно нулю (последний элемент).Поэтому давайте отфильтруем, отсортируем по предыдущему порядку с помощью идентификатора и выберите нужные столбцы:

val result = big_df
    .where(('lag isNull) || ('lead isNull))
    .orderBy('id)
    .select("key", "value")
result.show
+----+-----------+
| key|      value|
+----+-----------+
|key1|[111,222,1]|
|key1|[114,225,0]|
|key1|[113,226,0]|
|key1|[117,228,1]|
|key2|[118,229,1]|
+----+-----------+

Наконец, если вам действительно нужен СДР, вы можете преобразовать кадр данных с помощью:

result.rdd.map(row => row.getAs[String](0) -> row.getAs[(Int, Int, Int)](1))
...