Я использую Apache Beam для получения журнала от Pub / Sub, который содержит информацию о трафике просмотра страниц.Каждая страница содержит уникальный идентификатор, и когда один журнал трафика просмотра страниц поступает из Pub / Sub, Cloud Dataflow будет собирать их в постоянном оконном режиме и считать их.В конце объединителя мы получим что-то вроде этого:
12345, 2
12456, 1
15213, 1
...
Как я знаю, ParDo - это преобразование Beam для общей параллельной обработки.После объединения я хочу реализовать преобразование, которое записывает запрос в Cloud Firestore, чтобы получить существующий идентификатор просмотра страницы, взять текущий счетчик просмотров, выполнить добавление к нему и выполнить операцию записи для обновления счетчика просмотров один за другим из комбинированного вывода, как показановыше.Любое предложение?
Ниже мой код пока для UpdateViewCount.Когда я получаю запрос, кажется невозможным иметь цикл for, чтобы получить запрос (это будет только одна строка запроса, поскольку просмотр страницы уникален, хотя)
class UpdateIntoFireStore(beam.DoFn):
def process(self, element):
listingid, count = element
doc_ref = db.collection('listings').where('listingid', u'==', '12345')
try:
docs = doc_ref.get()
for doc in docs:
print doc
except NotFound:
print(u'No such document!')