Как создать фрейм данных, объединяющий два потока kafka со столбцом «key (s)» и последними значениями оставшихся столбцов - PullRequest
2 голосов
/ 04 апреля 2019

Для приложения My Spark 2.4.x (pyspark) требуется:

  1. Входные данные - это две темы Kafka, а выходные данные - тема Kafka
  2. «Таблица потоковой передачи», где
    • есть логический ключ (ы), а
    • оставшиеся столбцы должны быть последними значениями из любого потока (ов).
  3. Секундная задержка.Испытания показывают, что это достижимо, когда watermarks не используются.

Это похоже на основную вещь, но она не работает для меня полностью.


Пример:

ПРИМЕЧАНИЕ : В приведенном ниже примере моменты времени T1, T2 и T2 могут быть разнесены на секунды / минуты / часы.

T1) Во время T1

KafkaPriceTopic получает 1 полезную нагрузку сообщения (назовем ее P1 ):
{ "SecurityCol":"Sec1", "PriceSeqNoCol":"1", "PriceCol": "101.5"}

KafkaVolumeTopic 1 сообщение с полезной нагрузкой (назовем его V1 ):
{ "SecurityCol":"Sec1", "VolumeSeqNoCol":"1", "VolumeCol": "50"}

Я хотел бы получить Результат DataFrame, который будет выглядеть так:

+-----------+--------+---------+-------------+--------------+ 
|SecurityCol|PriceCol|VolumeCol|PriceSeqNoCol|VolumeSeqNoCol|  
+-----------+--------+---------+-------------+--------------+ 
|Sec1       |101.5   |50       |1            |1             |
+-----------+--------+---------+-------------+--------------+ 

T2) KafkaPriceTopic 1 сообщение ( P2 ):
{ "SecurityCol":"Sec1", "PriceSeqNoCol":"2", "PriceCol": "101.6"}

Результат DataFrame

+-----------+--------+---------+-------------+--------------+ 
|SecurityCol|PriceCol|VolumeCol|PriceSeqNoCol|VolumeSeqNoCol|  
+-----------+--------+---------+-------------+--------------+ 
|Sec1       |101.6   |50       |2            |1             |
+-----------+--------+---------+-------------+--------------+ 

ПРИМЕЧАНИЕ : P1 больше не актуально

T3) KafkaVolumeTopic 1 сообщение V2 :
{ "SecurityCol":"Sec1", "VolumeSeqNoCol":"2", "VolumeCol": "60"}

Результат DataFrame

+-----------+--------+---------+-------------+--------------+ 
|SecurityCol|PriceCol|VolumeCol|PriceSeqNoCol|VolumeSeqNoCol|
+-----------+--------+---------+-------------+--------------+
|Sec1       |101.6   |60       |2            |2             |
+-----------+--------+---------+-------------+--------------+ 

ПРИМЕЧАНИЕ : P1 & V1 не имеет значениябольше


Что работает

  1. извлечение JSON из полезной нагрузки (get_json_object на данный момент), join потоков двух тем.
  2. Однако.это даст (без watermark) DataFrame, в котором будут указаны все цена и объем, полученные за Sec1 , а не только последние из них.
  3. Итак, за этим следует groupBy(...).agg(last(...),...).Но я застрял на получении только одной строки с самым последним значением.
    dfKafka1 = spark.readStream.format("kafka"). #remaining options etc
                .load()
                .select(...)                     #pulls out fields as columns" 

    dfKafka2 = spark.readStream.format("kafka"). #remaining options etc
               .load()
               .select(...)                     #pulls out fields as columns" 

    dfResult=dfKafka1.join(dfKafka2,"SecurityCol")

#structured streaming doesnt yet allow groupBy after a join, so write to intermediate kafka topic

   dfResult.writestream.format("kafka"). #remaining options
           .trigger(processingTime="1 second")
           .start()

#load intermediate kafka topic
   dfKafkaResult=spark.readStream.format("kafka"). #remaining options
                 .load()
                 .select(...)                      #get_json_object for cols
                 .groupBy("SecurityCol")           #define the "key" to agg cols
                 .agg(last("PriceCol"),            #most recent value per col
                      last("PriceSeqNoCol"),
                      last("VolumeCol"),
                      last("VolumeSeqNoCol"))


Задача

Однако окончательный agg & last() не делаеттрюк последовательно.

  1. Когда KafkaVolumeTopic получает новое сообщение, результат может иметь соединение со старым сообщением от KafkaPriceTopic.
  2. Далее orderBy / sort нельзя использовать в потоке без

Ограничения

  1. Я не могу groupBy до join, так как это потребует withWatermark, и я думаю, что мое приложение не может использоватьwatermark.Обоснование:
    • Приложение должно иметь возможность присоединиться к двум темам для данного SecurityCol в любое время в течение дня.
      • Если PriceTopic получит сообщение в 9:00, а VolumeTopic - в 10:00
      • Я ожидаю, что эти два будут объединены и представлены
    • Водяной знак ограничивается, когдаданные передаются в режиме append.Так что здесь нельзя использовать водяной знак, так как таймфрейм - целый день.

Есть идеи?

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...