Вызов функции с каждым элементом в потоке в Databricks - PullRequest
1 голос
/ 25 марта 2019

У меня есть поток DataFrame в Databricks, и я хочу выполнить действие для каждого элемента. В сети я нашел методы специального назначения, такие как запись в консоль или сброс в память, но я хочу добавить некоторую бизнес-логику и поместить некоторые результаты в Redis.

Если быть более точным, то это будет выглядеть в не потоковом случае:

val someDataFrame = Seq(
  ("key1", "value1"),
  ("key2", "value2"),
  ("key3", "value3"),
  ("key4", "value4")
).toDF()

def someFunction(keyValuePair: (String, String)) = {
  println(keyValuePair)
}

someDataFrame.collect.foreach(r => someFunction((r(0).toString, r(1).toString)))

Но если someDataFrame - это не простой фрейм данных, а фрейм потоковых данных (действительно исходящий от Kafka), сообщение об ошибке будет таким:

org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;

Может ли кто-нибудь помочь мне решить эту проблему?

Некоторые важные замечания:

  • Я прочитал соответствующую документацию, такую ​​как Spark Streaming или Databricks Streaming, а также несколько других описаний.

  • Я знаю, что должно быть что-то вроде start() и awaitTermination, но я не знаю точного синтаксиса. Описания не помогли.

  • Потребуются страницы, чтобы перечислить все возможности, которые я пробовал, поэтому я не буду их предоставлять.

  • Я не хочу решить конкретную проблему отображения результата. То есть пожалуйста, не предоставляйте решение этого конкретного случая. someFunction будет выглядеть так:

val someData = readSomeExternalData()
if (condition containing keyValuePair and someData) {
  doSomething(keyValuePair);
}

(Вопрос Какова цель ForeachWriter в Spark Structured Streaming? не дает рабочего примера, поэтому не отвечает на мой вопрос.)

1 Ответ

1 голос
/ 26 марта 2019

Вот пример чтения с использованием foreachBatch для сохранения каждого элемента в redis с использованием потокового API.

Относится к предыдущему вопросу ( преобразование DataFrame в RDD [(String, String)] )

// import spark and spark-redis
import org.apache.spark._
import org.apache.spark.sql._
import org.apache.spark.streaming._
import org.apache.spark.sql.types._

import com.redislabs.provider.redis._

// schema of csv files
val userSchema = new StructType()
    .add("name", "string")
    .add("age", "string")

// create a data stream reader from a dir with csv files
val csvDF = spark
  .readStream
  .format("csv")
  .option("sep", ";")
  .schema(userSchema)
  .load("./data") // directory where the CSV files are 

// redis
val redisConfig = new RedisConfig(new RedisEndpoint("localhost", 6379))
implicit val readWriteConfig: ReadWriteConfig = ReadWriteConfig.Default

csvDF.map(r => (r.getString(0), r.getString(0))) // converts the dataset to a Dataset[(String, String)]
  .writeStream // create a data stream writer
  .foreachBatch((df, _) => sc.toRedisKV(df.rdd)(redisConfig)) // save each batch to redis after converting it to a RDD
  .start // start processing
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...