Я пытаюсь записать данные от продюсера Kafka в HDFS с помощью Spark Structured Streaming
(pyspark)
. Я не использую движок запросов, такой как Hive или Impala
. Я просто хочу сохранить данные от продюсера Kafka в HDFS в формате JSON
, используя структурированную потоковую передачу в pyspark
.
Мой код приведен ниже:
Вывод Kafka:
# Stream weather readings from a Kafka topic into HDFS (as JSON files) with
# Spark Structured Streaming, mirroring the output to the console for
# debugging.
#
# Fixes applied to the original script:
#   * removed broken imports: `pyspark.shell` (re-created the session anyway),
#     `pyspark.sql.Column` (no such module), the `functions as col` alias that
#     shadowed `col`, and the duplicate `import time`
#   * corrected the invalid select / alias / withColumn syntax (L46-L52 of the
#     original did not parse)
#   * unified the derived column name to "CreationDate" and cast the existing
#     "CreationTime" column (the original referenced a nonexistent
#     "createTime" and the abstract `DataType` class)
#   * used the PySpark trigger API (`processingTime="10 seconds"`) instead of
#     the Scala-only `Trigger.ProcessingTime`
#   * wrote the file sink as JSON — the stated goal — instead of CSV, and
#     awaited both streaming queries instead of only the console one
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import (
    DateType,
    DoubleType,
    IntegerType,
    StringType,
    StructField,
    StructType,
)

print("Weather Monitering Streaming with Kafka Started")

KAFKA_TOPIC_NAME_CONS = "test1topic"
KAFKA_BOOTSTRAP_SERVERS_CONS = "192.168.10.2:9092"

# One SparkSession per application; local[*] uses every available core.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("Spark Structured Streaming with kafka")
    .getOrCreate()
)
spark.sparkContext.setLogLevel("ERROR")

# Stream from Kafka.  "subscribe" selects the topic; "startingOffsets"
# controls where a fresh query (no checkpoint yet) begins reading.
weather_detail_df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS_CONS)
    .option("subscribe", KAFKA_TOPIC_NAME_CONS)
    .option("startingOffsets", "latest")
    .load()
)

print("Printing the schema of weather_detail_df : ")
weather_detail_df.printSchema()

# Kafka delivers key/value as binary; cast the payload to a string so it can
# be parsed as JSON, and keep the Kafka record timestamp alongside it.
weather_detail_df_1 = weather_detail_df.selectExpr(
    "CAST(value AS STRING)", "CAST(timestamp AS TIMESTAMP)"
)

# Schema of the JSON documents emitted by the weather producer.
transaction_detail_schema = StructType([
    StructField("CityName", StringType(), True),
    StructField("Temperature", DoubleType(), True),
    StructField("Humidity", IntegerType(), True),
    StructField("CreationTime", StringType(), True),
])

# Parse the JSON payload into a struct column, then flatten its fields.
weather_detail_df_2 = weather_detail_df_1.select(
    from_json(col("value"), transaction_detail_schema).alias("weather_detail"),
    col("timestamp"),
)
weather_detail_df_3 = weather_detail_df_2.select("weather_detail.*", "timestamp")

# Derive a date-typed column from the producer's CreationTime string.
# NOTE(review): the cast assumes CreationTime is in a format Spark can parse
# as a date (e.g. "yyyy-MM-dd ..."); otherwise the column comes out null.
weather_detail_df_4 = weather_detail_df_3.withColumn(
    "CreationDate", col("CreationTime").cast(DateType())
)
print("printing the schema of weather_details_4: ")

weather_detail_df_5 = weather_detail_df_4.select(
    "CityName", "Temperature", "Humidity", "CreationTime", "CreationDate"
)
print("printing the schema of weather_detail_5: ")
weather_detail_df_5.printSchema()

# Writing the final result into console for debugging purpose.
weather_detail_write_stream = (
    weather_detail_df_5.writeStream
    .trigger(processingTime="10 seconds")
    .outputMode("append")
    .option("truncate", "false")
    .format("console")
    .start()
)

# Write the final result into HDFS as JSON files.  File sinks require a
# checkpoint location; replace both placeholder paths with real HDFS URIs
# (e.g. "hdfs://namenode:8020/weather/data").
hdfs_write_stream = (
    weather_detail_df_5.writeStream
    .format("json")
    .option("path", " hdfs path")
    .option("checkpointLocation", "path")
    .start()
)

# Block until any of the active streaming queries terminates (or fails).
spark.streams.awaitAnyTermination()
print("Weather Monitering Streaming with Kafka has been succesfully completed")
Спасибо