Я создал потоковое задание spark (v2.2) в python (v3.6.4) и Yarn (v2.7.3), которое должно работать в течение 1 дня, а затем корректно завершить работу, запустить отдельный процесс и затем возобновить работу для еще 24 часа и т. д.
Мои данные находятся в теме Кафки, и мое управление смещением работает нормально, однако я могу уничтожить задание только с помощью менеджера пряжи, а некоторые данные не обрабатываются.
После долгих исследований я был вынужден создать ключ отключения в базе данных или файл сигналов в HDFS. И я создал класс Thread.timer для опроса базы данных и ожидания появления ключа выключения, значения, а затем изящно выполнил команду shutdown в контексте потоковой передачи искры:
ssc.stop(True, True)
Класс таймера запускается в своем собственном процессе, в то время как запускается искра; Я вижу сообщение о завершении работы в журнале, но, к сожалению, работа с искрой никогда не останавливается вообще.
Мой код слишком велик для отправки, но вот суть
class FuncTimer(threading.Thread):
def __init__(self, interval, func, *args, **kwargs):
threading.Thread.__init__(self)
self.interval = interval
self.func = func
self.args = args
self.kwargs = kwargs
self.runnable = True
def run(self):
while self.runnable:
self.func(*self.args)
time.sleep(self.interval)
def stop(self):
self.runnable = False
def shutdown_gracefully(*args):
ssc, ops = args
conn = redis.StrictRedis(host=ops.value['redis_host'], port=ops.value['redis_port'], db=ops.value['redis_db'])
check_shutdown_status = conn.sismember(ops.value['shutdown_key'], ops.value['shutdown_value'])
if check_shutdown_status:
print("Shutdown value found in Redis! Shutting down gracefully!")
ssc.stop(True, True)
def main():
# start shutdown timer thread!
thread = FuncTimer(ops.value['shutdown_interval'], shutdown_gracefully, ssc, ops)
thread.start()
# start streaming from the current offset
kvs = KafkaUtils.createDirectStream(ssc, [ops.value['topic_name']], kafka_params, from_offsets)
ssc.start()
ssc.AwaitTermination()
Кто-нибудь знает, как изящно отключить эту версию свечи?
Я знаю, что вы не должны вызывать ssc.start и ssc.stop в одном потоке драйвера; но я звоню стоп на ssc из другого потока. Нужно ли какое-то возвращаемое значение из рабочего потока?