Как я могу создать пару СДР из СДР из отдельных элементов в PySpark? - PullRequest
0 голосов
/ 27 апреля 2019

Вот фактический конвейер. Я загружаю текст в RDD. Я тогда убираю это.

rdd1 = sc.textFile("sometext.txt")

import re
import string

def Func(lines):
    lines = lines.lower() #make all text lowercase
    lines = re.sub('[%s]' % re.escape(string.punctuation), '', lines) #remove punctuation
    lines = re.sub('\w*\d\w*', '', lines) #remove numeric-containing strings
    lines = lines.split() #split lines
    return lines
rdd2 = rdd1.flatMap(Func)

stopwords = ['list of stopwords goes here'] 
rdd3 = rdd2.filter(lambda x: x not in stopwords) # filter out stopwords
rdd3.take(5) #resulting RDD

Out:['a',
     'b',
     'c',
     'd',
     'e']

Мне нужно сделать сейчас запуск функции цепочки Маркова. Я хочу связать каждый элемент с его последовательным элементом, таким как:

[('a', 'b'), ('b', 'c'), ('c', 'd'), ('d', 'e') и т. Д. ...]

Ответы [ 2 ]

0 голосов
/ 27 апреля 2019

Я думаю, что вам нужно указать порядок элементов в вашем СДР, чтобы определить, как 2 элемента считаются "последовательными" друг для друга.Поскольку ваш RDD может состоять из нескольких разделов, поэтому spark не будет знать, является ли 1 элемент в partition_1 последовательным по отношению к другому элементу в partition_2.

Если вы заранее знаете свои данные, вы можете определить ключ, а такжекак 2 элемента являются «последовательными».Учитывая ваш пример, где rdd создается из списка, вы можете использовать индекс в качестве ключа и сделать соединение.

"""you want to shift arr by 1 to the left, then join back to arr. Calculation based on index"""

arr = ['a','b','c','d','e','f']
rdd = sc.parallelize(arr, 2).zipWithIndex().cache() #cache if rdd is small 

original_rdd = rdd.map(lambda x: (x[1], x[0])) #create rdd with key=index, value=item in list

shifted_rdd = rdd.map(lambda x: (x[1]-1, x[0]))

results = original_rdd.join(shifted_rdd)
print(results.values().collect())

Чтобы добиться лучшей производительности в join, вы можете использовать разделы диапазона для original_rdd и shifted_rdd.

0 голосов
/ 27 апреля 2019

Подход довольно спуска.Действительно может быть оптимизировано гораздо больше.

>>> rdd=sc.parallelize(['a','b','c','d','e','f'])
#zipping with Index to rip off odd and even elements, to group consecutive elements in future
>>> rdd_odd=rdd.zipWithIndex().filter(lambda (x,y):y%2!=0).map(lambda (x,y):x).coalesce(1)
>>> rdd_even=rdd.zipWithIndex().filter(lambda (x,y):y%2==0).map(lambda (x,y):x).coalesce(1)
>>> rdd_2=rdd_even.zip(rdd_odd)
>>> rdd_2.collect()
[('a', 'b'), ('c', 'd'), ('e', 'f')]

Убедитесь, что четное число элементов в rdd_1.Это фактически станет основой для сопряжения последовательных элементов.

...