Я хочу прочитать файл из метаданных файла, которые я отправляю как сообщение kafka. Сообщение Kafka содержит такие сведения, как полное имя файла, сохраненное в формате hdf. Я хочу, чтобы мое потоковое приложение считывало метаданные файла и считывало файл, чтобы применить к нему некоторые логики c.
Я мог бы использовать FileStreamSource, но он следит за одним каталогом. Однако в моем случае каталог может измениться. Можете ли вы предложить мне какой-нибудь обходной путь?
----- КОД KAFKA --------
import json
import requests
from kafka import KafkaProducer
from json import dumps
from time import sleep
import os
producer = KafkaProducer(bootstrap_servers=['localhost:9092'],value_serializer=lambda x:dumps(x).encode('utf-8'))
for e in range(1000):
response=requests.get('https://api.rootnet.in/covid19-in/stats/testing/history').text
file_name='response_'+str(e)+'.txt'
with open(file_name,'w') as file:
file.write(response)
string=json.dumps({"filename":"hdfs://localhost:9000/covid/"+file_name})
os.system('hdfs dfs -put '+file_name+' /covid')
print(string)
producer.send('load_json_topic',value=string)
sleep(3)
--------- КОД СПАРКА- -----------
import requests
import json
import os
from pyspark.sql.functions import split,explode
from pyspark import SparkContext,SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark.sql import SparkSession
class DataCollection:
def __init__(self):
self.dataframe=None
self.status=None
self.datacount=None
self.lastupdated=None
config=SparkConf().setAppName("COVID").setMaster("local[*]")
self.sc=SparkContext(conf=config)
self.ssc=StreamingContext(self.sc,1)
def data_collection(self):
zkQuorum="localhost:2181"
topic="load_json_topic"
kvs = KafkaUtils.createDirectStream(self.ssc, [topic],{"metadata.broker.list": "localhost:9092"})
lines = kvs.map(lambda x: x[1])
response=lines.map(lambda x:json.loads(x))
spark = SparkSession.builder.master("local").appName("COVID").getOrCreate()
try:
filename=response.map(lambda x:json.loads(x)['filename'])
response=spark.read.json(filename)
response.pprint()
except Exception as e:
response.pprint()
print("***********************Exception occured******************************",e)
self.ssc.start()
self.ssc.awaitTermination()
print("Data Collected")