Ошибка записи pyspark в CSV-файл с использованием udf - PullRequest
0 голосов
/ 20 июня 2019

Попытка экспортировать кадр данных искры в CSV с помощью этой функции:

def save_to_csv(df, filepath, append_header=False):
    columns = [c for c in df.columns]
    len_columns = len(df.columns)
    # Create single column of comma-separated fields (no brackets - data only), replacing null values with ''
    rdd_data = df.rdd.map(lambda row: ','.join([str(row[i]) if row[i] != None else '' for i in range(len_columns)]))
    rdd_all = None
    if append_header:
        # Create header column of comma-separated field names
        header = ','.join([c for c in columns])
        rdd_header = sc.parallelize([header])
        rdd_header = rdd_header.zipWithIndex()
        rdd_data = rdd_data.zipWithIndex().map(lambda row: (row[0], 1+row[1]))
        rdd_all = rdd_header.union(rdd_data)
        rdd_all = rdd_all.sortBy(lambda row : row[1])
        rdd_all = rdd_all.map(lambda row : row[0])
    else:
        rdd_all = rdd_data
    rdd_all.saveAsTextFile(filepath, 'org.apache.hadoop.io.compress.GzipCodec')

Раньше можно было использовать его без проблем, но в настоящее время он выходит из строя из-за «ошибки Юникода». Я считаю, что мне нужно преобразовать кодировку в 'utf-8', но я не уверен, где в функции ее применить

...