Я пытаюсь отправить записи из потоков kinesis на AWS SNS topi c. но, похоже, ничего не происходит, и я не получаю сообщения на topi c. Ниже мой код. Пожалуйста, сообщите, что я делаю неправильно здесь. Я пробовал и foreach и foreachBatch. Пожалуйста, обратитесь к закомментированному разделу в коде для foreachBatch. Чтобы проверить, я даже попытался жестко закодировать строку, чтобы увидеть, работает ли foreach, но не повезло. Помогите пожалуйста!
//Stream reader (Consumer)
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col
import com.amazonaws.services.kinesis.model.PutRecordRequest
import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder
import com.amazonaws.auth.{AWSStaticCredentialsProvider, BasicAWSCredentials}
import java.nio.ByteBuffer
import scala.util.Random
import java.util.Base64
import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery}
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions.from_json
import com.amazonaws.services.sns.{AmazonSNS, AmazonSNSClientBuilder}
import com.amazonaws.services._
import org.apache.spark.sql._
import com.amazonaws.services.sns.model._
val dataSchema = new StructType()
.add("serialnumber", StringType)
.add("purposeId", StringType)
.add("action", StringType)
.add("locale", StringType)
.add("datetime", TimestampType)
.add("ingestiontime", LongType)
val kinesisStreamName = "mystream"
val kinesisRegion = "us-west-2"
val kinesisDF = spark.readStream
.format("kinesis")
.option("streamName", kinesisStreamName)
.option("region", kinesisRegion)
.option("initialPosition", "latest") //"TRIM_HORIZON")
.load()
val recordsStreamDf = kinesisDF
.selectExpr("cast (data as STRING) jsonData")
.select(from_json($"jsonData", dataSchema)/*.cast("struct<installmentNo: integer, loanId: integer>")*/.as("consents"))
.select("consents.*")
display(recordsStreamDf)
val query =
recordsStreamDf
.writeStream.foreach(
new ForeachWriter[Row] {
def open(partitionId: Long, version: Long): Boolean = {
true
}
def process(record: Row): Unit = {
SNSPublisher("test").publishMessage()
}
def close(errorOrNull: Throwable): Unit = {
true
}
}
)
/*.foreachBatch{ (batchDF: DataFrame, batchId: Long) =>
val data = batchDF.withColumn("jsonData", to_json(struct($"serialnumber", $"purposeId", $"action", $"locale", $"datetime", $"ingestiontime"))).select($"jsonData").collect
for (i <- data) {
try {
SNSPublisher(i.getString(0)).publishMessage()
println("Metrics posted to SNS")
} catch {
case e: Exception =>
println("Exception posting message to SNS: " + e)
throw e
}
}
batchDF.foreachPartition {
partitionData =>
partitionData.foreach(row => {
SNSPublisher(row.getString(0)).publishMessage()
})
}
}*/
.outputMode("append")
.format("json")
.queryName("count")
.option("path", "s3://path/dev/user_personal/abhishek.ghosh/") // counts = name of the in-memory table
.option("checkpointLocation", "s3://path/dev/user_personal/abhishek.ghosh/checkpointLocation/")
//.outputMode("complete") // complete = all the counts should be in the table
.start()
//SNS Publisher class for notification of metrics
class SNSPublisher(message: String) {
implicit val sns: SNSPublisher = this
implicit val snsClient: AmazonSNS = AmazonSNSClientBuilder.standard.build
val this.message=message
var msgId=""
def publishMessage() {
msgId = snsClient.publish(new PublishRequest().withTopicArn("my_arn").withMessage(message)).getMessageId
}
}
object SNSPublisher {
def apply(message: String): SNSPublisher = {
new SNSPublisher(message)
}
} ```