Оценщики (модели TensorFlow) используются во всех документах и учебных пособиях для TFX, таких как Расширенная документация TensorFlow (TFX)
Однако что если кому-то нужны только выходные данные TFX, использующие Beam для предварительной обработки данных?
Я использовал исключительно функцию Scikit-Learn pipeline
для построения конвейеров, поэтому во время прогнозирования через Flask или FastAPI можно просто запустить конвейер и перенести вывод в модель. , что исключает большую работу по предварительной обработке данных.
Однако, по моему опыту, только Beam предлагает масштабируемую DAG для предварительной обработки данных, а TFX добавляет слой постоянства, используя функцию AnalyzeAndTransform
, поэтому он имитирует Scikit- Изучите функцию подгонки без необходимости читать базу данных и пересчитывать такие вещи, как среднее и многократное.
Я пытался перехватить transformer_fn и повторно применить этот объект к пучкам конвейеров, и мне интересно, если это такое хорошая практика?
Например:
with beam.Pipeline() as pipeline:
with tft_beam.Context(temp_dir=tempfile.mkdtemp()):
transform_fn = (raw_data, raw_data_metadata) | tft_beam.AnalyzeDataset(preprocessing_fn)
transformed_data = ((raw_data, raw_data_metadata), transform_fn) | tft_beam.TransformDataset()
One c и возьмите transform_fn
и выберите его, например:
previous_transform = pickle.dump(transform_fn, open("save.pkl", "wb"))
Затем вы можете загрузить его в другой конвейер:
previous_transform = pickle.load(open("save.pkl", "rb"))
Затем вы можете выполнить тот же конвейер в другом месте, чтобы получить ваши данные, например, во время прогнозирования:
with beam.Pipeline() as pipeline:
with tft_beam.Context(temp_dir=tempfile.mkdtemp()):
transformed_data = ((raw_data, raw_data_metadata), previous_transform) | tft_beam.TransformDataset()
Или вы можете WriteTransformFn
с Apache Flink в распределенном режиме и загрузить TransformerFn, когда вы закончите, и ReadTransformerFn
в режиме локального бегуна на Flask или FastAPI.
Теперь вы можете выполнять прогнозирование через время прогнозирования, получая необработанные данные из transformed_data. И кажется, что с tft_beam
выполнение конвейера является выдающимся без запуска конвейера с pipeline.run().wait_until_finish()
.
Так что мне интересно, является ли это хорошей практикой? Или это хакерский способ?