Я работаю с spark 2.4.0 и python 3.6.Я занимаюсь разработкой программы на python со структурированными потоковыми действиями pyspark.Программа запускает два чтения потока чтения из двух сокетов, и после этого производится объединение этих двух потоковых данных.Я пробовал спарк 2.4.0 и 2.4.3, но ничего не изменилось.Затем я выполняю уникальный записывающий поток, чтобы записать только один выходной поток данных.Это работает хорошо.Однако, поскольку мне нужно написать также набор данных без потоковой передачи для всех микропартий, я закодировал вызов foreachBatch внутри потока записи.ЭТО НЕ РАБОТАЕТ.
Я помещаю spark.scheduler.mode = FAIR в spark.defaults.conf.Я бегу через spark-submit, но хотя я пробовал напрямую с python3, он не работает вообще.Похоже, что он не выполнил функцию splitStream, указанную в foreachBatch.Я попытался добавить некоторые функции печати в функцию splitStream без каких-либо эффектов.
Я сделал много попыток, но ничего не изменилось, я отправил через spark-submit и python.Я работаю над автономным кластером с искрой.
inDF_1 = spark \
.readStream \
.format('socket') \
.option('host', host_1) \
.option('port', port_1) \
.option("maxFilesPerTrigger", 1) \
.load()
inDF_2 = spark \
.readStream \
.format('socket') \
.option('host', host_2) \
.option('port', port_2) \
.option("maxFilesPerTrigger", 1) \
.load() \
.coalesce(1)
inDF = inDF_1.union(inDF_2)
#--------------------------------------------------#
# write streaming raw dataser R-01 plateMeasures #
#--------------------------------------------------#
def splitStream(df, epoch_id):
df \
.write \
.format('text') \
.outputMode('append') \
.start(path = outDir0)
listDF = df.collect()
print(listDF)
pass
stageDir = dLocation.getLocationDir('R-00')
outDir0 = dLocation.getLocationDir(outList[0])
chkDir = dLocation.getLocationDir('CK-00')
query0 = programName + '_q0'
q0 = inDF_1 \
.writeStream \
.foreachBatch(splitStream) \
.format('text') \
.outputMode('append') \
.queryName(query0) \
.start(path = stageDir
, checkpointLocation = chkDir)
Я использую foreachBatch, потому что мне нужно написать несколько шинков для каждой входной микробатчи.СПАСИБО большое, чтобы каждый мог попытаться мне помочь -