pyspark 2.4.x структурированная потоковая передача foreachBatch не работает - PullRequest
0 голосов
/ 15 июня 2019

Я работаю с spark 2.4.0 и python 3.6.Я занимаюсь разработкой программы на python со структурированными потоковыми действиями pyspark.Программа запускает два чтения потока чтения из двух сокетов, и после этого производится объединение этих двух потоковых данных.Я пробовал спарк 2.4.0 и 2.4.3, но ничего не изменилось.Затем я выполняю уникальный записывающий поток, чтобы записать только один выходной поток данных.Это работает хорошо.Однако, поскольку мне нужно написать также набор данных без потоковой передачи для всех микропартий, я закодировал вызов foreachBatch внутри потока записи.ЭТО НЕ РАБОТАЕТ.

Я помещаю spark.scheduler.mode = FAIR в spark.defaults.conf.Я бегу через spark-submit, но хотя я пробовал напрямую с python3, он не работает вообще.Похоже, что он не выполнил функцию splitStream, указанную в foreachBatch.Я попытался добавить некоторые функции печати в функцию splitStream без каких-либо эффектов.

Я сделал много попыток, но ничего не изменилось, я отправил через spark-submit и python.Я работаю над автономным кластером с искрой.

inDF_1 = spark \
    .readStream \
    .format('socket') \
    .option('host', host_1) \
    .option('port', port_1) \
    .option("maxFilesPerTrigger", 1) \
    .load()

inDF_2 = spark \
    .readStream \
    .format('socket') \
    .option('host', host_2) \
    .option('port', port_2) \
    .option("maxFilesPerTrigger", 1) \
    .load() \
    .coalesce(1)

inDF = inDF_1.union(inDF_2)

#--------------------------------------------------#
#  write streaming raw dataser R-01 plateMeasures  #
#--------------------------------------------------#

def splitStream(df, epoch_id):
    df \
        .write \
        .format('text') \
        .outputMode('append') \
        .start(path = outDir0)

    listDF = df.collect()
    print(listDF)
    pass

stageDir = dLocation.getLocationDir('R-00')
outDir0 = dLocation.getLocationDir(outList[0])
chkDir = dLocation.getLocationDir('CK-00')
query0 = programName + '_q0'
q0 = inDF_1 \
        .writeStream \
        .foreachBatch(splitStream) \
        .format('text') \
        .outputMode('append') \
        .queryName(query0) \
        .start(path = stageDir
                    , checkpointLocation = chkDir)

Я использую foreachBatch, потому что мне нужно написать несколько шинков для каждой входной микробатчи.СПАСИБО большое, чтобы каждый мог попытаться мне помочь -

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...