Tweepy IncompleteRead error while streaming data
0 votes
/ 15 April 2020

I am getting the error below while streaming data with the Twitter API.

New rows have been added.
('Connection broken: IncompleteRead(0 bytes read)', IncompleteRead(0 bytes read))
Exception in thread Thread-3:
Traceback (most recent call last):
  File "/opt/conda/default/lib/python3.6/http/client.py", line 593, in _readinto_chunked
    n = self._safe_readinto(mvb)
  File "/opt/conda/default/lib/python3.6/http/client.py", line 640, in _safe_readinto
    raise IncompleteRead(bytes(mvb[0:total_bytes]), len(b))
http.client.IncompleteRead: IncompleteRead(0 bytes read, 512 more expected)

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/conda/default/lib/python3.6/site-packages/urllib3/response.py", line 425, in _error_catcher
    yield
  File "/opt/conda/default/lib/python3.6/site-packages/urllib3/response.py", line 507, in read
    data = self._fp.read(amt) if not fp_closed else b""
  File "/opt/conda/default/lib/python3.6/http/client.py", line 459, in read
    n = self.readinto(b)
  File "/opt/conda/default/lib/python3.6/http/client.py", line 493, in readinto
    return self._readinto_chunked(b)
  File "/opt/conda/default/lib/python3.6/http/client.py", line 604, in _readinto_chunked
    raise IncompleteRead(bytes(b[0:total_bytes]))
http.client.IncompleteRead: IncompleteRead(0 bytes read)

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/conda/default/lib/python3.6/threading.py", line 916, in _bootstrap_inner
    self.run()
  File "/opt/conda/default/lib/python3.6/threading.py", line 864, in run
    self._target(*self._args, **self._kwargs)
  File "/opt/conda/default/lib/python3.6/site-packages/tweepy/streaming.py", line 320, in _run
    six.reraise(*exc_info)
  File "/opt/conda/default/lib/python3.6/site-packages/six.py", line 703, in reraise
    raise value
  File "/opt/conda/default/lib/python3.6/site-packages/tweepy/streaming.py", line 289, in _run
    self._read_loop(resp)
  File "/opt/conda/default/lib/python3.6/site-packages/tweepy/streaming.py", line 339, in _read_loop
    line = buf.read_line()
  File "/opt/conda/default/lib/python3.6/site-packages/tweepy/streaming.py", line 200, in read_line
    self._buffer += self._stream.read(self._chunk_size)
  File "/opt/conda/default/lib/python3.6/site-packages/urllib3/response.py", line 529, in read
    raise IncompleteRead(self._fp_bytes_read, self.length_remaining)
  File "/opt/conda/default/lib/python3.6/contextlib.py", line 99, in __exit__
    self.gen.throw(type, value, traceback)
  File "/opt/conda/default/lib/python3.6/site-packages/urllib3/response.py", line 443, in _error_catcher
    raise ProtocolError("Connection broken: %r" % e, e)
urllib3.exceptions.ProtocolError: ('Connection broken: IncompleteRead(0 bytes read)', IncompleteRead(0 bytes read))

Here is my code:

# Load the required tweepy and pandas libraries
from tweepy.streaming import StreamListener
from tweepy import OAuthHandler
from tweepy import Stream
import tweepy as tw

print("Loading google cloud library")
from google.cloud import bigquery

# override the Listener class
class StdOutListener(StreamListener):

    # constructor: store the tweet limit
    def __init__(self, limit, api=None):
        super(StdOutListener, self).__init__()
        self.num_tweets = 0
        self.limit = limit

    # called for every status (tweet) received from the stream
    def on_status(self, status):
        # record = {'Text': status.text, 'Created At': status.created_at}
        #print(record) # printing some sample data
        if self.num_tweets < self.limit:
            self.num_tweets += 1
            created_at = status.created_at
            if not status.truncated:
                tweet = status.text
            else:
                tweet = status.extended_tweet.get('full_text')
            timestamp_ms = status.timestamp_ms

            if status.place is None:
                country_code = ""
                place_name = ""
                place_full_name = ""
                place_type = ""
                lat = -1
                long = -1
            else:
                country_code = status.place.country_code
                place_name = status.place.name
                place_full_name = status.place.full_name
                place_type = status.place.place_type
                bounding_box1_long = status.place.bounding_box.coordinates[0][0][0]
                bounding_box1_lat  = status.place.bounding_box.coordinates[0][0][1]
                bounding_box2_long = status.place.bounding_box.coordinates[0][1][0]
                bounding_box2_lat  = status.place.bounding_box.coordinates[0][1][1]
                bounding_box3_long = status.place.bounding_box.coordinates[0][2][0]
                bounding_box3_lat  = status.place.bounding_box.coordinates[0][2][1]
                bounding_box4_long = status.place.bounding_box.coordinates[0][3][0]
                bounding_box4_lat  = status.place.bounding_box.coordinates[0][3][1]
                long = (bounding_box1_long + bounding_box2_long +  bounding_box3_long + bounding_box4_long) / 4
                lat = (bounding_box1_lat + bounding_box2_lat +  bounding_box3_lat + bounding_box4_lat) / 4

            tweet_id = status.id
            user_id = status.user.id

            my_tuple = ( str(created_at)[0:10] ,str(tweet_id), str(user_id), str(created_at), str(tweet), str(timestamp_ms), str(country_code), str(place_name), str(place_full_name), str(lat), str(long), str(place_type))
            my_list = []
            #print("my tuple" + str(my_tuple))
            my_list.append(my_tuple)
            #print("my list" + str(my_list))
            #TODO(developer): Import the client library.


            #TODO(developer): Construct a BigQuery client object.
            #print("loading BQ client")
            client = bigquery.Client()

            #TODO(developer): Set table_id to the ID of the table to insert into.
            table_id = "project.dataset.table"

            #print("fetching table from BQ")
            table = client.get_table(table_id)  # Make an API request.

            #print("inserting rows")
            errors = client.insert_rows(table, my_list)  # Make an API request.
            if errors == []:
                print("New rows have been added.")
            return True
        else:
            return False


    def on_error(self, status_code):
        print('Error on status', status_code)

    def on_limit(self, track):
        print('Limit threshold exceeded', track)

    def on_timeout(self):
        # on_timeout is called with no extra arguments
        print('Stream disconnected; continuing...')

    def on_exception(self, exception):
        #print("inside exception !!!")
        print(exception)
        return





consumer_key = 'blah'
consumer_secret = 'blah'
access_token = 'blah'
access_token_secret = 'blah'

auth = tw.OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_token, access_token_secret)

# create a bounding box for the state of california
GEOBOX_CALIFORNIA = [-126.81,32.04,-114.12,42.95]

tweet_limit = "100000000"

# create a stream object from the class with the required filter
stream = Stream(auth, StdOutListener(limit = int(tweet_limit)) )

print("starting stream")
# start the stream
# the stream will automatically end when limit is reached
stream.filter(locations=GEOBOX_CALIFORNIA, is_async=True)
print("stream ended")

How do I handle these errors? I want to make sure I keep streaming data into BigQuery from the Twitter API. Am I handling the streaming connection correctly here?
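
Is simply catching the ProtocolError and restarting the stream the right way to handle this? Something like the sketch below is what I had in mind (run_stream is just a name I made up, the 5-second wait is an arbitrary value, and filter() is called synchronously here so the exception surfaces in the main thread instead of Thread-3):

import time
from urllib3.exceptions import ProtocolError

def run_stream():
    listener = StdOutListener(limit=int(tweet_limit))
    stream = Stream(auth, listener)
    while True:
        try:
            # blocking call; returns normally once on_status returns False
            stream.filter(locations=GEOBOX_CALIFORNIA)
        except ProtocolError:
            # the IncompleteRead from the traceback is wrapped in this ProtocolError
            print("Connection broken, reconnecting in 5 seconds...")
            time.sleep(5)
            continue
        else:
            break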

I am new to this API and cannot work out how to keep the connection alive while the insert into BigQuery is happening.
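
For the BigQuery side, would something like the listener below be the right direction: create the client and table handle once instead of on every tweet, buffer rows, and flush them in batches so that on_status returns quickly? (BatchingListener and the batch size of 50 are placeholders I made up, and the row tuple is abbreviated compared with the full one in my code above.)

class BatchingListener(StreamListener):

    def __init__(self, limit, batch_size=50, api=None):
        super(BatchingListener, self).__init__()
        self.num_tweets = 0
        self.limit = limit
        self.batch_size = batch_size
        self.rows = []
        # create the client and fetch the table once, not per tweet
        self.client = bigquery.Client()
        self.table = self.client.get_table("project.dataset.table")

    def on_status(self, status):
        if self.num_tweets >= self.limit:
            self._flush()
            return False  # stop the stream once the limit is reached
        self.num_tweets += 1
        # abbreviated row; in practice this would be the full tuple from above
        self.rows.append((str(status.created_at)[0:10], str(status.id), str(status.text)))
        if len(self.rows) >= self.batch_size:
            self._flush()
        return True

    def _flush(self):
        if not self.rows:
            return
        errors = self.client.insert_rows(self.table, self.rows)
        if errors == []:
            print("Inserted %d rows." % len(self.rows))
        self.rows = []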

...