Я пытаюсь прочитать данные postgres в моем искровом фрейме данных.Все работает нормально, пока я не загружаю столбец, структура данных которого представляет собой массив json.если я получаю данные за 1 месяц, он истекает, если я получаю данные за 1 день, вместо 5 минут, которые раньше занимали, это занимает примерно 30 минут.Более того, структура данных массива json является строковой, поэтому я не могу применить к ней UDF.
Как загрузить данные за 1 день с меньшим временем?
Какя должен убедиться, что формат - это массив json и string?
conf = pyspark.SparkConf().setAll([("spark.driver.extraClassPath", "/usr/local/bin/postgresql-42.2.5.jar:/usr/local/jar/gcs-connector-hadoop2-latest.jar")
,("spark.executor.instances", "4")
,("spark.executor.cores", "4")
,("spark.executor.memories", "10g")
,("spark.driver.memory", "10g")
,("spark.dirver.maxResultSize", "10g")
,("spark.memory.offHeap.enabled","true")
,("spark.memory.offHeap.size","40g")])
sc = pyspark.SparkContext(conf=conf)
sc.getConf().getAll()
sqlContext = SQLContext(sc)
query = """
select
users.userid
,users.createdat as users_createdat
,jsoncolumn
from users
left join actions
on users.userid = actions.userid
where users.createdat between '{start_date}' and '{end_date}'
and geo->>'country'='US'
""".format(start_date=start_date, end_date = end_date)
url = 'postgresql://url'
df_join = sqlContext.read.format("jdbc")\
.option("url",'jdbc:%s' % url)\
.option("user", 'username')\
.option("password", 'pass')\
.option("query",query)\
.load()
start_time = time.time()
df_join.cache()
df_join.collect()
end_time = time.time()
print((end_time - start_time))