как написать .stream в режиме добавления - PullRequest
2 голосов
/ 04 марта 2020

Получение режима вывода ошибок не поддерживается при потоковых агрегатах потоковых DataFrames / DataSets без водяного знака. Я хотел получить вывод на консоль.

class StructSpark:
  def __init__(self, address, port):
    self.address = address
    self.port = port
    self.spark = SparkSession.builder.appName("StructuredWordcount").getOrCreate()
def getonline(self):
    lines = self.spark.readStream.format('socket').option('host', self.address).option('port', self.port).option(
        'includeTimestamp', 'true').load()
    words = lines.select(split(lines.value, ',').alias("value"), lines.timestamp)
    words1 = words.select((split(words.value[0], ',')).alias("key"),(split(words.value[0], ',')).alias("value"), lines.timestamp)
    windowedCount = words1.withWatermark("timestamp", "10 minutes").groupBy(window(words1.timestamp, "5 minutes", "5 minutes"),words1.key).count()
    windowedCount.createOrReplaceTempView("updates")
    count = self.spark.sql("select * from updates where count > 1")
    with open('/home/vaibhav/Desktop/data.txt', 'a') as file:
        file.write(str(count))
    query = count.writeStream.outputMode("Append").format("console").start()
    query.awaitTermination()

1 Ответ

1 голос
/ 04 марта 2020

Поскольку вы выполняете операцию агрегирования в своем потоке, вы не можете выполнить write.stream в режиме добавления. Либо используйте его в режиме «Complete», либо выполните write.stream перед операцией агрегирования.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...