Я работаю со следующим кодом, используя тензор потока-преобразования:
import tensorflow as tf
import apache_beam as beam
import tensorflow_transform as tft
from tensorflow_transform.beam import impl as beam_impl
from tensorflow_transform.coders import example_proto_coder
from tensorflow_transform.tf_metadata import dataset_metadata
from tensorflow_transform.tf_metadata import dataset_schema
from tensorflow_transform.beam.tft_beam_io import transform_fn_io
import tempfile
import ast
import six
import apache_beam as beam
# Beam Pipelines must receive a set of config options to set how it should run.
from apache_beam.options.pipeline_options import PipelineOptions
assert six.PY2
options = {
'runner': 'DirectRunner'
}
pipeline_options = PipelineOptions(**options)
RAW_DATA_SCHEMA = {
'customer_id': dataset_schema.ColumnSchema(tf.string, [], dataset_schema.ListColumnRepresentation())
}
RAW_DATA_METADATA = dataset_metadata.DatasetMetadata(dataset_schema.Schema(RAW_DATA_SCHEMA))
def preprocess_fn(dictrow):
return {
'customer_id': tft.string_to_int(dictrow['customer_id'], vocab_filename='vocab_result')
}
working_dir = tempfile.mkdtemp(dir='/tmp/')
with beam.Pipeline(options=pipeline_options) as pipeline:
with beam_impl.Context(tempfile.mkdtemp()):
raw_data = (
pipeline
| 'create' >> beam.Create([
{'customer_id': ['customer_0']},
{'customer_id': ['customer1', 'customer2']},
{'customer_id': ['customer_0']}
])
)
raw_dataset = (raw_data, RAW_DATA_METADATA)
transformed_dataset, transform_fn = (
raw_dataset | beam_impl.AnalyzeAndTransformDataset(preprocess_fn))
transformed_data, transformed_metadata = transformed_dataset
OUTPUT_SCHEMA = {
'customer_id': dataset_schema.ColumnSchema(tf.int64, [], dataset_schema.ListColumnRepresentation())
}
_ = transformed_data | 'writing' >> beam.io.tfrecordio.WriteToTFRecord(
working_dir + '/tf', coder=example_proto_coder.ExampleProtoCoder(dataset_schema.Schema(OUTPUT_SCHEMA)))
pipeline.run().wait_until_finish()
Но это дает мне ошибку:
tftrec / местные / Библиотека / python2.7 / сайт-пакеты / tensorflow_transform / ширина / impl.pyc
в процессе (самостоятельно, партия, сохраненная_модель_дир)
438 # Это должно сохраняться в течение всего срока действия этого DoFn, независимо от
439 # того, было ли кэшировано self._graph_state.
-> 440 заявить о себе.
441
442 выходов self._handle_batch (партия)
RuntimeError: AssertionError [во время работы
'AnalyzeAndTransformDataset / TransformDataset / Transform']
Интересно, что может быть не так. Я попытался напечатать self._graph_state.saved_model_dir
, и это действительно изменилось, но не уверен, почему это происходит.
Запуск tensorflow==1.13.1
, tensorflow-transform==0.13.0
, apache_beam[gcp]==2.11
на Python2
Тестирование для версии преобразования 0.8 кажется более стабильным, несмотря на то, что иногда выдает ошибку.
Как представляется, удаление окончательного процесса записи файлов tfrecord, по-видимому, делает код стабильным.