Для приложения My Spark 2.4.x (pyspark) требуется:
- Входные данные - это две темы Kafka, а выходные данные - тема Kafka
- «Таблица потоковой передачи», где
- есть логический ключ (ы), а
- оставшиеся столбцы должны быть последними значениями из любого потока (ов).
- Секундная задержка.Испытания показывают, что это достижимо, когда
watermarks
не используются.
Это похоже на основную вещь, но она не работает для меня полностью.
Пример:
ПРИМЕЧАНИЕ : В приведенном ниже примере моменты времени T1, T2 и T2 могут быть разнесены на секунды / минуты / часы.
T1) Во время T1
KafkaPriceTopic получает 1 полезную нагрузку сообщения (назовем ее P1 ):
{ "SecurityCol":"Sec1", "PriceSeqNoCol":"1", "PriceCol": "101.5"}
KafkaVolumeTopic 1 сообщение с полезной нагрузкой (назовем его V1 ):
{ "SecurityCol":"Sec1", "VolumeSeqNoCol":"1", "VolumeCol": "50"}
Я хотел бы получить Результат DataFrame
, который будет выглядеть так:
+-----------+--------+---------+-------------+--------------+
|SecurityCol|PriceCol|VolumeCol|PriceSeqNoCol|VolumeSeqNoCol|
+-----------+--------+---------+-------------+--------------+
|Sec1 |101.5 |50 |1 |1 |
+-----------+--------+---------+-------------+--------------+
T2) KafkaPriceTopic 1 сообщение ( P2 ):
{ "SecurityCol":"Sec1", "PriceSeqNoCol":"2", "PriceCol": "101.6"}
Результат DataFrame
+-----------+--------+---------+-------------+--------------+
|SecurityCol|PriceCol|VolumeCol|PriceSeqNoCol|VolumeSeqNoCol|
+-----------+--------+---------+-------------+--------------+
|Sec1 |101.6 |50 |2 |1 |
+-----------+--------+---------+-------------+--------------+
ПРИМЕЧАНИЕ : P1 больше не актуально
T3) KafkaVolumeTopic 1 сообщение V2 :
{ "SecurityCol":"Sec1", "VolumeSeqNoCol":"2", "VolumeCol": "60"}
Результат DataFrame
+-----------+--------+---------+-------------+--------------+
|SecurityCol|PriceCol|VolumeCol|PriceSeqNoCol|VolumeSeqNoCol|
+-----------+--------+---------+-------------+--------------+
|Sec1 |101.6 |60 |2 |2 |
+-----------+--------+---------+-------------+--------------+
ПРИМЕЧАНИЕ : P1 & V1 не имеет значениябольше
Что работает
- извлечение JSON из полезной нагрузки (
get_json_object
на данный момент), join
потоков двух тем. - Однако.это даст (без
watermark
) DataFrame
, в котором будут указаны все цена и объем, полученные за Sec1 , а не только последние из них. - Итак, за этим следует
groupBy(...).agg(last(...),...)
.Но я застрял на получении только одной строки с самым последним значением.
dfKafka1 = spark.readStream.format("kafka"). #remaining options etc
.load()
.select(...) #pulls out fields as columns"
dfKafka2 = spark.readStream.format("kafka"). #remaining options etc
.load()
.select(...) #pulls out fields as columns"
dfResult=dfKafka1.join(dfKafka2,"SecurityCol")
#structured streaming doesnt yet allow groupBy after a join, so write to intermediate kafka topic
dfResult.writestream.format("kafka"). #remaining options
.trigger(processingTime="1 second")
.start()
#load intermediate kafka topic
dfKafkaResult=spark.readStream.format("kafka"). #remaining options
.load()
.select(...) #get_json_object for cols
.groupBy("SecurityCol") #define the "key" to agg cols
.agg(last("PriceCol"), #most recent value per col
last("PriceSeqNoCol"),
last("VolumeCol"),
last("VolumeSeqNoCol"))
Задача
Однако окончательный agg
& last()
не делаеттрюк последовательно.
- Когда KafkaVolumeTopic получает новое сообщение, результат может иметь соединение со старым сообщением от KafkaPriceTopic.
- Далее
orderBy
/ sort нельзя использовать в потоке без
Ограничения
- Я не могу
groupBy
до join
, так как это потребует withWatermark
, и я думаю, что мое приложение не может использоватьwatermark
.Обоснование: - Приложение должно иметь возможность присоединиться к двум темам для данного SecurityCol в любое время в течение дня.
- Если PriceTopic получит сообщение в 9:00, а VolumeTopic - в 10:00
- Я ожидаю, что эти два будут объединены и представлены
- Водяной знак ограничивается, когдаданные передаются в режиме
append
.Так что здесь нельзя использовать водяной знак, так как таймфрейм - целый день.
Есть идеи?