Работа Pyspark не будет корректно завершена - PullRequest
0 голосов
/ 12 сентября 2018

Я создал потоковое задание spark (v2.2) в python (v3.6.4) и Yarn (v2.7.3), которое должно работать в течение 1 дня, а затем корректно завершить работу, запустить отдельный процесс и затем возобновить работу для еще 24 часа и т. д.

Мои данные находятся в теме Кафки, и мое управление смещением работает нормально, однако я могу уничтожить задание только с помощью менеджера пряжи, а некоторые данные не обрабатываются.

После долгих исследований я был вынужден создать ключ отключения в базе данных или файл сигналов в HDFS. И я создал класс Thread.timer для опроса базы данных и ожидания появления ключа выключения, значения, а затем изящно выполнил команду shutdown в контексте потоковой передачи искры: ssc.stop(True, True)

Класс таймера запускается в своем собственном процессе, в то время как запускается искра; Я вижу сообщение о завершении работы в журнале, но, к сожалению, работа с искрой никогда не останавливается вообще.

Мой код слишком велик для отправки, но вот суть

class FuncTimer(threading.Thread):
def __init__(self, interval, func, *args, **kwargs):
    threading.Thread.__init__(self)
    self.interval = interval
    self.func = func
    self.args = args
    self.kwargs = kwargs
    self.runnable = True

def run(self):
    while self.runnable:
        self.func(*self.args)
        time.sleep(self.interval)

def stop(self):
    self.runnable = False

   def shutdown_gracefully(*args):
       ssc, ops = args

conn = redis.StrictRedis(host=ops.value['redis_host'], port=ops.value['redis_port'], db=ops.value['redis_db'])

check_shutdown_status = conn.sismember(ops.value['shutdown_key'], ops.value['shutdown_value'])

if check_shutdown_status:
    print("Shutdown value found in Redis! Shutting down gracefully!")
    ssc.stop(True, True)


def main():

# start shutdown timer thread!
thread = FuncTimer(ops.value['shutdown_interval'], shutdown_gracefully, ssc, ops)
thread.start()
# start streaming from the current offset
kvs = KafkaUtils.createDirectStream(ssc, [ops.value['topic_name']], kafka_params, from_offsets)

ssc.start()
ssc.AwaitTermination()

Кто-нибудь знает, как изящно отключить эту версию свечи?

Я знаю, что вы не должны вызывать ssc.start и ssc.stop в одном потоке драйвера; но я звоню стоп на ssc из другого потока. Нужно ли какое-то возвращаемое значение из рабочего потока?

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...