У меня есть два кода в Python;один для отправки и другой для получения
1 Отправка:
import socket
from time import sleep
import os
host = 'localhost'
port = 17020
os.chdir(os.getcwd()+'/Data')
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.bind((host, port))
s.listen(1)
while True:
print("Listening for a client at",host , port)
conn, addr = s.accept()
print("Connected by", addr)
try:
print("Reading file...")
with open('cdr_data.csv') as f:
for line in f:
out = line.encode('utf-8')
print('Sending line',line)
conn.send(out)
# sleep(10)
print('End Of Stream.')
except socket.error:
print ("Error Occured. Client disconnected.")
conn.close()
Получите:
из pyspark import SparkContext, SparkConf из pyspark.streaming import StreamingContext из pyspark.sql import SparkSession, SQLContext
из pyspark.sql.types import Row
из pyspark.sql.types импорт StructType из pyspark.sql.types импорт StructField из pyspark.sql.types import StringType
sc = SparkContext (conf = SparkConf (). SetAppName ("Rec_Stream").setMaster ("local [2]"). set ("spark.driver.allowMultipleContexts", "true")) sc.setLogLevel ("WARN") ssc = StreamingContext (sc, 10)
lines_RDD =ssc.socketTextStream ("localhost", 17020)
data_RDD = lines_RDD.flatMap (лямбда-строка: line.split (',')) data_RDD.pprint ()
ssc.start ()
ssc.awaitTermination ()
Я хочу преобразовать мой data_RDD в Dataframe, идея состоит в том, чтобы проверить значение в моем RDD и затем выполнить соответствующее действие.
Пожалуйста, помогите мне!
из моего csv-файла, который я отправляю: 212706837451,2019-03-17,120.12,5 212629657036,2019-03-17,220,66,0