Spark структурированное потоковое соединение не работает - PullRequest
0 голосов
/ 14 апреля 2019

Я довольно новичок в структурированном потоковом вещании Spark и пытаюсь объединить несколько потоков, исходя из тем Кафки (Spark 2.3.2, Кафка 2.0)

Объединение прекрасно работает в потоках, где я могу выполнять простые равные соединения для ключей. В одном конкретном объединении из 2 тем мне нужно выполнить какое-то преобразование данных, потому что в одном разделе ключ объединения кодируется в шестнадцатеричном виде, а в другом - в base64.

После долгих отладок я пришел к следующему коду, который я тестирую в блокноте PySpark Zeppelin. 2 тематических потока хранятся в Python dict

debug = (topicStreams['invprop']
   .where("invpropv.PHC_UID_IPID = '183C1BA9B3444919B6C33DAB0B639A87'")
   .writeStream.outputMode("append").format("memory")
   .queryName("debug").start()
)

Это возвращает ровно одно сообщение из первой темы, как я и ожидал

debug2 = (topicStreams['hca']
   .where("hex(unbase64(hcav.id)) = '183C1BA9B3444919B6C33DAB0B639A87'")
   .writeStream.....

Этот второй поток также возвращает одно сообщение, это, конечно, 2 сообщения, к которым я пытаюсь присоединиться. Я думаю, что могу предположить, что ключи действительно совпадают.

debug3 = (topicStreams['invprop']
   .join(topicStreams['hca'], 
         expr("invpropv.PHC_UID_IPID = hex(unbase64(hcav.id))"))
   .writeStream...

Это объединение никогда ничего не возвращает. Что может привести к сбою этого соединения? Наверное, я не замечаю чего-то простого.

Ответы [ 2 ]

0 голосов
/ 15 апреля 2019

Ну, как всегда, написание вопроса Stackoverflow всегда дает ответ.На этот раз совершенно неожиданным образом ...

Печатание приведенного выше вопроса заняло у меня пару минут, а затем я снова проверил свой блокнот Zeppelin.И вот, теперь я получил единственную запись, которую искал.

Объединение просто ужасно медленное, но оно работает - для получения результата понадобилось более 5 минут.Я никогда не ждал достаточно долго раньше.И нет, темы не очень большие, только несколько десятков тысяч сообщений.

Ну, теперь я знаю, что объединение в основном работает.Я должен выяснить, почему это так медленно, и как я могу ускорить его.

0 голосов
/ 15 апреля 2019

Я не знаком с Pyspark, но я мог видеть, что в Python оператор сравнения равен == вместо =, как в вашем коде.
Пожалуйста, проверьте еще раз, что делает оператор = внутри функции expr()

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...