Текстовый поток сокета Spark пуст - PullRequest
0 голосов
/ 31 октября 2019

Я слежу за потоковой игрой Spark . Вместо использования nc -lk 9999 я создал свой собственный простой сервер Python следующим образом. Как видно из приведенного ниже кода, он будет случайным образом генерировать буквы от a до z.

import socketserver
import time
from random import choice

class AlphaTCPHandler(socketserver.BaseRequestHandler):
    def handle(self):
        print('AlphaTCPHandler')
        alphabets = list('abcdefghikjklmnopqrstuvwxyz')

        try:
            while True:
                s = f'{choice(alphabets)}'
                b = bytes(s, 'utf-8')
                self.request.sendall(b)
                time.sleep(1)
        except BrokenPipeError:
            print('broken pipe detected')

if __name__ == '__main__':
    host = '0.0.0.0'
    port = 301

    server = socketserver.TCPServer((host, port), AlphaTCPHandler)
    print(f'server starting {host}:{port}')
    server.serve_forever()

. Я тестировал этот сервер с кодом клиента следующим образом.

import socket
import sys
import time

HOST, PORT = 'localhost', 301
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

try:
    sock.connect((HOST, PORT))
    print('socket opened')

    while True:    
        received = str(sock.recv(1024), 'utf-8')
        if len(received.strip()) > 0:
            print(f'{received}')
        time.sleep(1)
finally:
    sock.close()
    print('socket closed')

Однако мой потоковый код Spark, похоже, не получает никаких данных или ничего не печатает. Код выглядит следующим образом.

from pyspark.streaming import StreamingContext
from time import sleep

ssc = StreamingContext(sc, 1)
ssc.checkpoint('/tmp')

lines = ssc.socketTextStream('0.0.0.0', 301)
words = lines.flatMap(lambda s: s.split(' '))
pairs = words.map(lambda word: (word, 1))
counts = pairs.reduceByKey(lambda a, b: a + b)

counts.pprint()

ssc.start()
sleep(5)
ssc.stop(stopSparkContext=False, stopGraceFully=True)

Все, что я вижу из вывода, - это повторяющаяся схема ниже.

-------------------------------------------
Time: 2019-10-31 08:38:22
-------------------------------------------

-------------------------------------------
Time: 2019-10-31 08:38:23
-------------------------------------------

-------------------------------------------
Time: 2019-10-31 08:38:24
-------------------------------------------

Есть идеи о том, что я делаю неправильно?

1 Ответ

1 голос
/ 31 октября 2019

Ваш потоковый код работает правильно. Именно ваш сервер передает ему неверную информацию - после каждой буквы нет разделителей строк, поэтому Spark видит одну постоянно растущую строку и просто продолжает ждать окончания этой строки, чего никогда не происходит. Модифицируйте свой сервер так, чтобы отправлять новую строку с каждым письмом:

while True:
    s = f'{choice(alphabets)}\n'  # <-- inserted \n in here
    b = bytes(s, 'utf-8')
    self.request.sendall(b)
    time.sleep(1)

И результат:

-------------------------------------------
Time: 2019-10-31 12:09:26
-------------------------------------------
('t', 1)

-------------------------------------------
Time: 2019-10-31 12:09:27
-------------------------------------------
('t', 1)

-------------------------------------------
Time: 2019-10-31 12:09:28
-------------------------------------------
('x', 1)
...