Py4JJavaError: Произошла ошибка при вызове o57.showString.: org.apache.spark.SparkException: - PullRequest
0 голосов
/ 12 июня 2019

Я работаю с pyspark, подключенным к экземпляру AWS (r5d.xlarge 4 виртуальных ЦП 32 ГБ), работающему с базой данных 25 ГБ, при запуске некоторых таблиц я получаю сообщение об ошибке:

Py4JJavaError: Произошла ошибкаво время вызова o57.showString.: org.apache.spark.SparkException: задание прервано из-за сбоя этапа: задание 0 на этапе 0.0 не выполнено 1 раз, последний сбой: потерянное задание 0.0 на этапе 0.0 (TID 0, localhost, драйвер исполнителя): java.lang.OutOfMemoryError: Превышен лимит накладных расходов GC

Я попытался найти ошибку для себя, но, к сожалению, информации по этому вопросу немного.

code


from pyspark.sql import SparkSession

spark = SparkSession.builder.master('local').\
     config('spark.jars.packages', 'mysql:mysql-connector-java:5.1.44').\
     appName('test').getOrCreate()

df = spark.read.format('jdbc').\
        option('url', 'jdbc:mysql://xx.xxx.xx.xxx:3306').\
        option('driver', 'com.mysql.jdbc.Driver').\
        option('user', 'xxxxxxxxxxx').\
        option('password', 'xxxxxxxxxxxxxxxxxxxx').\
        option('dbtable', 'dbname.tablename').\
        load()

  df.printSchema()

здесь я получаю printSchema, но затем:


df_1 = df.select(['col1', 'col2', 'col3', 'col4', 
                  'col4', 'col5', 'col6']).show()

Py4JJavaError: An error occurred while calling o57.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task            
  in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 
  0.0 (TID 0, localhost, executor driver): java.lang.OutOfMemoryError: GC 
  overhead limit exceeded

Кто-нибудь знает, как я могу решить эту проблему?

1 Ответ

0 голосов
/ 20 июня 2019

Вот метод для распараллеливания последовательных операций чтения JDBC по нескольким spark workers ... вы можете использовать это как руководство для настройки его под ваши исходные данные ... в основном, основным условием является наличие какого-то уникального ключа дляразделены на.

Пожалуйста, обратитесь к этой документации, в частности параметры partitionColumn, lowerBound, upperBound, numPartitions

https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html

Некоторые примеры кода:

# find min and max for column used to split on
from pyspark.sql.functions import min, max

minDF = df.select(min("id")).first()[0] # replace 'id' with your key col
maxDF = df.select(max("id")).first()[0] # replace 'id' with your key col
numSplits = 125 # you will need to tailor this value to your dataset ... you mentioned your source as 25GB so try 25000 MB / 200 MB = 125 partitions

print("df min: {}\df max: {}".format(minDF, maxDF))

# your code => add a few more parameters
df = spark.read.format('jdbc').\
        option('url', 'jdbc:mysql://xx.xxx.xx.xxx:3306').\
        option('driver', 'com.mysql.jdbc.Driver').\
        option('user', 'xxxxxxxxxxx').\
        option('password', 'xxxxxxxxxxxxxxxxxxxx').\
        option('dbtable', 'dbname.tablename').\
        option('partitionColumn', 'id').\ # col to split on
        option('lowerBound', minDF).\ # min value
        option('upperBound', maxDF).\ # max value
        option('numPartitions', numSplits).\ # num of splits (partitions) spark will distribute across executor workers
        load()

print(df.rdd.getNumPartitions())

Другой пример подключенияstring => если вы используете spark 2.4 / обратитесь к этому документу, он использует более чистый код

https://docs.databricks.com/spark/latest/data-sources/sql-databases.html#manage-parallelism

sourceDF = spark.read.jdbc(
  url=jdbcUrl, 
  table="dbname.tablename",
  column='"id"',
  lowerBound=minDF,
  upperBound=maxDF,
  numPartitions=125,
  properties=connectionProps
)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...