что я хотел сделать, это прочитать данные из CSV-файла и отправить их на сервер, вот что я пишу, но на сервере я ничего не получаю, либо сгенерирована ошибка, все, что я получил, является пустым
def __init__(self):
self.host = 'localhost'
self.port = 12345
def SetUpConnection(self):
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.bind((self.host, self.port))
s.listen(1)
print('\nListening for a client at',self.host , self.port)
self.conn, self.addr = s.accept()
print('\nConnected by', self.addr)
def FireStream(self):
try:
print('\nReading firedata...\n')
with open('FireData-Part2.csv') as f:
with open('ClimateData-Part2.csv') as c:
FireData_csv = csv.reader(f)
Fire_headers = next(FireData_csv)
ClimateData_csv = csv.reader(c)
Climate_headers = [x.strip() for x in next(ClimateData_csv)] # read from second line in csv file
while(True):
every_5_fire = next_n_lines(f,5)
every_1_climate = next(c)
every_5_fire.append(every_1_climate)
row = {}
row['firedata']=[]
for each in every_5_fire:
alist = each.split(',')
dataType = alist[0]
if dataType == 'cdata':
for column in range(len(alist)):
row[Climate_headers[column]] = alist[column]
if dataType == 'fdata':
firedata = {}
for column2 in range(len(alist)):
firedata[Fire_headers[column2]]= alist[column2]
row['firedata'].append(firedata)
out = json.dumps(row)# encode it before sending
self.conn.send(bytes(out,encoding = "utf8"))
print('Sending line',out)
sleep(1) # ensure every 1 second 5 of firedata is generated
print('End Of Stream fire.')
except socket.error:
print ('Error Occured.\n\nClient disconnected.\n')
серверный код просто распечатывает их как:
sc = SparkContext.getOrCreate()
if (sc is None):
sc = SparkContext(appName="MongoDBApp")
ssc = StreamingContext(sc, 5)
host = "localhost"
port = 12345
lines = ssc.socketTextStream(host, int(port))
# output = lines.map(lambda s: json.loads(s))
#lines.foreachRDD(lambda rdd: rdd.foreachPartition(sendRecord2))
lines.pprint()
ssc.start()
Не работает, даже если я попробовал json.loads, это слишком странно, кто-то может помочь, будет чрезвычайно признателен!Я так расстроился, когда пытался целую ночь
Примерные данные Джейсона выглядят так:
{"firedata": [{"DataType": "fdata", "Latitude": "-35.541", "Longitude": "143.311", "Surface Temperature (kelvin)": "336.3", "Power": "62", "Confidence": "82", "Surface Temperature (Celcius)": "63"}, {"DataType": "fdata", "Latitude": "-35.554", "Longitude": "143.307", "Surface Temperature (kelvin)": "326.8", "Power": "23.8", "Confidence": "67", "Surface Temperature (Celcius)": "53"}, {"DataType": "fdata", "Latitude": "-35.543", "Longitude": "143.316", "Surface Temperature (kelvin)": "340.4", "Power": "84.2", "Confidence": "86", "Surface Temperature (Celcius)": "67"}, {"DataType": "fdata", "Latitude": "-37.708", "Longitude": "145.1", "Surface Temperature (kelvin)": "327.8", "Power": "16.2", "Confidence": "80", "Surface Temperature (Celcius)": "54"}, {"DataType": "fdata", "Latitude": "-35.646", "Longitude": "142.282", "Surface Temperature (kelvin)": "305.6", "Power": "11.8", "Confidence": "65", "Surface Temperature (Celcius)": "32"}], "DataType": "cdata", "Station": "948700", "Air Temperature(Celcius)": "19", "Relative Humidity": "56.8", "WindSpeed (knots)": "7.9", "Max Wind Speed": "11.1", "MAX": " 72.0*", "MIN": " 61.9*", "Precipitation": " 0.00I\n"}
Я не видел ошибок в моих данных JSON