У меня есть приложение, где пользователи могут голосовать за что-то.
Я хочу, чтобы мое приложение масштабировалось, поэтому я решил объединить счетчики, хранящиеся в Firestore, используя Cloud Dataflow.
Я настроил задание Dataflow типа streaming , чтобы оно могло прослушивать темы pubsub всякий раз, когда пользователь голосует за что-либо.
Иногда у меня есть тысячи пользовательских входов в день, иногда я получаю несколько сотен ... Есть ли какое-нибудь решение, чтобы "приостановить" работу, когда она некоторое время не получает сообщения pubsub?
В настоящее время моя работа с потоками данных всегда выполняется, и я боюсь, что это будет стоить мне много денег.
Если кто-то может помочь мне разобраться в биллинге с потоковой работой, мы будем признательны
Вот мой конвейер Python:
def run(argv=None):
# Config
parser = argparse.ArgumentParser()
# Output PubSub Topic
parser.add_argument(
'--output_topic', required=True)
# Input PubSub Topic
parser.add_argument(
'--input_topic', required=True)
known_args, pipeline_args = parser.parse_known_args(argv)
# Pipeline options
pipeline_options = PipelineOptions(pipeline_args)
pipeline_options.view_as(SetupOptions).save_main_session = True
pipeline_options.view_as(StandardOptions).streaming = True
# Pipeline process
with beam.Pipeline(options=pipeline_options) as p:
# Counting votes
def count_votes(contestant_votes):
(contestant, votes) = contestant_votes
return (contestant, sum(votes))
# Format data to a fake object (used to be parsed by the CF)
def format_result(contestant_votes):
(contestant, votes) = contestant_votes
return '{ "contestant": %s, "votes": %d }' % (contestant, votes)
transformed = (p
| 'Receive PubSub' >> beam.io.ReadFromPubSub(topic=known_args.input_topic)
.with_output_types(bytes)
| 'Decode' >> beam.Map(lambda x: x.decode('utf-8'))
| 'Pair with one' >> beam.Map(lambda x: (x, 1))
| 'Apply window of time' >> beam.WindowInto(window.FixedWindows(30, 0))
| 'Group by contestant' >> beam.GroupByKey()
| 'Count votes' >> beam.Map(count_votes)
| 'Format to fake object string' >> beam.Map(format_result)
| 'Transform to PubSub base64 string' >> beam.Map(lambda x: x.encode('utf-8'))
.with_output_types(bytes))
# Trigger a the output PubSub topic with the message payload
transformed | beam.io.WriteToPubSub(known_args.output_topic)
result = p.run()
result.wait_until_finish()
if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
run()