Задание потока данных Python не может быть отправлено - PullRequest
0 голосов
/ 13 октября 2019

У нас в GCP есть задание kubernetes cron, которое отправляет несколько копий одного и того же задания потока данных Python, каждое в своем собственном контейнере. Всякий раз, когда нам нужна новая копия задания, мы просто добавляем ее в spec-> jobTemplate-> spec-> template-> spec-> Containers часть yaml задания cron и настраиваем параметры задания потока данных по мере необходимости. Обычно это работает нормально, но последняя копия, которую мы пытались добавить, не работает. Существующие копии все еще работают как положено. Кажется, что задание не выполняется при отправке в GCP, и сообщение об ошибке не очень полезно:

Traceback (most recent call last):
  File "/app/job.py", line 117, in <module>
    newness.pipeline.run_dataflow(sys.argv)
  File "/app/newness/pipeline.py", line 480, in run_dataflow
    result = pipe.run()
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/pipeline.py", line 403, in run
    self.to_runner_api(), self.runner, self._options).run(False)
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/pipeline.py", line 416, in run
    return self.runner.run_pipeline(self)
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/dataflow/dataflow_runner.py", line 389, in run_pipeline
    self.dataflow_client.create_job(self.job), self)
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/utils/retry.py", line 184, in wrapper
    return fun(*args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/dataflow/internal/apiclient.py", line 504, in create_job
    return self.submit_job_description(job)
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/utils/retry.py", line 184, in wrapper
    return fun(*args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/dataflow/internal/apiclient.py", line 551, in submit_job_description
    response = self._client.projects_locations_jobs.Create(request)
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/dataflow/internal/clients/dataflow/dataflow_v1b3_client.py", line 578, in Create
    config, request, global_params=global_params)
  File "/usr/local/lib/python2.7/dist-packages/apitools/base/py/base_api.py", line 731, in _RunMethod
    return self.ProcessHttpResponse(method_config, http_response, request)
  File "/usr/local/lib/python2.7/dist-packages/apitools/base/py/base_api.py", line 737, in ProcessHttpResponse
    self.__ProcessHttpResponse(method_config, http_response, request))
  File "/usr/local/lib/python2.7/dist-packages/apitools/base/py/base_api.py", line 604, in __ProcessHttpResponse
    http_response, method_config=method_config, request=request)
apitools.base.py.exceptions.HttpError: <exception str() failed>

Задание вообще не отображается в консоли потока данных.

Предыдущие строкижурналы контейнеров выглядят так:

2019-10-13T03:57:47.725542287Z Successfully downloaded apache-beam

2019-10-13T03:58:17.125601280Z INFO:root:Staging SDK sources from PyPI to gs://gcs-bucket-name/staging/newness-boosting-c2898.1570936519.827087/dataflow_python_sdk.tar

2019-10-13T03:58:17.324843623Z INFO:root:Starting GCS upload to gs://gcs-bucket-name/staging/newness-boosting-c2898.1570936519.827087/dataflow_python_sdk.tar...

2019-10-13T03:58:24.825657227Z INFO:root:Completed GCS upload to gs://gcs-bucket-name/staging/newness-boosting-c2898.1570936519.827087/dataflow_python_sdk.tar

2019-10-13T03:58:25.225646529Z INFO:root:Downloading binary distribtution of the SDK from PyPi

2019-10-13T03:58:25.225716554Z INFO:root:Executing command: ['/usr/bin/python', '-m', 'pip', 'download', '--dest', '/tmp/tmpk5TfMS', 'apache-beam==2.8.0', '--no-deps', '--only-binary', ':all:', '--python-version', '27', '--implementation', 'cp', '--abi', 'cp27mu', '--platform', 'manylinux1_x86_64']

2019-10-13T03:59:33.926186906Z Collecting apache-beam==2.8.0

2019-10-13T03:59:52.125678183Z   Using cached https://files.pythonhosted.org/packages/0f/63/ea5453ba656d060936acf41d2ec057f23aafd69649e2129ac66fdda67d48/apache_beam-2.8.0-cp27-cp27mu-manylinux1_x86_64.whl

2019-10-13T04:00:11.525435891Z   Saved /tmp/tmpk5TfMS/apache_beam-2.8.0-cp27-cp27mu-manylinux1_x86_64.whl

