Spark Streaming - чтение файла с Kafka - PullRequest
0 голосов
/ 30 апреля 2020

Я хочу прочитать файл из метаданных файла, которые я отправляю как сообщение kafka. Сообщение Kafka содержит такие сведения, как полное имя файла, сохраненное в формате hdf. Я хочу, чтобы мое потоковое приложение считывало метаданные файла и считывало файл, чтобы применить к нему некоторые логики c.

Я мог бы использовать FileStreamSource, но он следит за одним каталогом. Однако в моем случае каталог может измениться. Можете ли вы предложить мне какой-нибудь обходной путь?

----- КОД KAFKA --------

import json
import requests
from kafka import KafkaProducer
from json import dumps
from time import sleep
import os

producer = KafkaProducer(bootstrap_servers=['localhost:9092'],value_serializer=lambda x:dumps(x).encode('utf-8'))
for e in range(1000):
    response=requests.get('https://api.rootnet.in/covid19-in/stats/testing/history').text
    file_name='response_'+str(e)+'.txt'
    with open(file_name,'w') as file:
       file.write(response)

    string=json.dumps({"filename":"hdfs://localhost:9000/covid/"+file_name})
    os.system('hdfs dfs -put '+file_name+' /covid')
    print(string)
    producer.send('load_json_topic',value=string)
    sleep(3)

--------- КОД СПАРКА- -----------

import requests
import json
import os
from pyspark.sql.functions import split,explode
from pyspark import SparkContext,SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark.sql import SparkSession

class DataCollection:
    def __init__(self):
        self.dataframe=None
        self.status=None
        self.datacount=None
        self.lastupdated=None
        config=SparkConf().setAppName("COVID").setMaster("local[*]")
        self.sc=SparkContext(conf=config)
        self.ssc=StreamingContext(self.sc,1)

    def data_collection(self):
        zkQuorum="localhost:2181"
        topic="load_json_topic"
        kvs = KafkaUtils.createDirectStream(self.ssc, [topic],{"metadata.broker.list": "localhost:9092"})
        lines = kvs.map(lambda x: x[1])
        response=lines.map(lambda x:json.loads(x))
        spark = SparkSession.builder.master("local").appName("COVID").getOrCreate()

        try:
            filename=response.map(lambda x:json.loads(x)['filename'])
            response=spark.read.json(filename)
            response.pprint()

        except Exception as e:
            response.pprint()
            print("***********************Exception occured******************************",e)
        self.ssc.start()
        self.ssc.awaitTermination()
        print("Data Collected")
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...