Не в состоянии отправить закодированные данные Джейсона к искре - PullRequest
0 голосов
/ 20 мая 2018

что я хотел сделать, это прочитать данные из CSV-файла и отправить их на сервер, вот что я пишу, но на сервере я ничего не получаю, либо сгенерирована ошибка, все, что я получил, является пустым

def __init__(self):
    self.host = 'localhost'
    self.port = 12345

def SetUpConnection(self):
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.bind((self.host, self.port))
    s.listen(1)
    print('\nListening for a client at',self.host , self.port)
    self.conn, self.addr = s.accept()
    print('\nConnected by', self.addr)

def FireStream(self):  
        try:
            print('\nReading firedata...\n')
            with open('FireData-Part2.csv') as f:
                with open('ClimateData-Part2.csv') as c:
                    FireData_csv = csv.reader(f)
                    Fire_headers = next(FireData_csv)
                    ClimateData_csv = csv.reader(c)
                    Climate_headers = [x.strip() for x in next(ClimateData_csv)]                             # read from second line in csv file 

                    while(True):
                        every_5_fire = next_n_lines(f,5)
                        every_1_climate = next(c)
                        every_5_fire.append(every_1_climate)

                        row = {}
                        row['firedata']=[] 
                        for each in every_5_fire:
                            alist = each.split(',')
                            dataType = alist[0]     
                            if dataType == 'cdata':           
                                for column in range(len(alist)):
                                    row[Climate_headers[column]] = alist[column]           
                            if dataType == 'fdata':   
                                    firedata = {}
                                    for column2 in range(len(alist)): 
                                        firedata[Fire_headers[column2]]= alist[column2]               
                                    row['firedata'].append(firedata)
                        out = json.dumps(row)# encode it before sending   
                        self.conn.send(bytes(out,encoding = "utf8"))
                        print('Sending line',out)
                        sleep(1)  # ensure every 1 second 5 of firedata is generated 
                    print('End Of Stream fire.')
        except socket.error:
            print ('Error Occured.\n\nClient disconnected.\n')

серверный код просто распечатывает их как:

sc = SparkContext.getOrCreate()

if (sc is None):
    sc = SparkContext(appName="MongoDBApp")
ssc = StreamingContext(sc, 5)

host = "localhost"
port = 12345

lines = ssc.socketTextStream(host, int(port))

# output = lines.map(lambda s: json.loads(s))

#lines.foreachRDD(lambda rdd: rdd.foreachPartition(sendRecord2))

lines.pprint()

ssc.start()

Не работает, даже если я попробовал json.loads, это слишком странно, кто-то может помочь, будет чрезвычайно признателен!Я так расстроился, когда пытался целую ночь

Примерные данные Джейсона выглядят так:

{"firedata": [{"DataType": "fdata", "Latitude": "-35.541", "Longitude": "143.311", "Surface Temperature (kelvin)": "336.3", "Power": "62", "Confidence": "82", "Surface Temperature (Celcius)": "63"}, {"DataType": "fdata", "Latitude": "-35.554", "Longitude": "143.307", "Surface Temperature (kelvin)": "326.8", "Power": "23.8", "Confidence": "67", "Surface Temperature (Celcius)": "53"}, {"DataType": "fdata", "Latitude": "-35.543", "Longitude": "143.316", "Surface Temperature (kelvin)": "340.4", "Power": "84.2", "Confidence": "86", "Surface Temperature (Celcius)": "67"}, {"DataType": "fdata", "Latitude": "-37.708", "Longitude": "145.1", "Surface Temperature (kelvin)": "327.8", "Power": "16.2", "Confidence": "80", "Surface Temperature (Celcius)": "54"}, {"DataType": "fdata", "Latitude": "-35.646", "Longitude": "142.282", "Surface Temperature (kelvin)": "305.6", "Power": "11.8", "Confidence": "65", "Surface Temperature (Celcius)": "32"}], "DataType": "cdata", "Station": "948700", "Air Temperature(Celcius)": "19", "Relative Humidity": "56.8", "WindSpeed  (knots)": "7.9", "Max Wind Speed": "11.1", "MAX": "   72.0*", "MIN": "  61.9*", "Precipitation": " 0.00I\n"}

Я не видел ошибок в моих данных JSON

1 Ответ

0 голосов
/ 21 мая 2018

Это так странно, эта проблема решается путем добавления "\ n" к строковым данным json перед отправкой.в основном это выглядит так:

out = json.dumps(row) + "\n" 
self.conn.send(bytes(out,encoding = "utf8"))

Я потратил несколько дней, чтобы понять это, но я совершенно не представляю, почему это так, кто-нибудь может предложить какую-то подсказку?

...