пытается отправить записи (строки) с помощью foreachbatch и foreach на AWS SNS - PullRequest
0 голосов
/ 20 марта 2020

Я пытаюсь отправить записи из потоков kinesis на AWS SNS topi c. но, похоже, ничего не происходит, и я не получаю сообщения на topi c. Ниже мой код. Пожалуйста, сообщите, что я делаю неправильно здесь. Я пробовал и foreach и foreachBatch. Пожалуйста, обратитесь к закомментированному разделу в коде для foreachBatch. Чтобы проверить, я даже попытался жестко закодировать строку, чтобы увидеть, работает ли foreach, но не повезло. Помогите пожалуйста!


//Stream reader (Consumer)
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col
import com.amazonaws.services.kinesis.model.PutRecordRequest
import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder
import com.amazonaws.auth.{AWSStaticCredentialsProvider, BasicAWSCredentials}
import java.nio.ByteBuffer
import scala.util.Random
import java.util.Base64
import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery}
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions.from_json
import com.amazonaws.services.sns.{AmazonSNS, AmazonSNSClientBuilder}
import com.amazonaws.services._
import org.apache.spark.sql._
import com.amazonaws.services.sns.model._

val dataSchema = new StructType()
        .add("serialnumber", StringType)
        .add("purposeId", StringType)
        .add("action", StringType)
        .add("locale", StringType)
        .add("datetime", TimestampType)
        .add("ingestiontime", LongType)

val kinesisStreamName = "mystream"
val kinesisRegion = "us-west-2"
val kinesisDF = spark.readStream
      .format("kinesis")
      .option("streamName", kinesisStreamName)
      .option("region", kinesisRegion)
      .option("initialPosition", "latest") //"TRIM_HORIZON")
      .load()

val recordsStreamDf = kinesisDF
  .selectExpr("cast (data as STRING) jsonData")
  .select(from_json($"jsonData", dataSchema)/*.cast("struct<installmentNo: integer, loanId: integer>")*/.as("consents"))
  .select("consents.*")

display(recordsStreamDf)

val query =
  recordsStreamDf
    .writeStream.foreach(
        new ForeachWriter[Row] {

        def open(partitionId: Long, version: Long): Boolean = {
          true
        }

        def process(record: Row): Unit = {
          SNSPublisher("test").publishMessage()
        }

        def close(errorOrNull: Throwable): Unit = {
          true
        }
      }
    )
    /*.foreachBatch{ (batchDF: DataFrame, batchId: Long) =>
          val data = batchDF.withColumn("jsonData", to_json(struct($"serialnumber", $"purposeId", $"action", $"locale", $"datetime", $"ingestiontime"))).select($"jsonData").collect
          for (i <- data) {
            try {
                SNSPublisher(i.getString(0)).publishMessage()
                println("Metrics posted to SNS")
            } catch {
                case e: Exception =>
                println("Exception posting message to SNS: " + e)
                throw e
            }  
          }
      batchDF.foreachPartition {
            partitionData =>
              partitionData.foreach(row => {
              SNSPublisher(row.getString(0)).publishMessage()
            })
      }
     }*/
    .outputMode("append")
    .format("json")       
    .queryName("count")
    .option("path", "s3://path/dev/user_personal/abhishek.ghosh/")  // counts = name of the in-memory table
    .option("checkpointLocation", "s3://path/dev/user_personal/abhishek.ghosh/checkpointLocation/")
     //.outputMode("complete")  // complete = all the counts should be in the table
    .start()



//SNS Publisher class for notification of metrics
class SNSPublisher(message: String) {
  implicit val sns: SNSPublisher = this
  implicit val snsClient: AmazonSNS = AmazonSNSClientBuilder.standard.build
  val this.message=message
  var msgId=""
  def publishMessage() {
        msgId = snsClient.publish(new PublishRequest().withTopicArn("my_arn").withMessage(message)).getMessageId
  } 
}

object SNSPublisher {
  def apply(message: String): SNSPublisher = {
    new SNSPublisher(message)
  }
} ```




Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...