Режим добавления вывода не работает в структурированной потоковой передаче Spark с приемником foreach - PullRequest
0 голосов
/ 19 апреля 2020

Для каждого приемника со следующим определением

class CouchBaseSinkForeach() extends ForeachWriter[org.apache.spark.sql.Row] {

override def open(partitionId: Long, version: Long): Boolean = {
 // Open connection

true
}

override def process(value: Row): Unit =
{
   convertRowToJSON(value)
   println("My Test Hi")
}


override def close(errorOrNull: Throwable): Unit = {
   // Close the connection
}

используется в следующем потоковом запросе

val streamDf=  records
  .filter($"region"==="US" )
  .withColumn("startTimeEpoch", ud($"startTimeEpoch"))
  .withColumn("startTimeEpoch", $"startTimeEpoch".cast(sql.types.TimestampType))
  .withWatermark("startTimeEpoch", "10 minutes")
  .groupBy(window($"startTimeEpoch","10 minutes","10 minutes"),$"accountId")
     .count()
streamDf.writeStream
  .foreach(new CouchBaseSinkForeach)
  .start()
  .awaitTermination()

В режиме добавления Spark не вызывает метод обработки. Но это вызывает открытый метод. Все работает нормально, когда используется режим обновления.

Эта ошибка на стороне искры.? Я использую потоковую искру 2.4.5

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...