Рассмотрим данные, записанные из dataframe
в kafka
и затем прочитанные из kafka
обратно в новый dataframe
:
// Write from df to kafka
val wdf = airj.write
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("topic", "air2008")
.save
Теперь прочитайте данные обратно
// Read from kafka into spark df
import org.apache.spark.sql.functions._
val flights = (spark.read
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "air2008")
.load())
Сколько записей?
scala> flights.count
res36: Long = 5824436
Давайте зарегистрируем это как таблицу:
flights.createOrReplaceTempView("flights_raw")
Давайте спросим, что по-другому : сколько записей .. ??!
spark.sql("select count(1) from flights_raw").show
+--------+
|count(1)|
+--------+
|0 |
+--------+
Давайте зададим вопрос первым способом:
scala> flights.count
res40: Long = 0
Что здесь произошло?