Воздушный поток неожиданно сигнализирует SIGTERM подпроцессам - PullRequest
0 голосов
/ 22 января 2019

Я использую PythonOperator для вызова функции, которая распараллеливает процесс обработки данных как задачу Airflow. Это делается просто путем обёртывания простой функции вызываемой функцией-оберткой, вызываемой Airflow.

def wrapper(ds, **kwargs):
    process_data()

process_data достигает распараллеливания, используя многопроцессорный модуль, который порождает подпроцессы. Когда я сам запускаю process_data из ноутбука jupyter, он без проблем запускается до конца. Однако, когда я запускаю его с помощью Airflow, задача не выполняется, и журнал задачи показывает что-то вроде этого.

[2019-01-22 17:16:46,966] {models.py:1610} ERROR - Received SIGTERM. Terminating subprocesses.
[2019-01-22 17:16:46,969] {logging_mixin.py:95} WARNING - Process ForkPoolWorker-129:

[2019-01-22 17:16:46,973] {logging_mixin.py:95} WARNING - Traceback (most recent call last):

[2019-01-22 17:16:46,973] {logging_mixin.py:95} WARNING -   File "/usr/lib/python3.5/multiprocessing/process.py", line 249, in _bootstrap
    self.run()

[2019-01-22 17:16:46,973] {logging_mixin.py:95} WARNING -   File "/home/airflow/.env/lib/python3.5/site-packages/airflow/models.py", line 1612, in signal_handler
    raise AirflowException("Task received SIGTERM signal")

[2019-01-22 17:16:46,973] {logging_mixin.py:95} WARNING - airflow.exceptions.AirflowException: Task received SIGTERM signal

[2019-01-22 17:16:46,993] {models.py:1610} ERROR - Received SIGTERM. Terminating subprocesses.
[2019-01-22 17:16:46,996] {logging_mixin.py:95} WARNING - Process ForkPoolWorker-133:

[2019-01-22 17:16:46,999] {logging_mixin.py:95} WARNING - Traceback (most recent call last):

[2019-01-22 17:16:46,999] {logging_mixin.py:95} WARNING -   File "/usr/lib/python3.5/multiprocessing/process.py", line 249, in _bootstrap
    self.run()

[2019-01-22 17:16:46,999] {logging_mixin.py:95} WARNING -   File "/usr/lib/python3.5/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)

[2019-01-22 17:16:46,999] {logging_mixin.py:95} WARNING -   File "/usr/lib/python3.5/multiprocessing/pool.py", line 108, in worker
    task = get()

[2019-01-22 17:16:46,999] {logging_mixin.py:95} WARNING -   File "/usr/lib/python3.5/multiprocessing/queues.py", line 343, in get
    res = self._reader.recv_bytes()

[2019-01-22 17:16:46,999] {logging_mixin.py:95} WARNING -   File "/usr/lib/python3.5/multiprocessing/synchronize.py", line 99, in __exit__
    return self._semlock.__exit__(*args)

[2019-01-22 17:16:46,999] {logging_mixin.py:95} WARNING -   File "/home/airflow/.env/lib/python3.5/site-packages/airflow/models.py", line 1612, in signal_handler
    raise AirflowException("Task received SIGTERM signal")

[2019-01-22 17:16:46,999] {logging_mixin.py:95} WARNING - airflow.exceptions.AirflowException: Task received SIGTERM signal

[2019-01-22 17:16:47,086] {logging_mixin.py:95} INFO - file parsing and processing 256.07

[2019-01-22 17:17:12,938] {logging_mixin.py:95} INFO - combining and sorting 25.85

Я не совсем уверен, почему задание получает SIGTERM. Я предполагаю, что какой-то процесс более высокого уровня отправляет их в подпроцессы. Что я должен сделать, чтобы решить эту проблему?

Только что заметил, что ближе к концу журнала для задачи четко указано, что

airflow.exceptions.AirflowException: Task received SIGTERM signal
[2019-01-22 12:31:39,196] {models.py:1764} INFO - Marking task as FAILED.

1 Ответ

0 голосов
/ 19 апреля 2019

У меня тоже была похожая проблема, когда я запускал многопоточный код Python. Я смог решить то же самое, присоединившись к темам. Затем воздушный поток ожидает выполнения всех потоков перед отправкой SIGTERM.

threads = []   #array for threads

 t = Thread(...)
 threads.append(t) #add all threads

 # Start all threads
 for x in threads:
     x.start()

 # Wait for all of them to finish
 for x in threads:
     x.join()
...