Как преобразовать элементы массива в значения столбцов RDD - PullRequest
0 голосов
/ 11 июня 2018

Я готовлюсь к использованию встроенной функции CSV-печати в искровом фрейме данных (не в pandas).У меня уже есть IndexedRowMatrix.Как таковые есть редкие столбцы массива в нем.Преобразование в плотный массив выполняется вызовом карты x.vector.toArray () (ниже).Я не могу понять, как получить элементы плотного массива в отдельные столбцы Spark DataFrame.(Я не представляю панд, пожалуйста.) Как перевести этот RDD в 7-колоночный фрейм данных, состоящий из строкового столбца и шести целочисленных столбцов?Мой код на данный момент:

X = CoordinateMatrix(sc.parallelize(entries)) 
Xirm = X.toIndexedRowMatrix()
colnames = "username," + ','.join(str(cell) for cell in itemids.keys())  # Make CSV header line
# Might need this for speed: Arrow:  https://bryancutler.github.io/createDataFrame/  See above conf=...
XX = Xirm.rows.map(lambda x: (lu[x.index], x.vector.toArray())) # ?
print(XX.take(2))
df = XX.toDF() #TypeError: Can not infer schema for type: <class 'numpy.ndarray'>
#df.write.csv(header=colnames, path=out_filename)

Вот дубль (2), чтобы увидеть пример данных:

[('kygiacomo', array([ 0.,  1.,  0.,  0.,  0.,  0.])), ('namohysip', array([ 1.,  0.,  0.,  0.,  0.,  0.]))]

Смотрите, проблема в том, что кортеж RDD имеет 2 столбца, ноМне нужно 7 столбцов в DataFrame.Количество столбцов определяется динамически, и у меня есть имена столбцов в переменной colnames, но я не знаю, как это передать.Опять же, цель состоит в том, чтобы вывести файл CSV «эквивалент» (многие частичные файлы в порядке) с помощью встроенной функции записи CSV в DAtaFrame.(Spark 2.3.0 является резидентным.) Реалы будут в идеале конвертироваться в целые, без кавычек, окружающих какие-либо значения данных.Но преобразование столбцов из 2 в 7 - действительно сложная проблема.Спасибо за советы.

1 Ответ

0 голосов
/ 11 июня 2018

Простое преобразование в простые типы Python и распаковка должны помочь:

Xirm.rows.map(lambda x: (lu[x.index], *x.vector.toArray().tolist()))

так же, как

Xirm.rows.map(lambda x: [lu[x.index]] + x.vector.toArray().tolist())
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...