Преобразование DataFrame в RDD [(String, String)] - PullRequest
0 голосов
/ 25 марта 2019

Я хочу преобразовать org.apache.spark.sql.DataFrame в org.apache.spark.rdd.RDD[(String, String)] в Databricks. Может кто-нибудь помочь?

Справочная информация (и лучшее решение также приветствуется): у меня есть поток Kafka, который (после некоторых шагов) становится фреймом данных из 2 столбцов. Я хотел бы поместить это в кэш Redis, первый столбец в качестве ключа и второй столбец в качестве значения.

Более конкретно тип входа такой: lastContacts: org.apache.spark.sql.DataFrame = [serialNumber: string, lastModified: bigint]. Я пытаюсь вставить в Redis следующее:

sc.toRedisKV(lastContacts)(redisConfig)

Сообщение об ошибке выглядит так:

notebook:20: error: type mismatch;
 found   : org.apache.spark.sql.DataFrame
    (which expands to)  org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]
 required: org.apache.spark.rdd.RDD[(String, String)]
sc.toRedisKV(lastContacts)(redisConfig)

Я уже поиграл с некоторыми идеями (например, функция .rdd), но ни одна не помогла.

1 Ответ

3 голосов
/ 25 марта 2019

Вы можете использовать df.map (row => ...) для преобразования кадра данных в RDD, если вы хотите отобразить строку в другой элемент RDD.

Например:

val df = Seq(("table1",432),
      ("table2",567),
      ("table3",987),
      ("table1",789)).
      toDF("tablename", "Code").toDF()

    df.show()

    +---------+----+
|tablename|Code|
+---------+----+
|   table1| 432|
|   table2| 567|
|   table3| 987|
|   table1| 789|
+---------+----+

    val rddDf = df.map(r => (r(0), r(1))).rdd // Type:RDD[(Any,Any)]

    OR

    val rdd = df.map(r => (r(0).toString, r(1).toString)).rdd  //Type: RDD[(String,String)]

См. https://community.hortonworks.com/questions/106500/error-in-spark-streaming-kafka-integration-structu.html относительно AnalysisException: Запросы с потоковыми источниками должны выполняться с помощью writeStream.start ()

Вам нужно дождаться завершения запроса с помощью запроса. awaitTermination () Чтобы запретить завершение процесса, пока запрос активен.

...