Добавление к этому ответу.
Если данные вашей модели статичны, вы можете использовать приведенный ниже пример кода для передачи вашей модели в качестве бокового ввода.
#DoFn to open the model from GCS location
class get_model(beam.DoFn):
def process(self, element):
from apache_beam.io.gcp import gcsio
logging.info('reading model from GCS')
gcs = gcsio.GcsIO()
yield gcs.open(element)
#Pipeline to load pickle file from GCS bucket
model_step = (p
| 'start' >> beam.Create(['gs://somebucket/model'])
| 'load_model' >> beam.ParDo(get_model())
| 'unpickle_model' >> beam.Map(lambda bin: dill.load(bin)))
#DoFn to predict the results.
class predict(beam.DoFn):
def process(self, element, model):
(features, clients) = element
result = model.predict_proba(features)[:, 1]
return [(clients, result)]
#main pipeline to get input and predict results.
_ = (p
| 'get_input' >> #get input based on source and preprocess it.
| 'predict_sk_model' >> beam.ParDo(predict(), beam.pvalue.AsSingleton(model_step))
| 'write' >> #write output based on target.
В случае потокового конвейера, если вы хотите загрузить модель снова после предопределенноговремя, вы можете проверить шаблон «Медленно изменяющийся поисковый кеш» здесь .