I am trying to get data from a Kafka producer into HDFS with Spark Structured Streaming using PySpark
0 votes
10 November 2019

I am trying to get data from a Kafka producer into HDFS with Spark Structured Streaming using PySpark. I am not using a query engine such as Hive or Impala. I just want to save the data coming from the Kafka producer to HDFS as JSON, using Structured Streaming in PySpark.
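To make the goal concrete, the minimal shape of the pipeline I am after looks like this (just a sketch; the server address, topic name, and HDFS paths are placeholders):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("kafka-to-hdfs-sketch").getOrCreate()

# Read the raw stream from Kafka
raw = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "host:9092") \
    .option("subscribe", "some_topic") \
    .load()

# Dump the string payload to HDFS as JSON files
raw.selectExpr("CAST(value AS STRING) AS value") \
    .writeStream \
    .format("json") \
    .option("path", "hdfs://namenode:8020/data/weather") \
    .option("checkpointLocation", "hdfs://namenode:8020/checkpoints/weather") \
    .start() \
    .awaitTermination()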

My code is below:

Kafka output

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType, DateType

print("Weather Monitering Streaming with Kafka Started")

KAFKA_TOPIC_NAME_CONS = "test1topic"
KAFKA_BOOTSTRAP_SERVERS_CONS = '192.168.10.2:9092'
spark = SparkSession.builder \
    .master("local[*]") \
    .appName("Spark Structured Streaming with Kafka") \
    .getOrCreate()
spark.sparkContext.setLogLevel("ERROR")

# Stream from Kafka; key and value arrive as binary columns
weather_detail_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS_CONS) \
    .option("subscribe", KAFKA_TOPIC_NAME_CONS) \
    .option("startingOffsets", "latest") \
    .load()

print("Printing the schema of weather_detail_df:")
weather_detail_df.printSchema()
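# For reference, the Kafka source always exposes this fixed schema
# (documented Spark behavior, not my program's output):
#   key: binary, value: binary, topic: string, partition: integer,
#   offset: long, timestamp: timestamp, timestampType: integer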

# Kafka delivers the payload as binary, so cast it to a string first
weather_detail_df_1 = weather_detail_df.selectExpr("CAST(value AS STRING)", "CAST(timestamp AS TIMESTAMP)")

# Define a schema for the weather detail data
weather_detail_schema = StructType([
    StructField("CityName", StringType(), True),
    StructField("Temperature", DoubleType(), True),
    StructField("Humidity", IntegerType(), True),
    StructField("CreationTime", StringType(), True)])
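# An illustrative producer message that would match this schema
# (the values here are made up, not real data):
# {"CityName": "Pune", "Temperature": 29.5, "Humidity": 61, "CreationTime": "2019-11-10 12:00:00"}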
# Parse the JSON payload against the schema and keep the Kafka timestamp
weather_detail_df_2 = weather_detail_df_1.select(
    from_json(col("value"), weather_detail_schema).alias("weather_detail"),
    col("timestamp"))

# Flatten the parsed struct into top-level columns
weather_detail_df_3 = weather_detail_df_2.select("weather_detail.*", "timestamp")

# Derive a date column from the CreationTime string
weather_detail_df_4 = weather_detail_df_3.withColumn("CreationDate", col("CreationTime").cast(DateType()))

print("printing the schema of weather_details_4: ")
weather_detail_df_5=weather_detail_df_4.select(col="CityName", cols="Temperature", "Humidity", "CreationTime","CreationDate")

print ("printing the schema of weather_detail_5: ")
weather_detail_df_5.printSchema()

# Write the result to the console for debugging
weather_detail_write_stream = weather_detail_df_5 \
    .writeStream \
    .trigger(processingTime="10 seconds") \
    .outputMode("append") \
    .option("truncate", "false") \
    .format("console") \
    .start()
# Write the final result into HDFS as JSON files (a checkpoint location is mandatory)
weather_detail_df_5.writeStream \
    .format("json") \
    .option("path", "hdfs path") \
    .option("checkpointLocation", "path") \
    .start()

weather_detail_write_stream.awaitTermination()
print("Weather Monitoring Streaming with Kafka has been successfully completed")

Thanks

...