Понимание foreach с помощью pyspark структурированной потоковой передачи - PullRequest
0 голосов
/ 22 февраля 2019

Я пытаюсь выяснить, как применить foreach к примеру подсчета слов в pyspark, потому что в моем случае использования я должен иметь возможность писать в несколько источников.Однако класс foreach, кажется, фактически никогда не выполняется, и файл никогда не создается.

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split
import os
import uuid
import tempfile


spark = SparkSession.builder.appName('Struct-stream').getOrCreate()

lines = spark \
        .readStream \
        .format('socket') \
        .option('host', 'localhost') \
        .option('port', 9999) \
        .load()

words = lines.select(
   explode(
       split(lines.value, " ")
   ).alias("word")
)

wordCounts = words.groupBy("word").count()

open_dir = tempfile.mkdtemp()
process_dir = tempfile.mkdtemp()

class Writer:

    open_dir = open_dir
    process_dir = process_dir

    def open(self, partition_id, epoch_id):
        with open(os.path.join(self.open_dir, str(uuid.uuid4())), 'w') as f:
            f.write("%s\n" % str({'partition_id': partition_id, 'epoch': epoch_id}))
        return True

    def process(self, row):
        with open(os.path.join(self.process_dir, str(uuid.uuid4())), 'w') as f:
            f.write("%s\n" % str({'value': 'text'}))


query = wordCounts \
        .writeStream \
        .foreach(Writer()) \
        .outputMode('complete') \
        .format('console') \
        .start()


query.awaitTermination()

Я пытаюсь понять, почему ни один файл не записывается, или если класс Writer когда-либона самом деле исполняется.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...