Как читать карту Redis в спарк с помощью spark-Redis - PullRequest
0 голосов
/ 07 мая 2019

У меня есть нормальная карта скала в Redis (ключ и значение). Теперь я хочу прочитать эту карту в одной из моих программ потоковой передачи и использовать ее как переменную вещания, чтобы мои подчиненные могли использовать эту карту для разрешения сопоставления клавиш. Я использую библиотеку spark-redis 2.3.1, но теперь уверен, как это прочитать.

Карта в редис таблице "Сотрудник" -

name   |    value
------------------
123         David
124         John
125         Alex

Вот как я пытаюсь читать в искре (Не уверен, что это правильно - пожалуйста, поправьте меня) -

 val loadedDf = spark.read
  .format("org.apache.spark.sql.redis")
  .schema(
    StructType(Array(
      StructField("name", IntegerType),
      StructField("value", StringType)
    )
  ))
  .option("table", "employee")
  .option("key.column", "name")
  .load()
loadedDf.show() 

Приведенный выше код ничего не показывает, я получаю пустой вывод.

1 Ответ

2 голосов
/ 11 мая 2019

Вы можете использовать приведенный ниже код для своей задачи, но вам нужно использовать Spark Dataset (case-dataframe to case class) для выполнения этой задачи.Ниже приведен полный пример чтения и записи в Redis.

object DataFrameExample {

  case class employee(name: String, value: Int)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession
          .builder()
          .appName("redis-df")
          .master("local[*]")
          .config("spark.redis.host", "localhost")
          .config("spark.redis.port", "6379")
          .getOrCreate()

    val personSeq = Seq(employee("John", 30), employee("Peter", 45)
    val df = spark.createDataFrame(personSeq)

    df.write
      .format("org.apache.spark.sql.redis")
      .option("table", "person")
      .mode(SaveMode.Overwrite)
      .save()

    val loadedDf = spark.read
                        .format("org.apache.spark.sql.redis")
                        .option("table", "person")
                        .load()
    loadedDf.printSchema()
    loadedDf.show()
  }
}

Выходные данные ниже

root
 |-- name: string (nullable = true)
 |-- value: integer (nullable = false)

+-----+-----+
| name|value|
+-----+-----+
| John| 30  |
|Peter| 45  |
+-----+-----+

Более подробную информацию можно также найти в Redis документации

...