Для каждого приемника со следующим определением
class CouchBaseSinkForeach() extends ForeachWriter[org.apache.spark.sql.Row] {
override def open(partitionId: Long, version: Long): Boolean = {
// Open connection
true
}
override def process(value: Row): Unit =
{
convertRowToJSON(value)
println("My Test Hi")
}
override def close(errorOrNull: Throwable): Unit = {
// Close the connection
}
используется в следующем потоковом запросе
val streamDf= records
.filter($"region"==="US" )
.withColumn("startTimeEpoch", ud($"startTimeEpoch"))
.withColumn("startTimeEpoch", $"startTimeEpoch".cast(sql.types.TimestampType))
.withWatermark("startTimeEpoch", "10 minutes")
.groupBy(window($"startTimeEpoch","10 minutes","10 minutes"),$"accountId")
.count()
streamDf.writeStream
.foreach(new CouchBaseSinkForeach)
.start()
.awaitTermination()
В режиме добавления Spark не вызывает метод обработки. Но это вызывает открытый метод. Все работает нормально, когда используется режим обновления.
Эта ошибка на стороне искры.? Я использую потоковую искру 2.4.5