Я пытаюсь создать анализ больших данных, используя Kafka, Python и Twitter. У меня есть поток данных твитов, которые я беру только из хештега. Моя проблема связана с продюсером Kafka для использования в Python. Я не могу отправить нужные данные в созданную мной тему, потому что не вижу никакой возможности отправить содержимое переменной производителю.
В https://kafka -python.readthedocs.io / en / master / использовании.html я вижу только возможность отправить точную строку с b'some_string'
. Но я хочу отправить хэштег, который я взял из Твиттера. Я не знаю много о Python, поэтому извините, если решение очевидно.
Импорт:
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import json
import tweepy
from tweepy import OAuthHandler
from tweepy import Stream
import kafka
from kafka import SimpleProducer, KafkaClient
from kafka import KafkaProducer
Потоковый контекст:
ssc = StreamingContext(sc,60)
Ключи:
consumer_key="consumer_key"
consumer_secret="consumer_secret"
access_token="access_token"
access_token_secret="access_token_secret"
Tweepy:
auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_token, access_token_secret)
api = tweepy.API(auth)
Производитель:
producer = KafkaProducer(bootstrap_servers=['localhost:9092'])
Код:
class MyStreamListener(tweepy.StreamListener):
def on_status(self, status):
for hashtag in status.entities['hashtags']:
prueba = b'hashtag["text"]'
producer.send('topic', prueba)
return True
def on_error(self, status_code):
if status_code == 420:
#returning False in on_data disconnects the stream
return False
StreamListener:
myStreamListener = MyStreamListener()
myStream = tweepy.Stream(auth = api.auth, listener=MyStreamListener())
Tweet Stream:
myStream.filter(track=['some_text'])
Дело в том, что производитель отправляет только буквальную строку prueba
, то есть "(hashtag["text"])"
. Я хочу отправить не точную вещь, а ее содержание.
Заранее спасибо.