Опция 'basePath' должна быть директорией Scala - PullRequest
0 голосов
/ 02 июля 2018

У меня есть этот код, и он выдает ошибку, basepath должен быть dir. Просто хочу запустить простую потоковую кафку Мойку.

val checkPointDir = "/tmp/offsets/" // "hdfs://hdfscluster/user/yarn/tmp/"

    def main(args: Array[String]): Unit ={
            lazy val spark = SparkSession
              .builder
              .appName("KafkaProducer")
              .master("local[*]")
              .getOrCreate()



           val query = writeStream(jsonDF, "test")
            query.awaitTermination()
          }


      def writeStream(df:DataFrame, topic:String): StreamingQuery = {
    //    log.warn("Writing to kafka")
        df
          //      .selectExpr( "CAST(value AS STRING)")
          .writeStream
          .format("kafka")
          .option("kafka.bootstrap.servers", kafkaServers)
          .option("topic", topic)
          .option("checkpointLocation", checkPointDir)
          .outputMode(OutputMode.Update)
          .start()
      }

Мой пользователь является владельцем этой папки / tmp / offsets. Я получаю это исключение.

java.lang.IllegalArgumentException: опция 'basePath' должна быть каталогом

1 Ответ

0 голосов
/ 03 июля 2018

"checkpointLocation" должен быть указан канонический путь к каталогу.

Этот каталог используется для хранения актуальных промежуточных СДР. Может быть сохранено более одного RDD, если имеется несколько контрольных точек. Данные каждого СДР хранятся в отдельном каталоге. Однако сами RDD разделены, каждый раздел хранится в отдельном файле внутри каталога RDD. При хранении файлов в HDFS Spark должен соблюдать свойство максимального размера блока. Хранение таких структурированных данных невозможно в одном файле, следовательно, в каталоге.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...