Поток Kafka to Spark с использованием ошибки Pyspark - PullRequest
0 голосов
/ 12 июня 2019

Я пытаюсь получить данные в спарк по теме кафки, но я не могу этого сделать.Я пробовал учебники, чья ссылка является общей, но, наконец, я получаю сообщение об ошибке.Я также добавил все необходимые файлы jar (расположение: - usr / local / spark / jars).Пожалуйста, дайте мне знать, что может быть не так.Также я хотел бы знать, как это можно сделать с помощью программирования Scala.

https://spark.apache.org/docs/2.1.0/streaming-programming-guide.html#deploying-applications

https://medium.com/@kass09/spark-streaming-kafka-in-python-a-test-on-local-machine-edd47814746 enter image description here

Попробовав эту команду потокового воспроизведения, я получил ошибку.

"bin / spark-submit --packages org.apache.spark: spark-streaming-kafka-0-8_2.11: 2.1.1 examples /src / main / python / streaming / direct_kafka_wordcount.py "

Я получил ошибку jupyter, поэтому я попытался выполнить следующую команду, чтобы решить ее, но ошибка все равно остается" pip3 install --upgrade --force-reinstall --no-cache-dir jupyter "

1 Ответ

0 голосов
/ 18 июня 2019

Интеграция Spark и Kafka в Scala

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.sql.{SQLContext, SparkSession}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent


object sparkStreaming_Kafka {

  @transient lazy val log = org.apache.log4j.LogManager.getLogger("sparkStreaming_Kafka")

  def main(args: Array[String]): Unit = {

    log.debug("added the messages ****** ---->>>")

    val spark = SparkSession
      .builder()
      .appName("my_App" )
      .getOrCreate()

    val ssc = new StreamingContext(spark.sparkContext, Seconds(2)) // the polling frequency is 2 seconds, can be modified based on the BM requirements.
    log.debug("Before starting the Stream -->>")
    val stream = KafkaUtils.createDirectStream[String, String](ssc, PreferConsistent, Subscribe[String, String]
      (Array.apply("my_topic"), getKafkaParams())).map(record => record.value)

    stream.foreachRDD { rdd =>
      try {
        if (!rdd.isEmpty()) {
          rdd.foreach(x => postData(x))
        }
      } catch {
        case t: Throwable =>
          t.printStackTrace() // TODO: handle error
          log.error("Exception occured while processing the data exception is {}", t.getCause)
      }
    }

    ssc.start()
    log.debug("started now-->> " + compat.Platform.currentTime)
    ssc.awaitTermination()
  }

  def getKafkaParams(): Map[String, Object] = {
    Map[String, Object]("bootstrap.servers" -> "localhost:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "group_id",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (true: java.lang.Boolean)) 
  }


  def postData(event: String): Unit = {
    log.info("before KinesisSink.process call ==>>"+event)

    print(event)  // use the event as per the requirement

  }
}
...