Я пытаюсь опубликовать потоковые данные в Твиттере в пабе / подразделе Google Cloud Platform. Версия Python: 2.7 Однако я могу получить потоковые данные, но они не публикуются в теме.
Этот код я использую для публикации данных в своей теме (publishtweet.py):
import datetime
import json
from google import pubsub_v1
PROJECT_NAME = "MY PROJECT ID"
PUBSUB_TOPIC_NAME = "MY TOPIC ID"
# Configure the connection
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_NAME, PUBSUB_TOPIC_NAME)
# Function to write data to
def write_to_pubsub(data):
try:
if data["lang"] == "en":
# publish to the topic, don't forget to encode everything at utf8!
publisher.publish(topic_path, data=json.dumps({
"text": data["text"],
"user_id": data["user_id"],
"id": data["id"],
"posted_at": datetime.datetime.fromtimestamp(data["created_at"]).strftime('%Y-%m-%d %H:%M:%S')
}).encode("utf-8"), tweet_id=str(data["id"]).encode("utf-8"))
except Exception as e:
print(e)
raise
Я использую следующий код для извлечения данных из Twitter с помощью Twitter API (tweet-streamer.py):
import tweepy
from tweepy import StreamListener
from publishtweet import write_to_pubsub
consumer_key = "APP_KEY"
consumer_secret = "APP_SECRET"
access_token = "##################"
access_token_secret = "##########"
# Authenticate
auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_token, access_token_secret)
# Configure to wait on rate limit if necessary
api = tweepy.API(auth, wait_on_rate_limit=True, wait_on_rate_limit_notify=False)
# Hashtag list
lst_hashtags = ["#got", "#gameofthrones"]
# Listener class
class TweetListener(StreamListener):
def __init__(self):
super(StdOutListener, self).__init__()
def on_status(self, data):
# When receiveing a tweet: send it to pubsub
write_to_pubsub(reformat_tweet(data._json))
return True
def on_error(self, status):
if status == 420:
print("rate limit active")
return False
# Make an instance of the class
l = TweetListener()
# Start streaming
stream = tweepy.Stream(auth, l, tweet_mode='extended')
stream.filter(track=lst_hashtags)
Ошибка возникает при запуске tweet-streamer. py, он выбирает поток твита, заканчивающийся следующей ошибкой:
> No handlers could be found for logger
> "google.cloud.pubsub_v1.publisher._batch.thread"
У учетной записи службы, которую я использую, есть права администратора на pub / sub.