Что я должен добавить в код, чтобы избежать ошибки «превышает максимально допустимые байты» при использовании pyspark? - PullRequest
0 голосов
/ 20 февраля 2019

У меня есть датафрейм с 4 миллионами строк и 10 столбцами.Я пытаюсь записать это в таблицу в формате hdf из Cloudera Data Science Workbench, используя pyspark.Я сталкиваюсь с ошибкой при попытке сделать это:

[Stage 0:>                                                          (0 + 1) / 
2]19/02/20 12:31:04 ERROR datasources.FileFormatWriter: Aborting job null.
org.apache.spark.SparkException: Job aborted due to stage failure: Serialized task 0:0 was 318690577 bytes, which exceeds max allowed: spark.rpc.message.maxSize (134217728 bytes). Consider increasing spark.rpc.message.maxSize or using broadcast variables for large values.

Я могу разбить фрейм данных на 3 фрейма и выполнить спарк-запись 3 отдельных раза, но я хотел бы сделать это только один раз, если это возможновозможно добавив что-то к искровому коду, например coalesce.

import pandas as pd
df=pd.read_csv('BulkWhois/2019-02-20_Arin_Bulk/Networks_arin_db_2-20-2019_parsed.csv')

'''PYSPARK'''
from pyspark.sql import SQLContext
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark import SparkContext
spark = SparkSession.builder.appName('Arin_Network').getOrCreate()

schema = StructType([StructField('NetHandle', StringType(), False),
                     StructField('OrgID', StringType(), True),
                     StructField('Parent', StringType(), True),
                     StructField('NetName', StringType(), True),
                     StructField('NetRange', StringType(), True),
                     StructField('NetType', StringType(), True),
                     StructField('Comment', StringType(), True),
                     StructField('RegDate', StringType(), True),
                     StructField('Updated', StringType(), True),
                     StructField('Source', StringType(), True)])

dataframe = spark.createDataFrame(df, schema)
dataframe.write. \
  mode("append"). \
  option("path", "/user/hive/warehouse/bulkwhois_analytics.db/arin_network"). \
  saveAsTable("bulkwhois_analytics.arin_network")

1 Ответ

0 голосов
/ 25 февраля 2019

Пользователь10465355 упомянул, что я должен использовать Spark напрямую.Делать это проще и правильнее.

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('Networks').getOrCreate()

dataset = spark.read.csv('Networks_arin_db_2-20-2019_parsed.csv', header=True, inferSchema=True)
dataset.show(5)

dataset.write \
  .mode("append") \
  .option("path", "/user/hive/warehouse/analytics.db/arin_network") \
  .saveAsTable("analytics.arin_network")
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...