Невозможно записать CSV-файл, принимая данные из kafka topi c в режиме Stream Streaming с scala - PullRequest
1 голос
/ 02 апреля 2020

Я пытаюсь прочитать данные из Kafka topi c с кодом ниже:

object Main {
  def main(args: Array[String]) {
    val sparkSession = createSparkSession()
    val df = sparkSession.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", "test").option("startingOffsets", "earliest").load()
    val df1 = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

    df1.writeStream.format("parquet").option("format","append").option("checkpointLocation", "/home/krishna/Downloads/kafka_2.12-2.0.0/delete").option("path", "/home/krishna/Downloads/kafka_2.12-2.0.0/abc").option("truncate", "false").outputMode("append").start()
  }
}

Когда я использую следующую строку:

df1.writeStream
  .format("console")
  .option("truncate","false")
  .start()
  .awaitTermination()

, вывод будет отображается на консоли.

Но проблема в том, что я заменяю строку выше строки кода:

df1.writeStream
  .format("csv")
  .option("format","append")
  .option("checkpointLocation", "/home/krishna/Downloads/kafka_2.12-2.0.0/delete")
  .option("path", "/home/krishna/Downloads/kafka_2.12-2.0.0/abc")
  .option("truncate", "false")
  .outputMode("append")
  .start()

Тогда выходные данные не сохраняются в формате CSV. Создается только папка ab c и в ней создается папка метаданных, но в ней нет файла CSV.

Я не могу понять, что если o / p успешно отображается на консоли, то почему он не сохраняется в файл в виде CSV, паркета или текста.
Пример вывода:

------------------
| key  | value   |
------------------
| null | abc     |
| null | 123     |
|-----------------

Зависимости:

<dependencies>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.12</artifactId>
    <version>2.4.5</version>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.12</artifactId>
    <version>2.4.5</version>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.12</artifactId>
    <version>2.4.5</version>
    <scope>provided</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql-kafka-0-10_2.12</artifactId>
    <version>2.4.5</version>
    <scope>provided</scope>
  </dependency>
</dependencies>

Ответы [ 3 ]

1 голос
/ 03 апреля 2020

в консоли вы используете df, а для csv вы используете df1.

Большая часть кода выглядит хорошо для меня.

попробуйте это.

df.writeStream 
    .format("csv")
    .option("format", "append")
    .trigger(processingTime = "5 seconds")
    .option("checkpointLocation", "/home/krishna/Downloads/kafka_2.12-2.0.0/delete")
.option("path", "/home/krishna/Downloads/kafka_2.12-2.0.0/abc")
    .outputMode("append")
    .start()
0 голосов
/ 18 апреля 2020

Я протестировал приведенный ниже код на Spark 2.4.5, и он создает файлы csv по мере необходимости:

 val sparkSession = SparkSession.builder()
    .appName("myAppName")
    .master("local[*]")
    .getOrCreate()

  val df = sparkSession.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "test")
    .option("startingOffsets", "earliest")
    .load()

  val df1 = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

  df1.writeStream
    .format("csv")
    .outputMode("append")
    .option("set", ",")
    .option("checkpointLocation", "/home/krishna/Downloads/kafka_2.12-2.0.0/delete")
    .option("path", "/home/krishna/Downloads/kafka_2.12-2.0.0/abc.csv")
    .start()
    .awaitTermination()

Этот код создаст папку с именем abc.csv. В зависимости от параллелизма вашего Sparksession (настраивается с помощью spark.default.parallelism), вы найдете столько csv-файлов, сколько у вас есть разделов. Количество файлов отражает количество разделов в DataFrame во время их записи. Если бы вы переделили его до этого, у вас было бы другое количество файлов.

В моем случае раздел был 2, поэтому я получил этот вывод в соответствующей папке:

> ~/abc.csv$ ll
total 28
drwxrwxr-x  3 x x 4096 Apr 18 17:01 ./
drwxr-xr-x 50 x x 4096 Apr 18 17:01 ../
-rw-r--r--  1 x x    8 Apr 18 17:00 part-00000-77250d4a-e3af-46ef-b572-5476a3d075dd-c000.csv
-rw-r--r--  1 x x    4 Apr 18 17:00 part-00000-82a76a8c-5977-4891-be36-1c2dc6837fb1-c000.csv
0 голосов
/ 03 апреля 2020

Попробуйте это:

df.writeStream
.outputMode(OutputMode.Append())
.format("csv")
.option("checkpointLocation", "/home/krishna/Downloads/kafka_2.12-2.0.0/delete")
.option("path", "/home/krishna/Downloads/kafka_2.12-2.0.0/abc/")
.start()

Вы можете использовать тип формата как: com.databricks.spark.csv

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...