Как получить выходные данные конвейера и выполнить чтение и запись в Cloud Firestore - PullRequest
0 голосов
/ 25 сентября 2018

Я использую Apache Beam для получения журнала от Pub / Sub, который содержит информацию о трафике просмотра страниц.Каждая страница содержит уникальный идентификатор, и когда один журнал трафика просмотра страниц поступает из Pub / Sub, Cloud Dataflow будет собирать их в постоянном оконном режиме и считать их.В конце объединителя мы получим что-то вроде этого:

12345, 2
12456, 1
15213, 1
...

Как я знаю, ParDo - это преобразование Beam для общей параллельной обработки.После объединения я хочу реализовать преобразование, которое записывает запрос в Cloud Firestore, чтобы получить существующий идентификатор просмотра страницы, взять текущий счетчик просмотров, выполнить добавление к нему и выполнить операцию записи для обновления счетчика просмотров один за другим из комбинированного вывода, как показановыше.Любое предложение?

Ниже мой код пока для UpdateViewCount.Когда я получаю запрос, кажется невозможным иметь цикл for, чтобы получить запрос (это будет только одна строка запроса, поскольку просмотр страницы уникален, хотя)

class UpdateIntoFireStore(beam.DoFn):
    def process(self, element):
        listingid, count = element
        doc_ref = db.collection('listings').where('listingid', u'==', '12345')
        try:
            docs = doc_ref.get()
            for doc in docs:
                print doc
        except NotFound:
            print(u'No such document!')

1 Ответ

0 голосов
/ 26 сентября 2018

Я решил это.Нет необходимости помещать цикл для извлечения данных, и я должен получить конкретный идентификатор с именем документа.

doc_ref = db.collection(u'listings').document(listingid)
try:
    doc = doc_ref.get()
    doc_dict = doc.to_dict()
    self.cur_count = doc_dict[u'count']
    doc_ref.update({
        u'count': self.cur_count + count
    })
except NotFound:
    doc_ref.set({'count': count})
...