Данные, считанные из кафки в искру, исчезают после регистрации в виде таблицы? - PullRequest
4 голосов
/ 08 апреля 2019

Рассмотрим данные, записанные из dataframe в kafka и затем прочитанные из kafka обратно в новый dataframe:

// Write from df to kafka
val wdf  = airj.write
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "air2008")
  .save

Теперь прочитайте данные обратно

// Read from kafka into spark df
import org.apache.spark.sql.functions._
val flights = (spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "air2008")
  .load())

Сколько записей?

scala> flights.count
res36: Long = 5824436

Давайте зарегистрируем это как таблицу:

flights.createOrReplaceTempView("flights_raw")

Давайте спросим, ​​что по-другому : сколько записей .. ??!

spark.sql("select count(1) from flights_raw").show
+--------+
|count(1)|
+--------+
|0       |
+--------+

Давайте зададим вопрос первым способом:

scala> flights.count
res40: Long = 0

Что здесь произошло?

Ответы [ 2 ]

0 голосов
/ 08 апреля 2019

createOrReplaceTempView лениво оценивается, что означает, что оно не сохраняется в памяти.Для этого вам нужно cache данные.

flights.cache
flights.createOrReplaceTempView("flights_raw")

или

flights.createOrReplaceTempView("flights_raw")
spark.table("flights_raw")
spark.table("flights_raw").cache
spark.table("flights_raw").count

должны сделать трюк.

0 голосов
/ 08 апреля 2019

На основании комментария от @GiorgosMyrianthous я вставил _cache_. Это помогает, только если сделано до createOrReplaceTempView: следующим образом

Работает ли не :

import org.apache.spark.sql.functions._
val flights = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "air2008")
  .load()
flights.createOrReplaceTempView("flights_raw").cache

работает :

import org.apache.spark.sql.functions._
val flights = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "air2008")
  .load()
flights.cache
flights.createOrReplaceTempView("flights_raw")

Теперь это работает

scala> flights.count
res47: Long = 5824436
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...