java.lang.IllegalArgumentException: «путь» не указан // Spark Consumer Issue - PullRequest
0 голосов
/ 08 января 2019

Я пытаюсь создать SparkConsumer, чтобы в этом случае я мог отправлять сообщения в формате csv на Kafka через Spark Streaming. Но у меня есть ошибка, что «путь» не указан. Смотрите мой код ниже

Мой код выглядит следующим образом:

import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.streaming.FileStreamSource.Timestamp
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.streaming.OutputMode

object sparkConsumer extends App {

  val conf = new SparkConf().setMaster("local").setAppName("Name")
  val sc = new SparkContext(conf)

  val rootLogger = Logger.getRootLogger()
  rootLogger.setLevel(Level.ERROR)

  val spark = SparkSession
    .builder()
    .appName("Spark-Kafka-Integration")
    .master("local")
    .getOrCreate()

  val schema = StructType(Array(
    StructField("InvoiceNo", StringType, nullable = true),
    StructField("StockCode", StringType, nullable = true),
    StructField("Description", StringType, nullable = true),
    StructField("Quantity", StringType, nullable = true)
  ))

  val streamingDataFrame = spark.readStream.schema(schema).csv("C:/Users/me/Desktop/Tasks/Tasks1/test.csv")

  streamingDataFrame.selectExpr("CAST(InvoiceNo AS STRING) AS key", "to_json(struct(*)) AS value").
    writeStream
    .format("csv")
    .option("topic", "topic_test")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("checkpointLocation", "C:/Users/me/IdeaProjects/SparkStreaming/checkpointLocation/")
    .start()

  import spark.implicits._
  val df = spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "topic_test")
    .load()

  val df1 = df.selectExpr("CAST(value AS STRING)", "CAST(timestamp AS TIMESTAMP)").as[(String, Timestamp)]
    .select(from_json($"value", schema).as("data"), $"timestamp")
    .select("data.*", "timestamp")

  df1.writeStream
    .format("console")
    .option("truncate","false")
    .outputMode(OutputMode.Append)
    .start()
    .awaitTermination()

}

Я получаю следующую ошибку:

Exception in thread "main" java.lang.IllegalArgumentException: 'path' is not specified

Кто-нибудь знает, чего мне не хватает?

1 Ответ

0 голосов
/ 08 января 2019

Кажется, что это может быть проблемой в этой части вашего кода:

  streamingDataFrame.selectExpr("CAST(InvoiceNo AS STRING) AS key", "to_json(struct(*)) AS value").
    writeStream
    .format("csv")
    .option("topic", "topic_test")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("checkpointLocation", "C:/Users/me/IdeaProjects/SparkStreaming/checkpointLocation/")
    .start()

потому что вы используете формат "csv", но не указываете местоположение файла, в котором он нуждается. Вместо этого вы конфигурируете свойства Kafka, чтобы использовать тему kafka в качестве вашего приемника. Так что если вы измените формат на "Кафка", он должен работать.

Другая проблема, с которой вы можете поэкспериментировать, используя csv в качестве источника, заключается в том, что ваш путь должен быть каталогом, а не файлом. В вашем случае, если вы создадите каталог и переместите свой CSV-файл, он будет работать.

Просто для тестирования создайте директорию с именем C: /Users/me/Desktop/Tasks/Tasks1/test.csv и создайте файл с именем part-0000.csv внутри. Затем включите содержимое CSV в этот новый файл и начните заново процесс.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...