Задание Apache Beam (Python) с использованием Tensorflow Transform уничтожается облачным потоком данных - PullRequest
0 голосов
/ 13 сентября 2018

Я пытаюсь запустить задание Apache Beam, основанное на преобразовании Tensorflow в потоке данных, но оно уничтожено.Кто-то испытывал такое поведение?Это простой пример с DirectRunner, который работает нормально на моем локальном компьютере, но не работает в потоке данных (я правильно изменяю бегун):

import os
import csv
import datetime
import numpy as np

import tensorflow as tf
import tensorflow_transform as tft

from apache_beam.io import textio
from apache_beam.io import tfrecordio

from tensorflow_transform.beam import impl as beam_impl
from tensorflow_transform.beam import tft_beam_io 
from tensorflow_transform.tf_metadata import dataset_metadata
from tensorflow_transform.tf_metadata import dataset_schema

import apache_beam as beam


NUMERIC_FEATURE_KEYS = ['feature_'+str(i) for i in range(2000)]


def _create_raw_metadata():
    column_schemas = {}
    for key in NUMERIC_FEATURE_KEYS:
        column_schemas[key] = dataset_schema.ColumnSchema(tf.float32, [], dataset_schema.FixedColumnRepresentation())

    raw_data_metadata = dataset_metadata.DatasetMetadata(dataset_schema.Schema(column_schemas))

    return raw_data_metadata


def preprocessing_fn(inputs):
    outputs={}

    for key in NUMERIC_FEATURE_KEYS:
        outputs[key] = tft.scale_to_0_1(inputs[key])

    return outputs


def main():

    output_dir = '/tmp/tmp-folder-{}'.format(datetime.datetime.now().strftime('%Y%m%d%H%M%S'))

    RUNNER = 'DirectRunner'

    with beam.Pipeline(RUNNER) as p:
        with beam_impl.Context(temp_dir=output_dir):

            raw_data_metadata = _create_raw_metadata()
            _ = (raw_data_metadata | 'WriteInputMetadata' >> tft_beam_io.WriteMetadata(os.path.join(output_dir, 'rawdata_metadata'), pipeline=p))

            m = numpy_dataset = np.random.rand(100,2000)*100
            raw_data = (p
                    | 'CreateTestDataset' >> beam.Create([dict(zip(NUMERIC_FEATURE_KEYS, m[i,:])) for i in range(m.shape[0])]))

            raw_dataset = (raw_data, raw_data_metadata)

            transform_fn = (raw_dataset | 'Analyze' >> beam_impl.AnalyzeDataset(preprocessing_fn))
            _ = (transform_fn | 'WriteTransformFn' >> tft_beam_io.WriteTransformFn(output_dir))

            (transformed_data, transformed_metadata) = ((raw_dataset, transform_fn) | 'Transform' >> beam_impl.TransformDataset())

            transformed_data_coder = tft.coders.ExampleProtoCoder(transformed_metadata.schema)
            _ = transformed_data | 'WriteTrainData' >> tfrecordio.WriteToTFRecord(os.path.join(output_dir, 'train'), file_name_suffix='.gz', coder=transformed_data_coder)

if __name__ == '__main__':
  main()

Кроме того, мой рабочий код (не показан) завершается с сообщением: The job graph is too large. Please try again with a smaller job graph, or split your job into two or more smaller jobs.

Любой намек?

1 Ответ

0 голосов
/ 24 сентября 2018

Ограничение на размер описания конвейера задокументировано здесь: https://cloud.google.com/dataflow/quotas#limits

Существует способ обойти это, вместо того, чтобы создавать этапы для каждого тензора, который входит в tft.scale_to_0_1, мы могли бы объединить их, сначала сложивони вместе, а затем передают их в tft.scale_to_0_1 с помощью 'elementwise = True'.

Результат будет таким же, потому что min и max вычисляются для каждого столбца, а не для всего тензора.

Это будет выглядеть примерно так:

stacked = tf.stack([inputs[key] for key in NUMERIC_FEATURE_KEYS], axis=1)
scaled_stacked = tft.scale_to_0_1(stacked, elementwise=True)
for key, tensor in zip(NUMERIC_FEATURE_KEYS, tf.unstack(scaled_stacked, axis=1)):
  outputs[key] = tensor
...