таблица чтения / записи в executors / pyspark - PullRequest
0 голосов
/ 07 ноября 2019

Как читать / записывать из / в дельта-таблицу из pandas_udf в исполнителях?

По какой-то причине я хочу извлечь различные данные из набора данных. Изнутри pandas_udf я хочу прочитать для таблицы. Вопрос в том, как я могу выполнить эту задачу более эффективно, вместо того, чтобы каждый раз загружать таблицу? Обратите внимание, что у меня не может быть ключа в таблице для извлечения данных в группе. Напротив, я определил некоторые наборы в pydf. Dataframe, dummy и id1 могут существовать в нескольких наборах.

df=pd.DataFrame([[1,3,1],[1,1,1],[1,2,2],[2,1,3],[2,2,6]],columns= 
['id1','id2','value'])

pydf = sqlContext.createDataFrame(df)

df1 =  spark.read.format('delta').load('/mnt/datalake/dumpufile')

schema=StructType([StructField("id1", IntegerType()),StructField("id2", 
IntegerType()),StructField("val", IntegerType()),StructField("count", 
IntegerType())
])

@pandas_udf(schema, functionType=PandasUDFType.GROUPED_MAP)
def func(dft):
from pyspark.sql import SparkSession
id1 = dft['id1'].values[0]
id2= dft['id2'].values[0]
val = dft['value'].values[0]
spark = SparkSession.builder.getOrCreate()
tmp = spark.read.format('delta').load('/dbfs/mnt/datalake/dumpufile').filter((F.col('id2')==id2)&(F.col('id1')==id1)).select('time','value').toPandas()

res = pd.DataFrame([],columns=['id1','id2','val','count'])
res['id1'] =[id1]
res['id2'] = [id2]
res['val'] = [val]
res['count'] = tmp.values[0,0]

return res

display(pydf.groupby('ban','panel').apply(func))
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...