Как я могу обновить широковещательную переменную в потоке зажигания Python? - PullRequest
0 голосов
/ 02 февраля 2019

Мне нужно обновить широковещательную переменную с течением времени (скажем, через определенный интервал) в структурированной потоковой передаче Spark с использованием Python.Я знаю, что есть похожая версия вопроса, на который уже дан ответ ( здесь ), но все они находятся в Scala или Java.Мне нужно знать как написать класс Broadcast Wrapper на Python .Пытался конвертировать Java-версию, но она все еще не обновляется

class BroadcastWrapper:
  broadcastVar = None
  lastUpdatedAt = datetime.now()

  def updateAndGetRules(spark):
    currentDate = datetime.now()
    diffSec = (currentDate-BroadcastWrapper.lastUpdatedAt).total_seconds() # Difference in seconds

    if BroadcastWrapper.broadcastVar is None or diffSec > 120:
      if BroadcastWrapper.broadcastVar is not None:
        BroadcastWrapper.broadcastVar.unpersist()

      rulesDF = 'Read data from source here'

      BroadcastWrapper.broadcastVar = spark.sparkContext.broadcast(rulesDF.collect()) #I'm collecting it because I need to iterate through rules (filter) and apply that on streaming data
      BroadcastWrapper.lastUpdatedAt = datetime.now()

    return BroadcastWrapper.broadcastVar

Я получаю доступ к переменной вещания, как показано ниже -

for rule in BroadcastWrapper.updateAndGetRules(spark).value:

Дайте мне знать, если у вас есть альтернативное решение

...