Как мне сделать functions.from_csv в искровом структурированном потоке - PullRequest
0 голосов
/ 06 августа 2020

Я читаю строки из источника kafka и хочу создать потребителя kafka ... в структурированной потоковой передаче искры я знаю, как сообщить Spark, что входящие строки имеют тип json ... как мне сделать то же самое с from_csv?

   val lines = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "topic2")
      .option("startingOffsets", "earliest")
      .load()
      .selectExpr("CAST(value AS STRING)")
      .select(functions.from_json($"value", retailDataSchema).as("data"))
      lines.printSchema()

Схема:

        val retailDataSchema = new StructType()
          .add("InvoiceNo", IntegerType)
          .add("Quantity", IntegerType)
          .add("Country", StringType)

Спасибо!

Входные данные выглядят так:

1 Ответ

0 голосов
/ 06 августа 2020

Вы можете выполнить эту работу вокруг:

    val lines = spark
            .readStream()
            .format("kafka")
            .option("kafka.bootstrap.servers", "localhost:9092")
            .option("subscribe", "topic2")
            .option("startingOffsets", "earliest")
            .load()
            .select(col("value").cast("string")).as("data").select("data.*").selectExpr("cast(split(value,',')[0] as DataTypes.IntegerType) as InvoiceNo"
                    ,"cast(split(value,',')[1] as DataTypes.IntegerType) as Quantity"
                    ,"cast(split(value,',')[2] as DataTypes.StringType) as Country" );
    lines.printSchema();

Или вы можете использовать встроенную функцию from_csv Since Apache spark 3.0.0

val lines = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "topic2")
  .option("startingOffsets", "earliest")
  .load()
  .selectExpr("CAST(value AS STRING)")
  .select(functions.from_csv($"value", retailDataSchema).as("data"))
  lines.printSchema()

Apache Spark Docs для from_csv встроенной функции

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...