Используйте значения RDD - PullRequest
0 голосов
/ 18 марта 2019

У меня есть два кода в Python;один для отправки и другой для получения

1 Отправка:

import socket
from time import sleep
import os

host = 'localhost'
port = 17020

os.chdir(os.getcwd()+'/Data')

s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.bind((host, port))
s.listen(1)
while True:
    print("Listening for a client at",host , port)
    conn, addr = s.accept()
    print("Connected by", addr)
    try:
        print("Reading file...")
        with open('cdr_data.csv') as f:
            for line in f:
                out = line.encode('utf-8')
                print('Sending line',line)
                conn.send(out)
                # sleep(10)
            print('End Of Stream.')
    except socket.error:
        print ("Error Occured. Client disconnected.")
conn.close()

Получите:

из pyspark import SparkContext, SparkConf из pyspark.streaming import StreamingContext из pyspark.sql import SparkSession, SQLContext

из pyspark.sql.types import Row

из pyspark.sql.types импорт StructType из pyspark.sql.types импорт StructField из pyspark.sql.types import StringType

sc = SparkContext (conf = SparkConf (). SetAppName ("Rec_Stream").setMaster ("local [2]"). set ("spark.driver.allowMultipleContexts", "true")) sc.setLogLevel ("WARN") ssc = StreamingContext (sc, 10)

lines_RDD =ssc.socketTextStream ("localhost", 17020)

data_RDD = lines_RDD.flatMap (лямбда-строка: line.split (',')) data_RDD.pprint ()

ssc.start ()
ssc.awaitTermination ()

Я хочу преобразовать мой data_RDD в Dataframe, идея состоит в том, чтобы проверить значение в моем RDD и затем выполнить соответствующее действие.

Пожалуйста, помогите мне!

из моего csv-файла, который я отправляю: 212706837451,2019-03-17,120.12,5 212629657036,2019-03-17,220,66,0

...