pyspark: Spark Streaming через сокет - PullRequest
       20

pyspark: Spark Streaming через сокет

1 голос
/ 10 февраля 2020

Я пытаюсь прочитать поток данных из сокета (ps.pndsn.com) и записать его в temp_table для дальнейшей обработки, но в настоящее время я сталкиваюсь с проблемой, что temp_table, который я создал как часть writeStream, пуст, хотя потоковая передача происходит в режиме реального времени. Поэтому обращаемся за помощью в этом отношении.

Ниже приведен фрагмент кода:

# Create DataFrame representing the stream of input streamingDF from connection to ps.pndsn.com:9999
streamingDF = spark \
    .readStream \
    .format("socket") \
    .option("host", "ps.pndsn.com") \
    .option("port", 9999) \
    .load()

# Is this DF actually a streaming DF?
streamingDF.isStreaming


spark.conf.set("spark.sql.shuffle.partitions", "2")  # keep the size of shuffles small

query = (
  streamingDF
    .writeStream
    .format("memory")       
    .queryName("temp_table")     # temp_table = name of the in-memory table
    .outputMode("Append")  # Append = OutputMode in which only the new rows in the streaming DataFrame/Dataset will be written to the sink
    .start()
)

Потоковый вывод:

{'channel': 'pubnub-sensor-network',
 'message': {'ambient_temperature': '1.361',
             'humidity': '81.1392',
             'photosensor': '758.82',
             'radiation_level': '200',
             'sensor_uuid': 'probe-84d85b75',
             'timestamp': 1581332619},
 'publisher': None,
 'subscription': None,
 'timetoken': 15813326199534409,
 'user_metadata': None}

Вывод temp_table пуст.

...