Я пытаюсь получить данные из базы данных Oracle и поместить их в AWS S3 , используя Apache Spark 2.3.1 . Работа идет хорошо до последнего этапа и застревает там. Я не думаю, что данные искажены, потому что каждый этап имеет одинаковое количество записей. Ниже приведен запрос, который я использую в spark.
url = "jdbc:oracle:thin:@IP:PORT/SID"
user = "user"
password = "password"
driver = "oracle.jdbc.driver.OracleDriver"
table = "table"
fetchSize = 1000
partitionColumn = "num_rows"
date1 = (datetime.today() - td(days=42)).date().strftime('%d-%b-%Y')
date2 = (datetime.today() - td(days=2)).date().strftime('%d-%b-%Y')
query = "(select min(rownum) as min, max(rownum) as max from "+table+" where date>='"+str(date1)+"' and date<='"+str(date2)+"') tmp1"
print(query)
DF = spark.read.format("jdbc").option("url", url) \
.option("dbtable", query) \
.option("user", user) \
.option("password", password) \
.option("driver", driver) \
.load()
lower_bound, upper_bound = DF.first()
lower_bound = int(lower_bound)
upper_bound = int(upper_bound)
numPartitions = int(upper_bound/fetchSize)+1
print(lower_bound,upper_bound)
print(numPartitions)
query = "(select t1.*, ROWNUM as num_rows from (select * from " + table + " where date>='"+str(date1)+"' and date<='"+str(date2)+"') t1) tmp2"
print(query)
DF = spark.read.format("jdbc").option("url", url) \
.option("dbtable", query) \
.option("user", user) \
.option("password", password) \
.option("fetchSize",fetchSize) \
.option("numPartitions", numPartitions) \
.option("partitionColumn", partitionColumn) \
.option("lowerBound", lower_bound) \
.option("upperBound", upper_bound) \
.option("driver", driver) \
.load()
path = "s3://my_path"
DF.write.mode("overwrite").parquet(path)
Код в основном извлекает данные за последние 42 дня и помещает их в корзину S3. Ниже приведен вывод до оператора записи. Код был выполнен на '10 -Sep-2018 '
(select min(rownum) as min, max(rownum) as max from table where date>='30-Jul-2018' and date<='08-Sep-2018') tmp1
(1, 2195427)
2196
(select t1.*, ROWNUM as num_rows from (select * from table where date>='30-Jul-2018' and date<='08-Sep-2018') t1) tmp2
Как видите,
- общее количество записей = 2195427
- записей на раздел = 1000
- количество разделов = 2196
Таким образом, задание имеет 2196 этапов, и каждый этап извлекает 1000 записей. Работа застряла на 2191/2196 , и впереди еще 5 этапов.
Характеристики оборудования:
Я использую машины r4.xlarge. Мой кластер - 1 Мастер, 2 Раба из r4.xlarge. Ниже приведены спецификации моего водителя и исполнителя.
spark.driver.cores 8
spark.driver.memory 24g
spark.driver.memoryOverhead 3072M
spark.executor.cores 1
spark.executor.memory 3g
spark.executor.memoryOverhead 512M
spark.yarn.am.cores 1
spark.yarn.am.memory 3g
spark.yarn.am.memoryOverhead 512M
Интерфейс Spark Executor
Этапы с 1 по 2191 были выполнены за 1,3 часа, но оставшиеся 5 этапов застряли более чем на три часа.
Пожалуйста, найдите журнал здесь:
https://github.com/rinazbelhaj/stackoverflow/blob/master/Spark_Log_10_Sept_2018
Я не могу выяснить причину этой проблемы.