Мне нужно записать поток из веб-сокета в файл паркета, используя apache spark. Похоже, что текущая функциональность потоковой передачи Apache не поддерживает веб-сокеты "из коробки".
В apache-spark есть команда для чтения потока из сокета TCP, поэтому я попытался преобразовать веб-сокет в обычный сокет, но пока не смог получить искру для чтения сокета с помощью тестового сценария:
Я настраиваю сервер так:
import socket, socketserver, time
class MyHandler(socketserver.BaseRequestHandler):
def handle(self):
counter = 1
while 1:
#dataReceived = self.request.recv(1024)
#if not dataReceived: break
str_send = 'msg ' + str(counter)
self.request.send(str_send.encode("utf-8"))
counter+=1
time.sleep(2)
myServer = socketserver.TCPServer(('localhost',5146), MyHandler)
myServer.serve_forever( )
Что нормально работает с обычным клиентом:
import socket, socketserver, time
def client(ip, port):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect((ip, port))
while True:
response = str(sock.recv(1024))
print("Received: {}".format(response))
ip = 'localhost'
port = 5146
client(ip, port)
Но когда я использую пример spark для чтения потока TCP, я все равно не получаю данных:
lines = spark \
.readStream \
.format("socket") \
.option("host", "localhost") \
.option("port", 5146) \
.load()
query = lines.writeStream\
.format("console")\
.outputMode('append')\
.start()\
.awaitTermination()
Я также пытался записать в файл, но файл пуст.
Соединение установлено, но данные не поступают:
$ netstat -na | grep "5146"
tcp4 0 0 127.0.0.1.5146 127.0.0.1.59823 ESTABLISHED
tcp4 0 0 127.0.0.1.59823 127.0.0.1.5146 ESTABLISHED
tcp4 0 0 127.0.0.1.5146 *.* LISTEN