постоянные обновления таблиц с потоковой передачей - PullRequest
0 голосов
/ 09 мая 2018

У меня есть приложение для потоковой передачи с искровым структурированием (прослушивание kafka), которое также считывает данные из постоянной таблицы в s3. Я пытаюсь проверять каждую микробатку на наличие обновлений в таблице. Я пытался

var myTable = spark.table("myTable!")

и

spark.sql("select * from parquet.`s3n://myFolder/`")

Оба не работают в контексте потоковой передачи. Проблема в том, что файл паркета меняется при каждом обновлении, и спарк не запускает ни одной из нормальных команд для обновления, таких как:

spark.catalog.refreshTable("myTable!")
spark.sqlContext.clearCache()

Я также пробовал:

spark.sqlContext.setConf("spark.sql.parquet.cacheMetadata","false")
  spark.conf.set("spark.sql.parquet.cacheMetadata",false)

без облегчения. Должен быть способ сделать это. Было бы разумнее использовать вместо JDBC соединение с базой данных?

Ответы [ 2 ]

0 голосов
/ 22 мая 2018

Это выполнит то, что я ищу.

val df1Schema = spark.read.option("header", "true").csv("test1.csv").schema
    val df1 = spark.readStream.schema(df1Schema).option("header", "true").csv("/1")
    df1.writeStream.format("memory").outputMode("append").queryName("df1").start()

    var df1 = sql("select * from df1")

Недостатком является то, что его добавление. Обойти одну проблему - это удалить дубликаты на основе идентификатора и с самой новой датой.

val dfOrder = df1.orderBy(col("id"), col("updateTableTimestamp").desc)

val dfMax = dfOrder.groupBy(col("id")).agg(first("name").as("name"),first("updateTableTimestamp").as("updateTableTimestamp"))
0 голосов
/ 14 мая 2018

Предполагая, что я правильно вас понял, я считаю, что проблема заключается в том, что поскольку DataFrame неизменны, вы не сможете увидеть изменения в своей таблице паркета, если не перезапустите потоковый запрос и не создадите новый DataFrame. Этот вопрос появился в списке рассылки Spark до . Окончательный ответ, как представляется, заключается в том, что единственный способ получить эти обновления - перезапустить потоковый запрос . Если ваше приложение не может терпеть икоты в течение 10 секунд, вы, возможно, захотите ознакомиться с этим сообщением в блоге, в котором кратко описан вышеупомянутый разговор и обсуждается, как SnappyData включает мутации в кадрах данных Spark .

Отказ от ответственности: я работаю на SnappyData

...