Получение только нулевых значений с использованием spark.readStream .format ("s3-sqs") для получения сообщений SQS - PullRequest
0 голосов
/ 02 мая 2018

Я пытаюсь прочитать сообщения из очереди Amazon SQS. Разрешения работают, я вижу количество записей - но все записи нулевые. Не могу понять, почему я получаю нулевые значения. Я могу видеть сообщения в очереди SQS и получать их из локального экземпляра Python, и они на самом деле являются записями JSON, отражающими схему (хотя и не уверены на 100%, что я правильно реализовал схему).

Также использование формата "rate" работает по всему коду.

Документация по этому вопросу очень скудна.

Любые предложения будут оценены.

import org.apache.spark.sql.streaming._
import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

val awsAccessKey = "blahblah"    
val awsSecretKey = "blahblahblahblah"
val awsRegion = "us-east-1"

val SQSQueue = "https://sqs.us-east1.amazonaws.com/blahblahblahblah/blahblahblahblah"

// SQS Event Structure
val sqsSchema = new StructType()
      .add(StructField("Records", ArrayType(new StructType()
      .add(StructField("eventVersion", StringType))
      .add(StructField("eventSource", StringType))
      .add(StructField("awsRegion", StringType))
      .add(StructField("eventTime", StringType))
      .add(StructField("eventName", StringType))
      .add(StructField("userIdentity",StringType))
      .add(StructField("eventName", StringType))
      .add("userIdentity", new StructType()
          .add(StructField("principalId", StringType)))
          .add("requestParameters", new StructType()
          .add(StructField("sourceIPAddress", StringType)))
     .add("responseElements", new StructType()
          .add(StructField("x-amz-request-id", StringType))
          .add(StructField("x-amz-id-2", StringType))
    )
.add("s3", new StructType()
    .add(StructField("s3SchemaVersion", StringType))
    .add(StructField("configurationId", StringType))
    .add("bucket",  new StructType()
      .add(StructField("name", StringType))
         .add("ownerIdentity", new StructType()
              .add(StructField("principalId", StringType)))
      .add(StructField("arn", StringType)))
    .add("object", new StructType()
         .add(StructField("key",StringType))
         .add(StructField("size", IntegerType))
         .add(StructField("eTag", StringType))
         .add(StructField("sequencer", StringType))
         )
     ))))

val df = spark.readStream
    .format("s3-sqs")
    //.format("rate") // this works
    .option("queueUrl", SQSQueue)
    .option("region",awsRegion)
    .option("awsAccessKey",awsAccessKey)
    .option("fileFormat", "json")
    .schema(sqsSchema)
    //.option("sqsFetchInterval", "1m")
    .load()

df.writeStream
      .queryName("sqs_records")    // this query name will be the table name
      .outputMode("append")
      .format("memory")
      .start()

val records = spark.sql("select * from sqs_records")

> records.count
    res142: Long = 4894

>%sql
    select * from sqs_records

Records
null
null
...

1 Ответ

0 голосов
/ 30 апреля 2019

Я столкнулся с этой же проблемой: применяемая вами схема - это схема для ваших данных, а НЕ записи событий s3, отправленные в sqs. Вы получаете нулевое значение, поскольку код применяет схему событий s3 к вашим данным, что, я уверен, не соответствует.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...