Запись данных в Google Firestore в конвейере - PullRequest
0 голосов
/ 04 октября 2018

Я хочу читать данные из Cloud BigQuery в Cloud Datastore через бегунок Dataflow с Apache Beam.Из документации Firestore еще не поддерживается.Я пишу свой собственный класс, чтобы сделать это.

class UpdateIntoFireStore(beam.DoFn):

    def process(self, element):
        try:
            cred = credentials.Certificate({
              "..."
            })

            firebase_admin.initialize_app(cred, {
            'projectId': '...',
            })
        except ValueError:
            pass
        db = firestore.client()
        doc_ref = db.collection(u'poi')
        doc_ref.add(element)

Конвейер выглядит следующим образом:

job = ( p  | 'Read from BigQuery' >> Read(BigQuerySource(query="SELECT * FROM ...", use_standard_sql=True))
           | 'Update to Firestore' >> beam.ParDo(UpdateIntoFireStore()))

Хорошо ли подходит этот подход?Я обеспокоен влиянием параллельной обработки на эти операции записи в Cloud Firestore.

Ответы [ 2 ]

0 голосов
/ 30 октября 2018

Используйте start_bundle для определения вашего клиента.

start_bundle - Вызывается до обработки пакета элементов на работнике.Элементы для обработки разбиваются на пакеты и распространяются среди рабочих.Прежде чем работник вызывает process () для первого элемента своего пакета, он вызывает этот метод.

Гораздо лучший подход:

class FirestoreDoFn(beam.DoFn):

def __init__(self):
    super(FirestoreDoFn, self).__init__()

def start_bundle(self):
    self.firestore_client = GoogleServices(
        crendential_path="<cred-path-in-here>"
    ).init_firestore(
        project_id="<your-project-id>",
        collection_id="<collection-id>"
    )

def process(self, element, *args, **kwargs):
    logging.info(element)
    # response = self.firestore_client.save()
    # logging.info("response: {}".format(response))
    return {"status":"ok"}
0 голосов
/ 05 октября 2018

Это похоже на выполнение внешних вызовов из потока данных.Технически это будет работать.Однако есть несколько вещей, о которых нужно знать.

  1. Нет гарантий, сколько раз будет обработан один элемент, поэтому вы можете получить несколько записей для одного и того же элемента в firestore.
  2. Вы будете делать отдельный вызов для каждого элемента в пожарное хранилище, и нет кэширования клиентов / подключений пожарного хранилища.
...