объединение двух Rdds с несколькими значениями и добавление дополнительного значения на основе объединения в Pyspark? - PullRequest
0 голосов
/ 25 июня 2018

Я создал 2 RDD's, как показано ниже:

rdd1 = sc.parallelize([(u'176', u'244', -0.03925566875021147), (u'28', u'244', 0.9175106515709205), (u'165', u'244', -0.3837580218245722), (u'181', u'244', 0.29145693160561503), (u'161', u'244', -0.503468718448459), (u'28', u'275', 1.1636548589189926), (u'165', u'275', -1.026158464467282), (u'181', u'275', 0.6685791983070568)])

rdd2 = sc.parallelize([(u'176', u'244'), (u'28', u'244'), (u'165', u'244'), (u'165', u'275'), (u'181', u'275'), (u'141', u'388'), (u'154', u'238')])

мой ожидаемый результат должен быть таким, как показано ниже:

[(u'176', u'244', -0.03925566875021147,1), (u'28', u'244', 0.9175106515709205,1), (u'165', u'244', -0.3837580218245722,1), (u'181', u'244', 0.29145693160561503,0), (u'161', u'244', -0.503468718448459,0), (u'28', u'275', 1.1636548589189926,0), (u'165', u'275', -1.026158464467282,1), (u'181', u'275', 0.6685791983070568,1)]

я хочу присоединиться к двум числам добавить статус присоединения, как 1или 0.

в rdd1 1-й кортеж равен (u'176', u'244', -0.03925566875021147), а rdd2 содержит (u'176', u'244'), первые два элемента rdd1, rdd2 совпадают, тогда мой ожидаемый результат равен (u'176', u'244', -0.03925566875021147,1).

то же самое в случае Rdd1: (u'181', u'275', 0.6685791983070568) и Rdd2: (u'181', u'275') вывод будет (u'181', u'275', 0.6685791983070568,1).

, в противном случае rdd1 содержит (u'181', u'244', 0.29145693160561503), но rdd2 не содержит никакого кортежакак (u'181', u'244'), поэтому ожидаемый результат будет (u'181', u'244', 0.29145693160561503,0)

Я добился этого с помощью создания фреймов данных, но я не хочу использовать соединение фреймов данных. Пожалуйста, помогите об этом, как достичь с помощью rdds.

Ответы [ 2 ]

0 голосов
/ 26 июня 2018

Я хочу присоединиться к двум rdds добавить статус присоединения, например 1 или 0

Для присоединения к rdd вам потребуется pairedRdd

pairedRdd1 = rdd1.map(lambda x: ((x[0], x[1]), x[2:]))
pairedRdd2 = rdd2.map(lambda x: ((x[0], x[1]), 1))

Здесь я заполнил 1 в pairedRdd2, поскольку ваше выходное требование должно иметь 1 для сопоставления rdd2 из rdd1.

Затем, наконец, используйте leftOuterJoin и некоторые манипуляции для ожидаемого вывода

finalRdd = pairedRdd1.leftOuterJoin(pairedRdd2).map(lambda x: tuple(list(x[0]) + list(x[1][0]) + [0 if(x[1][1] == None) else 1]))
#[('161', '244', -0.503468718448459, 0),('165', '244', -0.3837580218245722, 1),('181', '244', 0.29145693160561503, 0),('165', '275', -1.026158464467282, 1),('181', '275', 0.6685791983070568, 1),('176', '244', -0.03925566875021147, 1),('28', '275', 1.1636548589189926, 0),('28', '244', 0.9175106515709205, 1)]

Надеюсь, ответ полезен

0 голосов
/ 25 июня 2018

Чтобы сделать это в подходе rdd, вам нужно соединить rdd со столбцами, к которым вы хотите присоединиться, и затем выполнить левое внешнее соединение этого и другого.Для каждого элемента (k, v) в этом результирующее СДР будет содержать либо все пары (k, (v, Some (w))) для w в другом, либо пару (k, (v, None)), если нетэлементы в других имеют ключ к.

 userRDD.leftOuterJoin(empRDD).collect {
        case (String, (firstrddvalue, None)) => (k,v,0)
        case (String, (firstrddvalue,secondrddvalue))=>(k,v,1)
      }
...