В случае, если вы используете Kubeflow, я бы предложил использовать конвейеры kubeflow.
Для предварительной обработки вы можете использовать образ, построенный поверх стандартного образа потока данных конвейера gcr.io/ml-pipeline/ml-pipeline-dataflow-tft:latest
, где вы просто копируете код потока данных и запускаете его:
FROM gcr.io/ml-pipeline/ml-pipeline-dataflow-tft:latest
RUN mkdir /{folder}
COPY run_dataflow_pipeline.py /{folder}
ENTRYPOINT ["python", "/{folder}/run_dataflow_pipeline.py"]
См. Этот шаблон для кода потока данных, который делает именно это. Идея заключается в том, что вы записываете записи TF в Google Cloud Storage (GCS).
Впоследствии вы можете использовать движок Google Cloud ML для фактического обучения. В этом случае вы также можете начать с образа google/cloud-sdk:latest
и скопировать необходимые файлы, вероятно, с помощью bash-скрипта, который будет запускаться для выполнения команд gcloud для запуска учебного задания.
FROM google/cloud-sdk:latest
RUN mkdir -p /{src} && \
cd /{src}
COPY train.sh ./
ENTRYPOINT ["bash", "./train.sh"]
Элегантный способ передать место хранения ваших записей TF в вашу модель - использовать TF.data:
# Construct a TFRecordDataset
train_records = [os.path.join('gs://{BUCKET_NAME}/', f.name) for f in
bucket.list_blobs(prefix='data/TFR/train')]
validation_records = [os.path.join('gs://{BUCKET_NAME}/', f.name) for f in
bucket.list_blobs(prefix='data/TFR/validation')]
ds_train = tf.data.TFRecordDataset(train_records, num_parallel_reads=4).map(decode)
ds_val = tf.data.TFRecordDataset(validation_records,num_parallel_reads=4).map(decode)
# potential additional steps for performance:
# https://www.tensorflow.org/guide/performance/datasets)
# Train the model
model.fit(ds_train,
validation_data=ds_val,
...,
verbose=2)
Ознакомьтесь с этим сообщением в блоге , чтобы узнать о реальной реализации аналогичного (более сложного) конвейера kubeflow