Задание Spark застревает на последнем этапе импорта из базы данных Oracle - данные не перекошены - PullRequest
0 голосов
/ 10 сентября 2018

Я пытаюсь получить данные из базы данных Oracle и поместить их в AWS S3 , используя Apache Spark 2.3.1 . Работа идет хорошо до последнего этапа и застревает там. Я не думаю, что данные искажены, потому что каждый этап имеет одинаковое количество записей. Ниже приведен запрос, который я использую в spark.

url = "jdbc:oracle:thin:@IP:PORT/SID"
user = "user"
password = "password"
driver = "oracle.jdbc.driver.OracleDriver"
table = "table"
fetchSize = 1000
partitionColumn = "num_rows"

date1 = (datetime.today() - td(days=42)).date().strftime('%d-%b-%Y')
date2 = (datetime.today() - td(days=2)).date().strftime('%d-%b-%Y')

query = "(select min(rownum) as min, max(rownum) as max from "+table+" where date>='"+str(date1)+"' and date<='"+str(date2)+"') tmp1"
print(query)

DF = spark.read.format("jdbc").option("url", url) \
                              .option("dbtable", query) \
                              .option("user", user) \
                              .option("password", password) \
                              .option("driver", driver) \
                              .load()

lower_bound, upper_bound = DF.first()
lower_bound = int(lower_bound)
upper_bound = int(upper_bound)
numPartitions = int(upper_bound/fetchSize)+1
print(lower_bound,upper_bound)
print(numPartitions)

query = "(select t1.*, ROWNUM as num_rows from (select * from " + table + " where date>='"+str(date1)+"' and date<='"+str(date2)+"') t1) tmp2"
print(query)

DF = spark.read.format("jdbc").option("url", url) \
                              .option("dbtable", query) \
                              .option("user", user) \
                              .option("password", password) \
                              .option("fetchSize",fetchSize) \
                              .option("numPartitions", numPartitions) \
                              .option("partitionColumn", partitionColumn) \
                              .option("lowerBound", lower_bound) \
                              .option("upperBound", upper_bound) \
                              .option("driver", driver) \
                              .load()

path = "s3://my_path"
DF.write.mode("overwrite").parquet(path)

Код в основном извлекает данные за последние 42 дня и помещает их в корзину S3. Ниже приведен вывод до оператора записи. Код был выполнен на '10 -Sep-2018 '

(select min(rownum) as min, max(rownum) as max from table where date>='30-Jul-2018' and date<='08-Sep-2018') tmp1
(1, 2195427)
2196
(select t1.*, ROWNUM as num_rows from (select * from table where date>='30-Jul-2018' and date<='08-Sep-2018') t1) tmp2

Как видите,

  • общее количество записей = 2195427
  • записей на раздел = 1000
  • количество разделов = 2196

Таким образом, задание имеет 2196 этапов, и каждый этап извлекает 1000 записей. Работа застряла на 2191/2196 , и впереди еще 5 этапов.

Характеристики оборудования:

Я использую машины r4.xlarge. Мой кластер - 1 Мастер, 2 Раба из r4.xlarge. Ниже приведены спецификации моего водителя и исполнителя.

spark.driver.cores  8
spark.driver.memory 24g
spark.driver.memoryOverhead 3072M
spark.executor.cores    1
spark.executor.memory   3g
spark.executor.memoryOverhead   512M
spark.yarn.am.cores 1
spark.yarn.am.memory    3g
spark.yarn.am.memoryOverhead    512M

Интерфейс Spark Executor

Этапы с 1 по 2191 были выполнены за 1,3 часа, но оставшиеся 5 этапов застряли более чем на три часа.

Пожалуйста, найдите журнал здесь: https://github.com/rinazbelhaj/stackoverflow/blob/master/Spark_Log_10_Sept_2018

Я не могу выяснить причину этой проблемы.

1 Ответ

0 голосов
/ 24 января 2019

Полагаю, у вас есть два возможных сценария:

1 - проблема, связанная с БД Oracle

Я имею дело с очень похожей проблемой. Но вместо того, чтобы застрять, задача была прервана SQL Server. Прерывание было вызвано connection reset и происходит случайно.

Чтобы избежать этого, я установил некоторые параметры в строке подключения JDBC. Ошибка была остановлена, но задача никогда не заканчивается.

  • оригинал:

jdbc:sqlserver://host:port;database=db_name;

  • изменение:
db_url=jdbc:sqlserver://host:port;
database=db_name;
applicationIntent=readonly;
applicationName=app-name;
columnEncryptionSetting=Disabled;
disableStatementPooling=true;
encrypt=false;
integratedSecurity=false;
lastUpdateCount=true;
lockTimeout=-1;
loginTimeout=15;
multiSubnetFailover=false;
packetSize=8000;
queryTimeout=-1;
responseBuffering=adaptive;
selectMethod=direct;
sendStringParametersAsUnicode=true;
serverNameAsACE=false;
TransparentNetworkIPResolution=true;
trustServerCertificate=false;
trustStoreType=JKS;
sendTimeAsDatetime=true;
xopenStates=false;
authenticationScheme=nativeAuthentication;
authentication=NotSpecified;
socketTimeout=0;
fips=false;
enablePrepareOnFirstPreparedStatementCall=false;
serverPreparedStatementDiscardThreshold=10;
statementPoolingCacheSize=0;
jaasConfigurationName=SQLJDBCDriver;
sslProtocol=TLS;
cancelQueryTimeout=-1;
useBulkCopyForBatchInsert=false;

Таким образом, я решил удалить добавленные параметры в строке соединения JDBC и начал передавать настройку спарк при создании кластера. Я изменил максимальное количество повторов с 4 (по умолчанию) на 50.

  • spark.task.maxFailures=50

Итак, проблема с подключением сохраняется, но по крайней мере задача успешно завершается.

Я бы посоветовал вам установить любое время ожидания соединения, потому что, возможно, оно неограниченно - обычно устанавливается как 0 или -1. Просмотрите документацию по драйверу Oracle и попробуйте изменить поведение по умолчанию.

2 - проблема, связанная с S3

Мы также столкнулись с проблемой, связанной с операцией write на S3. Я не смог найти точное сообщение об ошибке, но оно было похоже на An error occurred while calling o70.parquet.

Когда мы решили упомянутую проблему, скорость записи заняла слишком много времени.

Один парень из нашей команды предложил использовать HDFS для записи данных из базы данных, а затем скопировать операцию из HDFS в S3. Производительность существенно возросла.

  • установить HDFS в качестве места назначения (возможно, потребуется увеличить размер диска от главного узла)
destination = 'hdfs:///path-to-hdfs'

DF.write                \
  .mode("overwrite")    \
  .parquet(destination)
  • выполнить копирование с HDFS на S3

Ссылка: https://docs.aws.amazon.com/emr/latest/ReleaseGuide/UsingEMR_s3distcp.html

Надеюсь, это поможет вам! Я сообщу о любых улучшениях от моей задачи;)

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...