поиск значений в парном СДП PySpark для ключей, поступающих из другого СДР - PullRequest
1 голос
/ 14 мая 2019

Я новичок в PySpark, я хочу сделать следующее,

Рассмотрим следующий код,

import numpy as np
b =np.array([[1,2,100],[3,4,200],[5,6, 300],[7,8, 400]])
a = np.array([[1,2],[3,4],[11,6],[7,8], [1, 2], [7,8]])
RDDa = sc.parallelize(a)
RDDb = sc.parallelize(b)
dsmRDD = RDDb.map(lambda x: (list(x[:2]), x[2]))

Я хочу получить значения, связанные с каждым значением RDDa, в качестве ключа для dsmRDD, т.е.

result = [100, 200, 0, 400, 100, 400] 

Заранее большое спасибо.

Ответы [ 2 ]

0 голосов
/ 16 мая 2019

Как следует из другого ответа, вы можете преобразовать в фрейм данных и join. Если вы хотите продолжить только с rdd, вы можете сделать это,

import numpy as np

a = np.array([[1,2],[3,4],[11,6],[7,8], [1, 2], [7,8]])
b = np.array([[1,2,100],[3,4,200],[5,6, 300],[7,8, 400]])

RDDa = sc.parallelize(a)
RDDb = sc.parallelize(b)

dsmRDD = RDDa.zipWithIndex()\
         .map(lambda x: (tuple(x[0].tolist()),(0,x[1])))\
         .leftOuterJoin(RDDb.map(lambda x: (tuple(x[:2].tolist()), x[2])))\
         .map(lambda x: (x[1][0][1], x[1][1]) if x[1][1] is not None else (x[1][0][1],x[1][0][0]))

output = map(lambda x:x[1], sorted(dsmRDD.collect()))
print output

, который дает вам вывод,

[100, 200, 0, 400, 100, 400]                                                    
0 голосов
/ 15 мая 2019

Если ваши данные не слишком велики, вы можете использовать такие кадры:

dfa = spark.createDataFrame([[1,2],[3,4],[11,6],[7,8], [1, 2], [7,8]], ["c1", "c2"])
dfb = spark.createDataFrame([[1,2,100],[3,4,200],[5,6, 300],[7,8, 400]], ["c1", "c2", "value"])
>>> dfa.join(dfb, on=["c1","c2"], how="left").na.fill(0).show()
+---+---+-----+
| c1| c2|value|
+---+---+-----+
|  7|  8|  400|
|  7|  8|  400|
| 11|  6|    0|
|  3|  4|  200|
|  1|  2|  100|
|  1|  2|  100|
+---+---+-----+

temp = dfa.join(dfb, on=["c1","c2"], how="left").na.fill(0)

>>> [i.value for i in temp.select("value").collect()]
[400, 400, 0, 200, 100, 100]

...