Проблема отправки данных с производителем кафки в Python (Jupyter Notebook) - PullRequest
0 голосов
/ 19 марта 2019

Я пытаюсь создать анализ больших данных, используя Kafka, Python и Twitter. У меня есть поток данных твитов, которые я беру только из хештега. Моя проблема связана с продюсером Kafka для использования в Python. Я не могу отправить нужные данные в созданную мной тему, потому что не вижу никакой возможности отправить содержимое переменной производителю.

В https://kafka -python.readthedocs.io / en / master / использовании.html я вижу только возможность отправить точную строку с b'some_string'. Но я хочу отправить хэштег, который я взял из Твиттера. Я не знаю много о Python, поэтому извините, если решение очевидно.

Импорт:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import json
import tweepy
from tweepy import OAuthHandler
from tweepy import Stream
import kafka
from kafka import SimpleProducer, KafkaClient
from kafka import KafkaProducer

Потоковый контекст:

ssc = StreamingContext(sc,60)

Ключи:

consumer_key="consumer_key"
consumer_secret="consumer_secret"
access_token="access_token"
access_token_secret="access_token_secret"

Tweepy:

auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_token, access_token_secret)
api = tweepy.API(auth)

Производитель:

producer = KafkaProducer(bootstrap_servers=['localhost:9092'])

Код:

class MyStreamListener(tweepy.StreamListener):

    def on_status(self, status):
        for hashtag in status.entities['hashtags']:
            prueba = b'hashtag["text"]'
            producer.send('topic', prueba)
            return True
    def on_error(self, status_code):
        if status_code == 420:
            #returning False in on_data disconnects the stream
            return False

StreamListener:

myStreamListener = MyStreamListener()
myStream = tweepy.Stream(auth = api.auth, listener=MyStreamListener())

Tweet Stream:

myStream.filter(track=['some_text'])

Дело в том, что производитель отправляет только буквальную строку prueba, то есть "(hashtag["text"])". Я хочу отправить не точную вещь, а ее содержание.

Заранее спасибо.

1 Ответ

0 голосов
/ 31 марта 2019

Как насчет producer.send('topic', hashtag)? Вам также необходимо убедиться, что данные закодированы в необработанные байты, что и хранит kafka. Если хэштег является простой строкой, вы можете сделать producer.send('topic', hashtag.encode('utf-8')). Если это dict или более сложная структура данных, вам может понадобиться использовать json.dumps перед кодированием в байты. Надеюсь, это поможет!

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...