2019-10-13T04:00:12.025054706Z Successfully downloaded apache-beam

2019-10-13T04:00:26.726190542Z INFO:root:Staging binary distribution of the SDK from PyPI to gs://gcs-bucket-name/staging/newness-boosting-c2898.1570936519.827087/apache_beam-2.8.0-cp27-cp27mu-manylinux1_x86_64.whl

2019-10-13T04:00:26.825618945Z INFO:root:Starting GCS upload to gs://gcs-bucket-name/staging/newness-boosting-c2898.1570936519.827087/apache_beam-2.8.0-cp27-cp27mu-manylinux1_x86_64.whl...

2019-10-13T04:00:33.725522899Z INFO:root:Completed GCS upload to gs://gcs-bucket-name/staging/newness-boosting-c2898.1570936519.827087/apache_beam-2.8.0-cp27-cp27mu-manylinux1_x86_64.whl

2019-10-13T04:06:14.525017097Z Traceback (most recent call last):
...

Почему это задание не может быть отправлено? Есть ли другие журналы, на которые мы можем посмотреть, чтобы увидеть причину этого сбоя?

(Большинство наших заданий потока данных написаны на Java, где сообщения об ошибках обычно более полезны.)

ОБНОВЛЕНИЕ: локальное выполнение задания (Windows) с помощью apache-beam 2.16 имеет ту же проблему, но с дополнительными подробностями ведения журнала:

...
INFO:root:Starting GCS upload to gs://gcs-bucket-name/staging/newness-boosting-c2898.1571606418.971000/apache_beam-2.16.0-cp27-cp27mu-manylinux1_x86_64.whl...
INFO:root:Completed GCS upload to gs://gcs-bucket-name/staging/newness-boosting-c2898.1571606418.971000/apache_beam-2.16.0-cp27-cp27mu-manylinux1_x86_64.whl in 3 seconds.
WARNING:root:Discarding unparseable args: ['job.py', '--days_history=30']
WARNING:root:Discarding unparseable args: ['job.py', '--days_history=30']
WARNING:root:Retry with exponential backoff: waiting for 2.64795143823 seconds before retrying submit_job_description because we caught exception: error: [Errno 10053] An established connection was aborted by the software in your host machine
 Traceback for above exception (most recent call last):
  File "C:\Python27\lib\site-packages\apache_beam\utils\retry.py", line 206, in wrapper
    return fun(*args, **kwargs)
  File "C:\Python27\lib\site-packages\apache_beam\runners\dataflow\internal\apiclient.py", line 593, in submit_job_description
    response = self._client.projects_locations_jobs.Create(request)
  File "C:\Python27\lib\site-packages\apache_beam\runners\dataflow\internal\clients\dataflow\dataflow_v1b3_client.py", line 657, in Create
    config, request, global_params=global_params)
  File "C:\Python27\lib\site-packages\apitools\base\py\base_api.py", line 729, in _RunMethod
    http, http_request, **opts)
  File "C:\Python27\lib\site-packages\apitools\base\py\http_wrapper.py", line 346, in MakeRequest
    check_response_func=check_response_func)
  File "C:\Python27\lib\site-packages\apitools\base\py\http_wrapper.py", line 396, in _MakeRequestNoRetry
    redirections=redirections, connection_type=connection_type)
  File "C:\Python27\lib\site-packages\oauth2client\transport.py", line 169, in new_request
    redirections, connection_type)
  File "C:\Python27\lib\site-packages\oauth2client\transport.py", line 169, in new_request
    redirections, connection_type)
  File "C:\Python27\lib\site-packages\httplib2\__init__.py", line 1694, in request
    (response, content) = self._request(conn, authority, uri, request_uri, method, body, headers, redirections, cachekey)
  File "C:\Python27\lib\site-packages\httplib2\__init__.py", line 1434, in _request
    (response, content) = self._conn_request(conn, request_uri, method, body, headers)
  File "C:\Python27\lib\site-packages\httplib2\__init__.py", line 1390, in _conn_request
    response = conn.getresponse()
  File "C:\Python27\lib\httplib.py", line 1121, in getresponse
    response.begin()
  File "C:\Python27\lib\httplib.py", line 438, in begin
    version, status, reason = self._read_status()
  File "C:\Python27\lib\httplib.py", line 394, in _read_status
    line = self.fp.readline(_MAXLINE + 1)
  File "C:\Python27\lib\socket.py", line 480, in readline
    data = self._sock.recv(self._rbufsize)
  File "C:\Python27\lib\ssl.py", line 754, in recv
    return self.read(buflen)
  File "C:\Python27\lib\ssl.py", line 641, in read
    v = self._sslobj.read(len)

