Использование FireStore в Google Dataflow - PullRequest
0 голосов
/ 13 декабря 2018

Я хочу использовать FireStore в шаблоне потока данных с python.

Я сделал что-то вроде этого:

with beam.Pipeline(options=options) as p:
(p
| 'Read from PubSub' >> beam.io.ReadFromPubSub(sub).with_output_types(bytes)
| 'String to dictionary' >> beam.Map(firestore_update_multiple)
)

Это подходящий способ его использования?


Дополнительная информация

def firestore_update_multiple(row):
    from google.cloud import firestore
    db = firestore.Client()
    doc_ref = db.collection(u'data').document(u'one')

    doc_ref.update({
        u'arrayExample': u'DataflowRunner',
        u'booleanExample': True
    })

1 Ответ

0 голосов
/ 15 декабря 2018

Общая идея верна, но вы должны реже распределять соединения с пожарным депо и группировать свои звонки.Вот пример ParDo, который должен делать это:

class FirestoreUpdateDoFn(beam.DoFn):

  def __init__(self, max_batch_size=500):
    self.element_batch = []
    self.max_batch_size = max_batch_size

  def start_bundle(self):
    self.db = firestore.Client()
    self.batch = db.batch()
    self.some_ref = db.collection(...)

  def process(self, row):
    self.element_batch.append(row)
    if len(self.element_batch) >= self.max_batch_size:
      self._flush_updates()

  def finish_bundle(self):
    self._flush_updates()
    self.db.close()

  def _flush_updates(self):
    for elm in self.element_batch:
      self.batch.update(...)
    batch.commit()

Это должно позволить вам делать меньше обратных вызовов в Firestore и ускорять конвейер.Затем вы должны сделать что-то вроде этого:

with beam.Pipeline(options=options) as p:
    (p
     | 'Read from PubSub' >> beam.io.ReadFromPubSub(sub)
                              .with_output_types(bytes)
     | 'String to dictionary' >> beam.ParDo(FirestoreUpdateDoFn())
    )

Проверить:

...