Я пытаюсь прочитать сообщения из очереди Amazon SQS. Разрешения работают, я вижу количество записей - но все записи нулевые. Не могу понять, почему я получаю нулевые значения. Я могу видеть сообщения в очереди SQS и получать их из локального экземпляра Python, и они на самом деле являются записями JSON, отражающими схему (хотя и не уверены на 100%, что я правильно реализовал схему).
Также использование формата "rate" работает по всему коду.
Документация по этому вопросу очень скудна.
Любые предложения будут оценены.
import org.apache.spark.sql.streaming._
import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
val awsAccessKey = "blahblah"
val awsSecretKey = "blahblahblahblah"
val awsRegion = "us-east-1"
val SQSQueue = "https://sqs.us-east1.amazonaws.com/blahblahblahblah/blahblahblahblah"
// SQS Event Structure
val sqsSchema = new StructType()
.add(StructField("Records", ArrayType(new StructType()
.add(StructField("eventVersion", StringType))
.add(StructField("eventSource", StringType))
.add(StructField("awsRegion", StringType))
.add(StructField("eventTime", StringType))
.add(StructField("eventName", StringType))
.add(StructField("userIdentity",StringType))
.add(StructField("eventName", StringType))
.add("userIdentity", new StructType()
.add(StructField("principalId", StringType)))
.add("requestParameters", new StructType()
.add(StructField("sourceIPAddress", StringType)))
.add("responseElements", new StructType()
.add(StructField("x-amz-request-id", StringType))
.add(StructField("x-amz-id-2", StringType))
)
.add("s3", new StructType()
.add(StructField("s3SchemaVersion", StringType))
.add(StructField("configurationId", StringType))
.add("bucket", new StructType()
.add(StructField("name", StringType))
.add("ownerIdentity", new StructType()
.add(StructField("principalId", StringType)))
.add(StructField("arn", StringType)))
.add("object", new StructType()
.add(StructField("key",StringType))
.add(StructField("size", IntegerType))
.add(StructField("eTag", StringType))
.add(StructField("sequencer", StringType))
)
))))
val df = spark.readStream
.format("s3-sqs")
//.format("rate") // this works
.option("queueUrl", SQSQueue)
.option("region",awsRegion)
.option("awsAccessKey",awsAccessKey)
.option("fileFormat", "json")
.schema(sqsSchema)
//.option("sqsFetchInterval", "1m")
.load()
df.writeStream
.queryName("sqs_records") // this query name will be the table name
.outputMode("append")
.format("memory")
.start()
val records = spark.sql("select * from sqs_records")
> records.count
res142: Long = 4894
>%sql
select * from sqs_records
Records
null
null
...