I'm trying to use TensorFlow Transform and I would like to serialize a whole pipeline made up of several transformations. Suppose I have a transformation that does not need to be fitted (for example, feature interactions between numeric columns). I would like to apply the TransformDataset function directly to the preprocessing function I have already defined, but that does not seem to be possible.
If I run something like this:
import pprint
import tempfile
import apache_beam as beam
import pandas as pd
import tensorflow as tf
import tensorflow_transform.beam as tft_beam
from tensorflow_transform.tf_metadata import dataset_metadata, schema_utils
NUMERIC_FEATURE_KEYS = ['a', 'b', 'c']
impute_dictionary = dict(b=1.0, c=0.0)
RAW_DATA_FEATURE_SPEC = dict([(name, tf.io.FixedLenFeature([], tf.float32)) for name in NUMERIC_FEATURE_KEYS])
RAW_DATA_METADATA = dataset_metadata.DatasetMetadata(schema_utils.schema_from_feature_spec(RAW_DATA_FEATURE_SPEC))
def interaction_fn(inputs):
    # Add squared terms and pairwise products of the numeric features.
    outputs = inputs.copy()
    new_numeric_feature_keys = []
    for i in range(len(NUMERIC_FEATURE_KEYS)):
        for j in range(i, len(NUMERIC_FEATURE_KEYS)):
            if i == j:
                outputs[f'{NUMERIC_FEATURE_KEYS[i]}_squared'] = outputs[NUMERIC_FEATURE_KEYS[i]] * outputs[NUMERIC_FEATURE_KEYS[i]]
                new_numeric_feature_keys.append(f'{NUMERIC_FEATURE_KEYS[i]}_squared')
            else:
                outputs[f'{NUMERIC_FEATURE_KEYS[i]}_{NUMERIC_FEATURE_KEYS[j]}'] = outputs[NUMERIC_FEATURE_KEYS[i]] * outputs[NUMERIC_FEATURE_KEYS[j]]
                new_numeric_feature_keys.append(f'{NUMERIC_FEATURE_KEYS[i]}_{NUMERIC_FEATURE_KEYS[j]}')
    NUMERIC_FEATURE_KEYS.extend(new_numeric_feature_keys)
    return outputs
if __name__ == '__main__':
    temp = tempfile.gettempdir()
    data = pd.DataFrame(dict(
        a=[1.0, 2.0, 3.0, 4.0, 5.0, 6.0],
        b=[1.0, 1.0, 1.0, 2.0, 0.0, 1.0],
        c=[0.9, 2.0, 1.0, 0.0, 0.0, 0.0]
    ))
    data.to_parquet('data_no_nans.parquet')
    x = {}
    for col in data.columns:
        x[col] = tf.constant(data[col], dtype=tf.float32, name=col)
    with beam.Pipeline() as pipeline:
        with tft_beam.Context(temp_dir=tempfile.mkdtemp()):
            raw_data = pipeline | 'ReadTrainData' >> beam.io.ReadFromParquet('data_no_nans.parquet')
            raw_dataset = (raw_data, RAW_DATA_METADATA)
            # This is the line that raises the TypeError below.
            transformed_data, _ = (raw_data, interaction_fn) | tft_beam.TransformDataset()
            transformed_data | beam.Map(pprint.pprint)
I get the following error:
2020-02-11 15:49:37.025525: I tensorflow/core/platform/cpu_feature_guard.cc:142] Your CPU supports instructions that this TensorFlow binary was not compiled to use: AVX2 FMA
2020-02-11 15:49:37.132944: I tensorflow/compiler/xla/service/service.cc:168] XLA service 0x7f87ddda6d30 initialized for platform Host (this does not guarantee that XLA will be used). Devices:
2020-02-11 15:49:37.132959: I tensorflow/compiler/xla/service/service.cc:176] StreamExecutor device (0): Host, Default Version
WARNING:tensorflow:Tensorflow version (2.1.0) found. Note that Tensorflow Transform support for TF 2.0 is currently in beta, and features such as tf.function may not work as intended.
WARNING:tensorflow:Tensorflow version (2.1.0) found. Note that Tensorflow Transform support for TF 2.0 is currently in beta, and features such as tf.function may not work as intended.
Traceback (most recent call last):
  File "/Users/andrea.marchini/Hackathon/tfx_test/foo.py", line 56, in <module>
    transformed_data, _ = (raw_data, interaction_fn) | tft_beam.TransformDataset()
  File "/Users/andrea.marchini/.local/share/virtualenvs/tfx_test-jg7eSsGQ/lib/python3.7/site-packages/apache_beam/transforms/ptransform.py", line 482, in __ror__
    pvalueish, pvalues = self._extract_input_pvalues(left)
  File "/Users/andrea.marchini/.local/share/virtualenvs/tfx_test-jg7eSsGQ/lib/python3.7/site-packages/tensorflow_transform/beam/impl.py", line 908, in _extract_input_pvalues
    dataset_and_transform_fn)
TypeError: cannot unpack non-iterable PCollection object
Is TransformDataset only meant to be applied to the result of AnalyzeAndTransformDataset?
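For reference, is the only supported pattern something like the sketch below (untested on my end), where AnalyzeDataset is run first, even though interaction_fn has nothing to fit, and the resulting transform_fn is then serialized with WriteTransformFn?

with beam.Pipeline() as pipeline:
    with tft_beam.Context(temp_dir=tempfile.mkdtemp()):
        raw_data = pipeline | 'ReadTrainData' >> beam.io.ReadFromParquet('data_no_nans.parquet')
        raw_dataset = (raw_data, RAW_DATA_METADATA)
        # AnalyzeDataset traces interaction_fn into a transform_fn (a SavedModel),
        # even though the function contains no analyzers that actually need fitting.
        transform_fn = raw_dataset | tft_beam.AnalyzeDataset(interaction_fn)
        # TransformDataset expects a ((data, metadata), transform_fn) pair,
        # not (data, python_function).
        transformed_data, transformed_metadata = (
            (raw_dataset, transform_fn) | tft_beam.TransformDataset())
        # Serialize the whole fitted pipeline for later reuse.
        transform_fn | tft_beam.WriteTransformFn(temp)
        transformed_data | beam.Map(pprint.pprint)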