Я пытаюсь прочитать поток данных из сокета (ps.pndsn.com) и записать его в temp_table для дальнейшей обработки, но в настоящее время я сталкиваюсь с проблемой, что temp_table, который я создал как часть writeStream, пуст, хотя потоковая передача происходит в режиме реального времени. Поэтому обращаемся за помощью в этом отношении.
Ниже приведен фрагмент кода:
# Create DataFrame representing the stream of input streamingDF from connection to ps.pndsn.com:9999
streamingDF = spark \
.readStream \
.format("socket") \
.option("host", "ps.pndsn.com") \
.option("port", 9999) \
.load()
# Is this DF actually a streaming DF?
streamingDF.isStreaming
spark.conf.set("spark.sql.shuffle.partitions", "2") # keep the size of shuffles small
query = (
streamingDF
.writeStream
.format("memory")
.queryName("temp_table") # temp_table = name of the in-memory table
.outputMode("Append") # Append = OutputMode in which only the new rows in the streaming DataFrame/Dataset will be written to the sink
.start()
)
Потоковый вывод:
{'channel': 'pubnub-sensor-network',
'message': {'ambient_temperature': '1.361',
'humidity': '81.1392',
'photosensor': '758.82',
'radiation_level': '200',
'sensor_uuid': 'probe-84d85b75',
'timestamp': 1581332619},
'publisher': None,
'subscription': None,
'timetoken': 15813326199534409,
'user_metadata': None}
Вывод temp_table пуст.