Я пытаюсь прочитать данные из Kafka topi c с кодом ниже:
object Main {
def main(args: Array[String]) {
val sparkSession = createSparkSession()
val df = sparkSession.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", "test").option("startingOffsets", "earliest").load()
val df1 = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
df1.writeStream.format("parquet").option("format","append").option("checkpointLocation", "/home/krishna/Downloads/kafka_2.12-2.0.0/delete").option("path", "/home/krishna/Downloads/kafka_2.12-2.0.0/abc").option("truncate", "false").outputMode("append").start()
}
}
Когда я использую следующую строку:
df1.writeStream
.format("console")
.option("truncate","false")
.start()
.awaitTermination()
, вывод будет отображается на консоли.
Но проблема в том, что я заменяю строку выше строки кода:
df1.writeStream
.format("csv")
.option("format","append")
.option("checkpointLocation", "/home/krishna/Downloads/kafka_2.12-2.0.0/delete")
.option("path", "/home/krishna/Downloads/kafka_2.12-2.0.0/abc")
.option("truncate", "false")
.outputMode("append")
.start()
Тогда выходные данные не сохраняются в формате CSV. Создается только папка ab c и в ней создается папка метаданных, но в ней нет файла CSV.
Я не могу понять, что если o / p успешно отображается на консоли, то почему он не сохраняется в файл в виде CSV, паркета или текста.
Пример вывода:
------------------
| key | value |
------------------
| null | abc |
| null | 123 |
|-----------------
Зависимости:
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>2.4.5</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>2.4.5</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.12</artifactId>
<version>2.4.5</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql-kafka-0-10_2.12</artifactId>
<version>2.4.5</version>
<scope>provided</scope>
</dependency>
</dependencies>