Я довольно новичок в структурированном потоковом вещании Spark и пытаюсь объединить несколько потоков, исходя из тем Кафки (Spark 2.3.2, Кафка 2.0)
Объединение прекрасно работает в потоках, где я могу выполнять простые равные соединения для ключей. В одном конкретном объединении из 2 тем мне нужно выполнить какое-то преобразование данных, потому что в одном разделе ключ объединения кодируется в шестнадцатеричном виде, а в другом - в base64.
После долгих отладок я пришел к следующему коду, который я тестирую в блокноте PySpark Zeppelin. 2 тематических потока хранятся в Python dict
debug = (topicStreams['invprop']
.where("invpropv.PHC_UID_IPID = '183C1BA9B3444919B6C33DAB0B639A87'")
.writeStream.outputMode("append").format("memory")
.queryName("debug").start()
)
Это возвращает ровно одно сообщение из первой темы, как я и ожидал
debug2 = (topicStreams['hca']
.where("hex(unbase64(hcav.id)) = '183C1BA9B3444919B6C33DAB0B639A87'")
.writeStream.....
Этот второй поток также возвращает одно сообщение, это, конечно, 2 сообщения, к которым я пытаюсь присоединиться. Я думаю, что могу предположить, что ключи действительно совпадают.
debug3 = (topicStreams['invprop']
.join(topicStreams['hca'],
expr("invpropv.PHC_UID_IPID = hex(unbase64(hcav.id))"))
.writeStream...
Это объединение никогда ничего не возвращает. Что может привести к сбою этого соединения? Наверное, я не замечаю чего-то простого.