Использование API структурированной потоковой передачи Spark в DStream - PullRequest
0 голосов
/ 13 июля 2020
• 1000

Что создает DStream.

Чтобы работать со временем события (а не временем обработки), я сделал следующее:

outputStream
      .foreachRDD(rdd => {
          rdd.toDF().withWatermark("timestamp", "60 seconds")
            .groupBy(
              window($"timestamp", "60 seconds", "10 seconds")
            )
            .sum("meterIncrement")
            .toJSON
            .toDF("value")
            .writeStream
            .format("kafka")
            .option("kafka.bootstrap.servers", "localhost:9092")
            .option("topic", "taxi-dollar-accurate")
            .start()
        )
      })

И я получаю сообщение об ошибке

'writeStream' можно вызывать только в потоковом Dataset / DataFrame

Что меня удивило, потому что источником DF является DStream. В любом случае, мне удалось решить эту проблему, заменив .writeStream на .write и .start () на .save ().

Но у меня возникло ощущение, что я каким-то образом потерял потоковую мощность на этом foreach. Понятно, поэтому я пишу этот вопрос. Это правильный подход? Я видел другие сценарии, в которых используется

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

. Но я не знаю, насколько это отличается от простого вызова foreach в DStream и последующего преобразования каждого RDD в DF.

1 Ответ

1 голос
/ 13 июля 2020

Но я не знаю, чем это отличается от простого вызова foreach в DStream и последующего преобразования каждого RDD в DF.

Когда вы звоните:

outputStream
      .foreachRDD(rdd => {
          rdd.toDF()
            .[...]
            .toJSON
            .toDF("value")
            .writeStream
            .format("kafka")

ваша переменная rdd (или Dataframe) стала single RDD, которая больше не является потоком. Если вы решите использовать подход DSream, вам необходимо отправить эти single RDD, вызывающие KafkaProducer API.

Пример:

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    val producer = new KafkaProducer[String, String](kafkaParameters)
    partitionOfRecords.foreach { message =>
      producer.send(message)
    }
    producer.close()
  }
}

Однако, это не рекомендуемый подход, так как вы создаете и закрываете KafkaProducer в каждом интервале пакета для каждого исполнителя. Но это должно дать вам базовое c понимание того, как записывать данные в Kafka с помощью DirectStream API.

Для дальнейшей оптимизации отправки ваших данных в Kafka вы можете следовать инструкциям, приведенным здесь .

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...