Это мое окружение:
apache -beam == 2,16,0
TennSflow == 2,1,0
TensorFlow-метаданные == 0,15,2
TensorFlow-преобразование == 0,15 .0
Python 2.7.13
pip 20.0.2
Я запустил тензор-преобразование, загруженное в следующую папку
import site
print(site.USER_SITE)
/home/jupyter/.local/lib/python3.5/site-packages
Я могу запустить следующую предварительную обработку в Apache Луч локально успешно. Но когда я запускаю его в облаке (DataflowRunner), возникает ошибка.
def preprocess(in_test_mode):
import os
import os.path
import tempfile
from apache_beam.io import tfrecordio
from tensorflow_transform.coders import example_proto_coder
from tensorflow_transform.tf_metadata import dataset_metadata
from tensorflow_transform.tf_metadata import dataset_schema
from tensorflow_transform.beam import tft_beam_io
from tensorflow_transform.beam.tft_beam_io import transform_fn_io
job_name = 'preprocess-bike-features' + '-' + datetime.datetime.now().strftime('%y%m%d-%H%M%S')
if in_test_mode:
import shutil
print('Launching local job ... hang on')
OUTPUT_DIR = './bike_preproc_tft'
shutil.rmtree(OUTPUT_DIR, ignore_errors=True)
EVERY_N = 5
else:
print('Launching Dataflow job {} ... hang on'.format(job_name))
OUTPUT_DIR = 'gs://{0}/bike_preproc_tft/'.format(BUCKET)
import subprocess
subprocess.call('gsutil rm -r {}'.format(OUTPUT_DIR).split())
EVERY_N = 5
options = {
'staging_location': os.path.join(OUTPUT_DIR, 'tmp', 'staging'),
'temp_location': os.path.join(OUTPUT_DIR, 'tmp'),
'job_name': job_name,
'project': PROJECT,
'max_num_workers': 6,
'teardown_policy': 'TEARDOWN_ALWAYS',
'no_save_main_session': True,
'requirements_file': 'requirements.txt'
}
opts = beam.pipeline.PipelineOptions(flags=[], **options)
if in_test_mode:
RUNNER = 'DirectRunner'
else:
RUNNER = 'DataflowRunner'
# set up raw data metadata
raw_data_schema = {
colname : dataset_schema.ColumnSchema(tf.int64, [], dataset_schema.FixedColumnRepresentation())
for colname in 'start_year,start_month,start_day,start_station_id,hire_count'.split(',')
}
raw_data_schema.update({
'day_of_week' : dataset_schema.ColumnSchema(tf.string, [], dataset_schema.FixedColumnRepresentation())
})
raw_data_metadata = dataset_metadata.DatasetMetadata(dataset_schema.Schema(raw_data_schema))
# run Beam
with beam.Pipeline(RUNNER, options=opts) as p:
with beam_impl.Context(temp_dir=os.path.join(OUTPUT_DIR, 'tmp')):
# save the raw data metadata
raw_data_metadata | 'WriteInputMetadata' >> tft_beam_io.WriteMetadata(
os.path.join(OUTPUT_DIR, 'metadata/rawdata_metadata'),
pipeline=p)
# read training data from bigquery and filter rows
raw_data = (p
| 'train_read' >> beam.io.Read(beam.io.BigQuerySource(query=create_query('train', EVERY_N), use_standard_sql=True))
| 'train_filter' >> beam.Filter(is_valid))
raw_dataset = (raw_data, raw_data_metadata)
# analyze and transform training data
transformed_dataset, transform_fn = (
raw_dataset | beam_impl.AnalyzeAndTransformDataset(preprocess_tft))
transformed_data, transformed_metadata = transformed_dataset
# save transformed training data to disk in efficient tfrecord format
transformed_data | 'WriteTrainData' >> tfrecordio.WriteToTFRecord(
os.path.join(OUTPUT_DIR, 'train'),
file_name_suffix='.gz',
coder=example_proto_coder.ExampleProtoCoder(
transformed_metadata.schema))
# read eval data from bigquery and filter rows
raw_test_data = (p
| 'eval_read' >> beam.io.Read(beam.io.BigQuerySource(query=create_query('valid', EVERY_N), use_standard_sql=True))
| 'eval_filter' >> beam.Filter(is_valid))
raw_test_dataset = (raw_test_data, raw_data_metadata)
# transform eval data
transformed_test_dataset = (
(raw_test_dataset, transform_fn) | beam_impl.TransformDataset())
transformed_test_data, _ = transformed_test_dataset
# save transformed training data to disk in efficient tfrecord format
transformed_test_data | 'WriteTestData' >> tfrecordio.WriteToTFRecord(
os.path.join(OUTPUT_DIR, 'eval'),
file_name_suffix='.gz',
coder=example_proto_coder.ExampleProtoCoder(
transformed_metadata.schema))
# save transformation function to disk for use at serving time
transform_fn | 'WriteTransformFn' >> transform_fn_io.WriteTransformFn(
os.path.join(OUTPUT_DIR, 'metadata'))
Ошибки появляются при запуске preprocess(in_test_mode=False)
---------------------------------------------------------------------------
CalledProcessError Traceback (most recent call last)
~/.local/lib/python3.5/site-packages/apache_beam/utils/processes.py in check_output(*args, **kwargs)
82 try:
---> 83 out = subprocess.check_output(*args, **kwargs)
84 except OSError:
/usr/lib/python3.5/subprocess.py in check_output(timeout, *popenargs, **kwargs)
315 return run(*popenargs, stdout=PIPE, timeout=timeout, check=True,
--> 316 **kwargs).stdout
317
/usr/lib/python3.5/subprocess.py in run(input, timeout, check, *popenargs, **kwargs)
397 raise CalledProcessError(retcode, process.args,
--> 398 output=stdout, stderr=stderr)
399 return CompletedProcess(process.args, retcode, stdout, stderr)
CalledProcessError: Command '['/usr/bin/python3', '-m', 'pip', 'download', '--dest', '/tmp/dataflow-requirements-cache', '-r', 'requirements.txt', '--exists-action', 'i', '--no-binary', ':all:']' returned non-zero exit status 1
During handling of the above exception, another exception occurred:
RuntimeError Traceback (most recent call last)
<ipython-input-13-eac0bb8c8400> in <module>
131 os.path.join(OUTPUT_DIR, 'metadata'))
132
--> 133 preprocess(in_test_mode=False) # change to True to run locally
<ipython-input-13-eac0bb8c8400> in preprocess(in_test_mode)
129 # save transformation function to disk for use at serving time
130 transform_fn | 'WriteTransformFn' >> transform_fn_io.WriteTransformFn(
--> 131 os.path.join(OUTPUT_DIR, 'metadata'))
132
133 preprocess(in_test_mode=False) # change to True to run locally
~/.local/lib/python3.5/site-packages/apache_beam/pipeline.py in __exit__(self, exc_type, exc_val, exc_tb)
425 def __exit__(self, exc_type, exc_val, exc_tb):
426 if not exc_type:
--> 427 self.run().wait_until_finish()
428
429 def visit(self, visitor):
~/.local/lib/python3.5/site-packages/apache_beam/pipeline.py in run(self, test_runner_api)
405 self.to_runner_api(use_fake_coders=True),
406 self.runner,
--> 407 self._options).run(False)
408
409 if self._options.view_as(TypeOptions).runtime_type_check:
~/.local/lib/python3.5/site-packages/apache_beam/pipeline.py in run(self, test_runner_api)
418 finally:
419 shutil.rmtree(tmpdir)
--> 420 return self.runner.run_pipeline(self, self._options)
421
422 def __enter__(self):
~/.local/lib/python3.5/site-packages/apache_beam/runners/dataflow/dataflow_runner.py in run_pipeline(self, pipeline, options)
483 # raise an exception.
484 result = DataflowPipelineResult(
--> 485 self.dataflow_client.create_job(self.job), self)
486
487 # TODO(BEAM-4274): Circular import runners-metrics. Requires refactoring.
~/.local/lib/python3.5/site-packages/apache_beam/utils/retry.py in wrapper(*args, **kwargs)
204 while True:
205 try:
--> 206 return fun(*args, **kwargs)
207 except Exception as exn: # pylint: disable=broad-except
208 if not retry_filter(exn):
~/.local/lib/python3.5/site-packages/apache_beam/runners/dataflow/internal/apiclient.py in create_job(self, job)
529 def create_job(self, job):
530 """Creates job description. May stage and/or submit for remote execution."""
--> 531 self.create_job_description(job)
532
533 # Stage and submit the job when necessary
~/.local/lib/python3.5/site-packages/apache_beam/runners/dataflow/internal/apiclient.py in create_job_description(self, job)
559
560 # Stage other resources for the SDK harness
--> 561 resources = self._stage_resources(job.options)
562
563 job.proto.environment = Environment(
~/.local/lib/python3.5/site-packages/apache_beam/runners/dataflow/internal/apiclient.py in _stage_resources(self, options)
489 options,
490 temp_dir=tempfile.mkdtemp(),
--> 491 staging_location=google_cloud_options.staging_location)
492 return resources
493
~/.local/lib/python3.5/site-packages/apache_beam/runners/portability/stager.py in stage_job_resources(self, options, build_setup_args, temp_dir, populate_requirements_cache, staging_location)
166 (populate_requirements_cache if populate_requirements_cache else
167 Stager._populate_requirements_cache)(setup_options.requirements_file,
--> 168 requirements_cache_path)
169 for pkg in glob.glob(os.path.join(requirements_cache_path, '*')):
170 self.stage_artifact(
~/.local/lib/python3.5/site-packages/apache_beam/utils/retry.py in wrapper(*args, **kwargs)
204 while True:
205 try:
--> 206 return fun(*args, **kwargs)
207 except Exception as exn: # pylint: disable=broad-except
208 if not retry_filter(exn):
~/.local/lib/python3.5/site-packages/apache_beam/runners/portability/stager.py in _populate_requirements_cache(requirements_file, cache_dir)
485 ]
486 logging.info('Executing command: %s', cmd_args)
--> 487 processes.check_output(cmd_args, stderr=processes.STDOUT)
488
489 @staticmethod
~/.local/lib/python3.5/site-packages/apache_beam/utils/processes.py in check_output(*args, **kwargs)
89 "Full traceback: {} \n Pip install failed for package: {} \
90 \n Output from execution of subprocess: {}" \
---> 91 .format(traceback.format_exc(), args[0][6], error.output))
92 else:
93 raise RuntimeError("Full trace: {}, \
RuntimeError: Full traceback: Traceback (most recent call last):
File "/home/jupyter/.local/lib/python3.5/site-packages/apache_beam/utils/processes.py", line 83, in check_output
out = subprocess.check_output(*args, **kwargs)
File "/usr/lib/python3.5/subprocess.py", line 316, in check_output
**kwargs).stdout
File "/usr/lib/python3.5/subprocess.py", line 398, in run
output=stdout, stderr=stderr)
subprocess.CalledProcessError: Command '['/usr/bin/python3', '-m', 'pip', 'download', '--dest', '/tmp/dataflow-requirements-cache', '-r', 'requirements.txt', '--exists-action', 'i', '--no-binary', ':all:']' returned non-zero exit status 1
Pip install failed for package: -r
Output from execution of subprocess: b"Collecting tensorflow-transform==0.15.0\n Using cached tensorflow-transform-0.15.0.tar.gz (222 kB)\n Saved /tmp/dataflow-requirements-cache/tensorflow-transform-0.15.0.tar.gz\nCollecting absl-py<0.9,>=0.7\n Using cached absl-py-0.8.1.tar.gz (103 kB)\n Saved /tmp/dataflow-requirements-cache/absl-py-0.8.1.tar.gz\nCollecting apache-beam[gcp]<3,>=2.16\n Using cached apache-beam-2.19.0.zip (1.9 MB)\n Saved /tmp/dataflow-requirements-cache/apache-beam-2.19.0.zip\nCollecting numpy<2,>=1.16\n Using cached numpy-1.18.1.zip (5.4 MB)\n Installing build dependencies: started\n Installing build dependencies: still running...\n Installing build dependencies: finished with status 'done'\n Getting requirements to build wheel: started\n Getting requirements to build wheel: finished with status 'done'\n Preparing wheel metadata: started\n Preparing wheel metadata: finished with status 'done'\n Saved /tmp/dataflow-requirements-cache/numpy-1.18.1.zip\nCollecting protobuf<4,>=3.7\n Using cached protobuf-3.11.3.tar.gz (264 kB)\n Saved /tmp/dataflow-requirements-cache/protobuf-3.11.3.tar.gz\nCollecting pydot<2,>=1.2\n Using cached pydot-1.4.1.tar.gz (128 kB)\n Saved /tmp/dataflow-requirements-cache/pydot-1.4.1.tar.gz\nCollecting six<2,>=1.10\n Using cached six-1.14.0.tar.gz (33 kB)\n Saved /tmp/dataflow-requirements-cache/six-1.14.0.tar.gz\nERROR: Could not find a version that satisfies the requirement tensorflow-metadata<0.16,>=0.15 (from tensorflow-transform==0.15.0->-r requirements.txt (line 1)) (from versions: 0.6.0, 0.9.0, 0.12.1)\nERROR: No matching distribution found for tensorflow-metadata<0.16,>=0.15 (from tensorflow-transform==0.15.0->-r requirements.txt (line 1))\n"
Я думаю, что из-за этой ошибки метаданные папка не может быть создана. Это приводит к следующей ошибке во время выполнения.
Читая другие сайты, я подозреваю, что команда pip download завершилась неудачно. Я уже обновил пункт, поэтому я не знаю, что еще я могу сделать.
Пожалуйста, дайте мне несколько советов.