... retries 4 times total ...

Traceback (most recent call last):
  File "job.py", line 117, in <module>
    newness.pipeline.run_dataflow(sys.argv)
  File "C:\Users\LeeW\Desktop\newness\newness\pipeline.py", line 480, in run_dataflow
    result = pipe.run()
  File "C:\Python27\lib\site-packages\apache_beam\pipeline.py", line 407, in run
    self._options).run(False)
  File "C:\Python27\lib\site-packages\apache_beam\pipeline.py", line 420, in run
    return self.runner.run_pipeline(self, self._options)
  File "C:\Python27\lib\site-packages\apache_beam\runners\dataflow\dataflow_runner.py", line 485, in run_pipeline
    self.dataflow_client.create_job(self.job), self)
  File "C:\Python27\lib\site-packages\apache_beam\utils\retry.py", line 206, in wrapper
    return fun(*args, **kwargs)
  File "C:\Python27\lib\site-packages\apache_beam\runners\dataflow\internal\apiclient.py", line 546, in create_job
    return self.submit_job_description(job)
  File "C:\Python27\lib\site-packages\apache_beam\utils\retry.py", line 219, in wrapper
    raise_with_traceback(exn, exn_traceback)
  File "C:\Python27\lib\site-packages\apache_beam\utils\retry.py", line 206, in wrapper
    return fun(*args, **kwargs)
  File "C:\Python27\lib\site-packages\apache_beam\runners\dataflow\internal\apiclient.py", line 593, in submit_job_description
    response = self._client.projects_locations_jobs.Create(request)
  File "C:\Python27\lib\site-packages\apache_beam\runners\dataflow\internal\clients\dataflow\dataflow_v1b3_client.py", line 657, in Create
    config, request, global_params=global_params)
  File "C:\Python27\lib\site-packages\apitools\base\py\base_api.py", line 729, in _RunMethod
    http, http_request, **opts)
  File "C:\Python27\lib\site-packages\apitools\base\py\http_wrapper.py", line 346, in MakeRequest
    check_response_func=check_response_func)
  File "C:\Python27\lib\site-packages\apitools\base\py\http_wrapper.py", line 396, in _MakeRequestNoRetry
    redirections=redirections, connection_type=connection_type)
  File "C:\Python27\lib\site-packages\oauth2client\transport.py", line 169, in new_request
    redirections, connection_type)
  File "C:\Python27\lib\site-packages\oauth2client\transport.py", line 169, in new_request
    redirections, connection_type)
  File "C:\Python27\lib\site-packages\httplib2\__init__.py", line 1694, in request
    (response, content) = self._request(conn, authority, uri, request_uri, method, body, headers, redirections, cachekey)
  File "C:\Python27\lib\site-packages\httplib2\__init__.py", line 1434, in _request
    (response, content) = self._conn_request(conn, request_uri, method, body, headers)
  File "C:\Python27\lib\site-packages\httplib2\__init__.py", line 1390, in _conn_request
    response = conn.getresponse()
  File "C:\Python27\lib\httplib.py", line 1121, in getresponse
    response.begin()
  File "C:\Python27\lib\httplib.py", line 438, in begin
    version, status, reason = self._read_status()
  File "C:\Python27\lib\httplib.py", line 394, in _read_status
    line = self.fp.readline(_MAXLINE + 1)
  File "C:\Python27\lib\socket.py", line 480, in readline
    data = self._sock.recv(self._rbufsize)
  File "C:\Python27\lib\ssl.py", line 754, in recv
    return self.read(buflen)
  File "C:\Python27\lib\ssl.py", line 641, in read
    v = self._sslobj.read(len)
socket.error: [Errno 10053] An established connection was aborted by the software in your host machine

1 Ответ

0 голосов
/ 16 октября 2019

Какую версию Beam Python SDK вы используете?

...