Pypark - преобразование двоичного файла в DataFrame, но для всех полей возвращается ноль - PullRequest
0 голосов
/ 12 декабря 2018

Я пытаюсь преобразовать двоичный файл в значения ascii и сохранить его в кадре данных.Преобразование в ASCII работает нормально.Но когда я пытаюсь преобразовать в Spark Dataframe, я получаю только нулевые значения для всех полей.Не уверен насчет отсутствующей части.

Возвращенный df должен быть dF панды, но отображается как список.

Двоичный файл содержит 2 записи фиксированного размера 16 байтов.Входные значения выглядят так:

01 01 02 0F FF E3 33 52 14 75 26 58 87 7F FF FF 01 01 02 0D FF E3 33 52 14 75 26 58 87 7F FF FF

Пожалуйста, помогите решить.Ниже приведен код и вывод.

%spark2.pyspark

from pyspark import SparkContext, SparkConf
from pyspark.sql.types import *
from pyspark.sql import SQLContext
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.ml.linalg import Vectors

import binascii
import pandas as pd
import numpy as np
import datetime
from string import printable

recordsize = 16
chunkcount = 100
chunksize = recordsize * chunkcount

sparkSchema = StructType([
    StructField ("Field1", IntegerType(), True),
    StructField ("Field2", StringType(), True),
    StructField ("Field3", StringType(), True),
    StructField ("Field4", StringType(), True)

])

dt = np.dtype([
        ('Field1', 'b'),
        ('Field2', np.void, 4),
        ('Field3', np.void, 3),
        ('Field4', np.void, 8)

    ])

StartTime = datetime.datetime.now()
print ("Start Time: " + str(StartTime))

inputfile = "/user/maria_dev/binfiles1"


def decodeRecord(data):

    x = np.frombuffer (data[1], dtype=dt)

    newx = x.byteswap().newbyteorder()

    df = pd.DataFrame(newx)

    st = set(printable)

    df[['Field2', 'Field3', 'Field4']] = df[['Field2', 'Field3', 'Field4']].applymap(
                                                            lambda x: binascii.hexlify(x).decode('utf-8').rstrip('f'))

    return df


conf = SparkConf().setAppName("BinaryReader").setMaster("local")

sqlContext = SQLContext (sc)

rdd = sc.binaryFiles(inputfile).map(decodeRecord).collect()
print (type(rdd))
print (rdd)

df = sqlContext.createDataFrame(rdd, sparkSchema)
print ("Number of records in DataFrame: " + str(df.count()))

df.show()

Ниже выводится:

    Start Time: 2018-12-12 20:11:55.141848
<type 'list'>
[   Field1  Field2  Field3       Field4
0       1  01020d  e33352  14752658877
1       1  01020d  e33352  14752658877]
Number of records in DataFrame: 1
+------+------+------+------+
|Field1|Field2|Field3|Field4|
+------+------+------+------+
|  null|  null|  null|  null|
+------+------+------+------+

1 Ответ

0 голосов
/ 15 декабря 2018

Ваша функция decodeRecord() возвращает кадр данных pandas, поэтому полученный PipelinedRDD содержит одну строку, содержащую полный кадр данных pandas.Таким образом, вы должны взять эту первую строку и преобразовать ее в искровой фрейм данных.

Это часть измененного кода:

rdd = sc.binaryFiles(inputfile).map(decodeRecord)
panda_df = rdd.first()
print (type(rdd))
print (type(panda_df))

df = sqlContext.createDataFrame(panda_df)
print ("Number of records in DataFrame: " + str(df.count()))

df.show()

Вывод:

Start Time: 2018-12-15 17:43:21.241421
<class 'pyspark.rdd.PipelinedRDD'>
<class 'pandas.core.frame.DataFrame'>
Number of records in DataFrame: 4
+------+--------+------+----------------+
|Field1|  Field2|Field3|          Field4|
+------+--------+------+----------------+
|    48|31303130|323044|4646453333333532|
|    49|34373532|363538|3837374646464646|
|    48|31303130|323044|4646453333333532|
|    49|34373532|363538|3837374646464646|
+------+--------+------+----------------+

Естьдругие возможные улучшения в вашем коде, такие как использование rdd.flatMap () или использование функции decodeRecord () напрямую для получения DF-кода pandas и преобразования в spark DF без вызова rdd.map ().Просто несколько предложений.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...