публикация сообщения из движка приложения в bigquery через pubsub и поток данных с использованием python - PullRequest
0 голосов
/ 03 февраля 2019

Я пытаюсь изучать продукты Google. В настоящее время я хочу выполнить очень простую задачу.Сценарий таков: я запускаю веб-приложение в движке приложений, используя флеш-фреймворк.Из формы я получаю ввод и хочу опубликовать сообщение для pubsub, затем из их я установил поток данных, который будет через слово (вход из HTML) большой запрос.Все это прекрасно работает, когда я использую консоль.Из консоли я опубликовал сообщение на pubsub, он переходит к bigquery, но когда я пробую код на python, он не работает.Вот обзор.

движок приложения -> pubsub -> поток данных -> большой запрос.

Вот код.Пожалуйста, скажите мне, что я делаю не так.Я получаю входное слово, но оно не переходит к большому запросу, и из консоли, когда я делаю это, оно работает нормально.

app.yaml

runtime: python
env: flex
entrypoint: gunicorn -b :$PORT main:app

runtime_config:
  python_version: 3
env_variables:
    PUBSUB_TOPIC: projects/data-pipeline-219304/topics/mytest
    PUBSUB_VERIFICATION_TOKEN: 1234test

manual_scaling:
  instances: 1
resources:
  cpu: 1
  memory_gb: 0.5
  disk_size_gb: 10

main.py

<code>import base64
import json
import logging
import os
import argparse

from flask import Flask, redirect, url_for, request, render_template, current_app
from google.cloud import pubsub_v1

app = Flask(__name__)


app.config['PUBSUB_VERIFICATION_TOKEN'] = \
    os.environ['PUBSUB_VERIFICATION_TOKEN']
app.config['PUBSUB_TOPIC'] = os.environ['PUBSUB_TOPIC']
app.config['PROJECT'] = os.environ['GOOGLE_CLOUD_PROJECT']


@app.route('/', methods=['GET', 'POST'])
def index():
    return render_template('index.html')


@app.route('/data', methods = ['POST'])
def handle_data():
    user = request.form.get('nm').encode('utf-8')
    publisher = pubsub_v1.PublisherClient()
    topic_path = publisher.topic_path(
        current_app.config['PROJECT'],
        current_app.config['PUBSUB_TOPIC'])
    publisher.publish(topic_path, data=user)

    receive_messages()
    return 'OK'

def receive_messages():
    """Receives messages from a pull subscription."""
    # [START pubsub_subscriber_async_pull]
    # [START pubsub_quickstart_subscriber]
    import time

    project_id = "data-pipeline-219304"
    subscription_name = "projects/data-pipeline-219304/subscriptions/mytestsubscription"

    subscriber = pubsub_v1.SubscriberClient()
    # The `subscription_path` method creates a fully qualified identifier
    # in the form `projects/{project_id}/subscriptions/{subscription_name}`
    subscription_path = subscriber.subscription_path(
        project_id, subscription_name)

    def callback(message):
        print('Received message: {}'.format(message))
        message.ack()

    subscriber.subscribe(subscription_path, callback=callback)

    # The subscriber is non-blocking. We must keep the main thread from
    # exiting to allow it to process messages asynchronously in the background.
    print('Listening for messages on {}'.format(subscription_path))
    while True:
        time.sleep(60)
    # [END pubsub_subscriber_async_pull]
    # [END pubsub_quickstart_subscriber]

@app.errorhandler(500)
def server_error(e):
    logging.exception('An error occurred during a request.')
    return """
    An internal error occurred: <pre>{}
См журналы для полной трассировки стека."" ".format (e), 500 if __name__ == '__main__': app.run (host = '127.0.0.1', port = 8080, debug = True)

needs.txt

Flask==1.0.2
google-cloud-pubsub==0.38.0
gunicorn==19.9.0

index.html из шаблонов

<!doctype html>
<html>
   <body>
      <form action="/data" method = "post">
         <p>Enter Name:</p>
         <p><input type = "text" name = "nm" /></p>
         <p><input type = "submit" value = "submit" /></p>
      </form>
   </body>
</html>
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...