Скачивание файла в DoFn - PullRequest
       29

Скачивание файла в DoFn

0 голосов
/ 29 октября 2019

Неясно, безопасно ли загружать файлы в DoFn.

Мой DoFn загрузит файл ~ 20 МБ (модель ML) для применения к элементам в моем конвейере. Согласно документации Beam, требования включают сериализуемость и совместимость с потоками.

Пример ( 1 , 2 ) очень похож на мойDoFn. Он демонстрирует загрузку из хранилища GCP (как я это делаю с DataflowRunner), но я не уверен, что этот подход безопасен.

Если объекты загружаются в буфер байтов в памяти вместо загрузки на диск, или есть другой лучший метод для этого варианта использования? Я еще не сталкивался с наилучшим практическим подходом к этому шаблону.

Ответы [ 2 ]

1 голос
/ 31 октября 2019

Добавление к этому ответу.

Если данные вашей модели статичны, вы можете использовать приведенный ниже пример кода для передачи вашей модели в качестве бокового ввода.

#DoFn to open the model from GCS location
class get_model(beam.DoFn):
    def process(self, element):
        from apache_beam.io.gcp import gcsio
        logging.info('reading model from GCS')
        gcs = gcsio.GcsIO()
        yield gcs.open(element)


#Pipeline to load pickle file from GCS bucket
model_step = (p
              | 'start' >> beam.Create(['gs://somebucket/model'])
              | 'load_model' >> beam.ParDo(get_model())
              | 'unpickle_model' >> beam.Map(lambda bin: dill.load(bin)))

#DoFn to predict the results.
class predict(beam.DoFn):
    def process(self, element, model):
        (features, clients) = element
        result = model.predict_proba(features)[:, 1]
        return [(clients, result)]

#main pipeline to get input and predict results.
_ = (p
     | 'get_input' >> #get input based on source and preprocess it.
     | 'predict_sk_model' >> beam.ParDo(predict(), beam.pvalue.AsSingleton(model_step))
     | 'write' >> #write output based on target.

В случае потокового конвейера, если вы хотите загрузить модель снова после предопределенноговремя, вы можете проверить шаблон «Медленно изменяющийся поисковый кеш» здесь .

1 голос
/ 29 октября 2019

Если это модель scikit-learn, вы можете посмотреть на ее размещение в Cloud ML Engine и представить ее как конечную точку REST. Затем вы можете использовать что-то вроде BagState для оптимизации вызова моделей по сети. Более подробную информацию можно найти по этой ссылке https://beam.apache.org/blog/2017/08/28/timely-processing.html

...