Я запустил модуль потока воздуха в среде Docker -desktop k8s на моих компьютерах Macos.
Одной задачей воздушного потока является загрузка gzip-файла большого размера ( 670MB ) с удаленного сервера sftp. Я использую Python paramiko для создания соединения sh, а затем его метод для загрузки необходимых файлов gzip.
На самом деле мои другие задачи загрузки воздушного потока (размер файла не превышает 30 МБ) могут быть успешно выполнены, и файл gzip может быть загружен полностью.
Ниже приведена ошибка воздушного потока log, в нем много Time since last heartbeat(0.06 s) < heartrate(5.0 s), sleeping for 4.938538 s
журналов предупреждений, которые действительно раздражают, и, вероятно, именно причина root вызвала ssh connection dropped error
.
Использование airflow list_dags -r
показывает ниже информацию об этом dag
DagBag loading stats for /usr/local/airflow/dags
-------------------------------------------------------------------
Number of DAGs: 1
Total task number: 42
DagBag parsing time: 0.217942
------------------------------------+----------+---------+----------+-------------------------------
file | duration | dag_num | task_num | dags
------------------------------------+----------+---------+----------+-------------------------------
/downloader-dags.py | 0.217942 | 1 | 42 | ['file-downloader']
------------------------------------+----------+---------+----------+-------------------------------
См. С «2020-01-10 03: 08: 33,280» до «2020-01-10 03: 29: 38,638» на проверку сердцебиения воздушного потока было потрачено около 22 минут, в течение этого времени файл был только скачал 8MB, часть samll всего файла ( 670MB ).
Версия воздушного потока: 1.10.6 Python: 3.7.6 Paramiko: 2.7.1 Kubenetes Версия: v1.14.6 Docker настольная версия: 2.1.0.3
Я имею просмотр этой статьи Воздушный поток - отключение журналов пульса . Я не могу изменить self.log.warning
на self.log.debug
, потому что владельцем изображения воздушного потока является не я.
Может кто-нибудь помочь решить эту проблему ? Любые идеи о сердцебиении воздушного потока будут признательны заранее.
[2020-01-10 03:08:27,906] {logging_mixin.py:112} INFO - [2020-01-10 03:08:27,906] {transport.py:1819} INFO - Connected (version 2.0, client Twisted)
[2020-01-10 03:08:28,182] {logging_mixin.py:112} INFO - [2020-01-10 03:08:28,182] {local_task_job.py:124} WARNING - Time since last heartbeat(0.07 s) < heartrate(5.0 s), sleeping for 4.928218 s
[2020-01-10 03:08:29,824] {logging_mixin.py:112} INFO - [2020-01-10 03:08:29,824] {transport.py:1819} INFO - Authentication (password) successful!
[2020-01-10 03:08:29,996] {logging_mixin.py:112} INFO - [2020-01-10 03:08:29,995] {sftp.py:158} INFO - [chan 0] Opened sftp connection (server version 3)
[2020-01-10 03:08:30,839] {logging_mixin.py:112} INFO - ftpclient.py sftp_get filename_fullpath: <class 'str'> /var/data/k8s/download/202001/daily_check.csv.gz
[2020-01-10 03:08:33,280] {logging_mixin.py:112} INFO - [2020-01-10 03:08:33,279] {local_task_job.py:124} WARNING - Time since last heartbeat(0.06 s) < heartrate(5.0 s), sleeping for 4.938538 s
......
[2020-01-10 03:29:38,638] {logging_mixin.py:112} INFO - [2020-01-10 03:29:38,637] {local_task_job.py:124} WARNING - Time since last heartbeat(0.07 s) < heartrate(5.0 s), sleeping for 4.927038 s
[2020-01-10 03:29:41,907] {logging_mixin.py:112} WARNING - 2020-01-10 03:29:41,906 - 10 - ERROR - 184 - sftp_get:sftp_get downloader.csv.gz error:sftp_get filename_fullpath /var/data/k8s/download/202001/daily_check.csv.gz error:<class 'paramiko.ssh_exception.SSHException'>:Server connection dropped:
[2020-01-10 03:29:41,918] {logging_mixin.py:112} INFO - [2020-01-10 03:29:41,906] {ftpclient.py:184} ERROR - sftp_get:sftp_get downloader.csv.gz error:sftp_get filename_fullpath /var/data/k8s/download/202001/daily_check.csv.gz error:<class 'paramiko.ssh_exception.SSHException'>:Server connection dropped:
[2020-01-10 03:29:41,921] {logging_mixin.py:112} WARNING - --- Logging error ---
[2020-01-10 03:29:41,911] {logging_mixin.py:112} WARNING - 2020-01-10 03:29:41,906 - 10 - ERROR - 184 - sftp_get:sftp_get downloader.csv.gz error:sftp_get filename_fullpath /var/data/k8s/download/202001/daily_check.csv.gz error:<class 'paramiko.ssh_exception.SSHException'>:Server connection dropped:
Exception in thread Thread-3:
Traceback (most recent call last):
File "/usr/local/lib/python3.7/threading.py", line 926, in _bootstrap_inner
self.run()
File "/usr/local/lib/python3.7/threading.py", line 870, in run
self._target(*self._args, **self._kwargs)
File "/usr/local/lib/python3.7/site-packages/paramiko/sftp_file.py", line 538, in _prefetch_thread
self, CMD_READ, self.handle, long(offset), int(length)
File "/usr/local/lib/python3.7/site-packages/paramiko/sftp_client.py", line 837, in _async_request
self._send_packet(t, msg)
File "/usr/local/lib/python3.7/site-packages/paramiko/sftp.py", line 198, in _send_packet
self._write_all(out)
File "/usr/local/lib/python3.7/site-packages/paramiko/sftp.py", line 164, in _write_all
raise EOFError()
EOFError