I have started using AWS services for ETL. I am running into memory and resource issues with my Glue jobs, and a colleague told me to switch from a Spark DataFrame to a DynamicFrame, but I am not sure whether I can keep my script as it is (it is mostly SQLContext) or whether I will have to rewrite the SQLContext queries as Scala/PySpark data transformations.
EDIT:
My Glue code:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import SQLContext
from pyspark import SparkContext, SparkConf
from pyspark.sql.window import Window
from functools import reduce
from operator import add
import re
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
# bootstrap the Spark and Glue contexts for the job
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
# suppress the _SUCCESS marker files in the output
spark._jsc.hadoopConfiguration().set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")
sqlContext = SQLContext(sc)
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
# disable Parquet summary metadata files
spark._jsc.hadoopConfiguration().set("parquet.summary.metadata.level", "NONE")
df1_path = 's3_path'
# load the source data and run the two SQL stages through the SQLContext
df1 = sqlContext.read.load(df1_path)
sqlContext.registerDataFrameAsTable(df1, "table1")
df2 = sqlContext.sql("Query_to_table1")
sqlContext.registerDataFrameAsTable(df2, "table2")
df3 = sqlContext.sql("Query_to_table2")
# collapse the result to a single partition so a single output file is written
df3_comp = df3.repartition(1)
output_s3_path = "out_s3_path"
# convert the final DataFrame to a DynamicFrame and write it to S3 as Parquet
df3_comp_DDF = DynamicFrame.fromDF(df3_comp, glueContext, 'output')
datasink = glueContext.write_dynamic_frame.from_options(frame=df3_comp_DDF, connection_type="s3", connection_options={"path": output_s3_path}, format="parquet", transformation_ctx="datasink")
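For reference, here is a rough, untested sketch of the direction I think my colleague means: read the source as a DynamicFrame up front and convert with toDF() / DynamicFrame.fromDF(), which as far as I understand should let the existing SQL queries stay unchanged. The paths, the Parquet source format, and the query strings below are placeholders/assumptions from my current script, not working code.

# Sketch (untested): read the source through a DynamicFrame, then convert it to a
# DataFrame so the existing SQL queries can be reused as-is.
# Assumes the source is Parquet on S3; paths and query strings are placeholders.
dyf1 = glueContext.create_dynamic_frame.from_options(
    connection_type="s3",
    connection_options={"paths": [df1_path]},  # "paths" takes a list for S3 sources
    format="parquet",
)
df1 = dyf1.toDF()                      # DynamicFrame -> DataFrame
df1.createOrReplaceTempView("table1")  # plays the same role as registerDataFrameAsTable
df2 = spark.sql("Query_to_table1")
df2.createOrReplaceTempView("table2")
df3 = spark.sql("Query_to_table2")
# convert back to a DynamicFrame only for the sink, as in the current script
dyf_out = DynamicFrame.fromDF(df3.repartition(1), glueContext, "output")
glueContext.write_dynamic_frame.from_options(
    frame=dyf_out,
    connection_type="s3",
    connection_options={"path": output_s3_path},
    format="parquet",
    transformation_ctx="datasink",
)

If that is the expected pattern, I would also like to know whether the toDF()/fromDF() conversions negate the memory benefit my colleague had in mind.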
The error from my Glue job:
servlet.ServletHandler (ServletHandler.java:doHandle(632)) - /api/v1/applications/application_1579274094469_0001
java.lang.NullPointerException
at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:388)
at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:341)
at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:228)
at org.spark_project.jetty.servlet.ServletHolder.handle(ServletHolder.java:848)
at org.spark_project.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1772)
at org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.doFilter(AmIpFilter.java:171)
at org.spark_project.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1759)
at org.spark_project.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:582)
at org.spark_project.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1180)
at org.spark_project.jetty.servlet.ServletHandler.doScope(ServletHandler.java:512)
at org.spark_project.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1112)
at org.spark_project.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
at org.spark_project.jetty.server.handler.gzip.GzipHandler.handle(GzipHandler.java:493)
at org.spark_project.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:213)
at org.spark_project.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:134)
at org.spark_project.jetty.server.Server.handle(Server.java:539)
at org.spark_project.jetty.server.HttpChannel.handle(HttpChannel.java:333)
at org.spark_project.jetty.server.HttpConnection.onFillable(HttpConnection.java:251)
at org.spark_project.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:283)
at org.spark_project.jetty.io.FillInterest.fillable(FillInterest.java:108)
at org.spark_project.jetty.io.SelectChannelEndPoint$2.run(SelectChannelEndPoint.java:93)
at org.spark_project.jetty.util.thread.strategy.ExecuteProduceConsume.executeProduceConsume(ExecuteProduceConsume.java:303)
at org.spark_project.jetty.util.thread.strategy.ExecuteProduceConsume.produceConsume(ExecuteProduceConsume.java:148)
at org.spark_project.jetty.util.thread.strategy.ExecuteProduceConsume.run(ExecuteProduceConsume.java:136)
at org.spark_project.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:671)
at org.spark_project.jetty.util.thread.QueuedThreadPool$2.run(QueuedThreadPool.java:589)
at java.lang.Thread.run(Thread.java:748)