Я хотел бы знать, как отправить строку JSON в виде сообщения в тему kafka, используя функцию scala и потребляемую при помощи readstream () в структурированной потоковой передаче с искрой, сохраняя как формат паркета.
В настоящее время используется следующий код, но файл паркета не создается. Помогите пожалуйста получить файл паркета с данными. Это также реализовано как функции и требуется для вызова этих двух функций в интеграционных тестах.
Сообщение JSON отправлено в тему Кафки-
object kafkaProducer extends App {
def sendMessages(): Unit = {
//define topic
val topic = "spark-topic"
//define producer properties
val props = new java.util.Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("client.id", "KafkaProducer")
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.connect.json.JsonSerializer")
//create producer instance
val kafkaProducer = new KafkaProducer[String, JsonNode](props)
//create object mapper
val mapper = new ObjectMapper with ScalaObjectMapper
mapper.registerModule(DefaultScalaModule)
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
//mapper Json object to string
def toJson(value: Any): String = {
mapper.writeValueAsString(value)
}
//send producer message
val jsonstring =
s"""{
| "id": "0001",
| "name": "Peter"
|}
""".stripMargin
val jsonNode: JsonNode = mapper.readTree(jsonstring)
val rec = new ProducerRecord[String, JsonNode](topic, jsonNode)
kafkaProducer.send(rec)
//println(rec)
}
}
Spark Structured Streaming - потребитель kafka
object sparkConsumer extends App {
def receiveMessages(): Unit = {
//create a SparkSession for consumer
val spark = SparkSession
.builder()
.appName("SparkStructConsumer")
.master("local[*]")
.getOrCreate()
import spark.implicits._
spark.sparkContext.setLogLevel("WARN")
//Subscribe the stream from Kafka
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "spark-topic")
.option("startingOffsets", "latest")
.option("groupid","kafka-spark-group")
.option("failOnDataLoss", false)
.load
//create schema for json string
val data_Schema: StructType = new StructType()
.add("id", StringType)
.add("name", StringType)
//Convert Stream according to schema along with TimeStamp
val df1 = df.selectExpr("CAST(value AS STRING)", "CAST(timestamp AS TIMESTAMP)").as[(String, Timestamp)]
.select(from_json($"value", data_Schema).as("data"), $"timestamp")
.select("data.id", "data.name", "timestamp")
.toDF("Id", "Name", "Timestamp")
//writing as parquet
val query = df1
.writeStream
.outputMode(OutputMode.Append)
.queryName("table")
.format("parquet")
.trigger(Trigger.ProcessingTime("5 seconds"))
.option("header", "true")
.option("checkpointLocation", "/Users/s/desktop/SparkOutput")
.option("path", "/Users/s/desktop/SparkOutput")
.start()
query.awaitTermination()
}
}
Интеграционные тесты
class SparkKafkaIntegrationTests extends FlatSpecLike with Matchers with EmbeddedKafka {
import org.apache.log4j.{Level, Logger}
Logger.getLogger("org").setLevel(Level.OFF)
Logger.getLogger("akka").setLevel(Level.OFF)
"Kafka producer sent JSON message" should "Consumed by the Spark Structured Streaming consumer" in {
//define zookeeper and kafkaserver
val userDefinedConfig = EmbeddedKafkaConfig(kafkaPort = 9092, zooKeeperPort = 2181)
val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("zookeeper.connect", "localhost:2181")
kafkaProducer.sendMessages()
sparkConsumer.receiveMessages()
}
}
Спасибо.
Shaamil.