Я получаю ошибку ниже при потоковой передаче данных с помощью API Twitter.
New rows have been added.
('Connection broken: IncompleteRead(0 bytes read)', IncompleteRead(0 bytes read))
Exception in thread Thread-3:
Traceback (most recent call last):
File "/opt/conda/default/lib/python3.6/http/client.py", line 593, in _readinto_chunked
n = self._safe_readinto(mvb)
File "/opt/conda/default/lib/python3.6/http/client.py", line 640, in _safe_readinto
raise IncompleteRead(bytes(mvb[0:total_bytes]), len(b))
http.client.IncompleteRead: IncompleteRead(0 bytes read, 512 more expected)
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/opt/conda/default/lib/python3.6/site-packages/urllib3/response.py", line 425, in _error_catcher
yield
File "/opt/conda/default/lib/python3.6/site-packages/urllib3/response.py", line 507, in read
data = self._fp.read(amt) if not fp_closed else b""
File "/opt/conda/default/lib/python3.6/http/client.py", line 459, in read
n = self.readinto(b)
File "/opt/conda/default/lib/python3.6/http/client.py", line 493, in readinto
return self._readinto_chunked(b)
File "/opt/conda/default/lib/python3.6/http/client.py", line 604, in _readinto_chunked
raise IncompleteRead(bytes(b[0:total_bytes]))
http.client.IncompleteRead: IncompleteRead(0 bytes read)
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/opt/conda/default/lib/python3.6/threading.py", line 916, in _bootstrap_inner
self.run()
File "/opt/conda/default/lib/python3.6/threading.py", line 864, in run
self._target(*self._args, **self._kwargs)
File "/opt/conda/default/lib/python3.6/site-packages/tweepy/streaming.py", line 320, in _run
six.reraise(*exc_info)
File "/opt/conda/default/lib/python3.6/site-packages/six.py", line 703, in reraise
raise value
File "/opt/conda/default/lib/python3.6/site-packages/tweepy/streaming.py", line 289, in _run
self._read_loop(resp)
File "/opt/conda/default/lib/python3.6/site-packages/tweepy/streaming.py", line 339, in _read_loop
line = buf.read_line()
File "/opt/conda/default/lib/python3.6/site-packages/tweepy/streaming.py", line 200, in read_line
self._buffer += self._stream.read(self._chunk_size)
File "/opt/conda/default/lib/python3.6/site-packages/urllib3/response.py", line 529, in read
raise IncompleteRead(self._fp_bytes_read, self.length_remaining)
File "/opt/conda/default/lib/python3.6/contextlib.py", line 99, in __exit__
self.gen.throw(type, value, traceback)
File "/opt/conda/default/lib/python3.6/site-packages/urllib3/response.py", line 443, in _error_catcher
raise ProtocolError("Connection broken: %r" % e, e)
urllib3.exceptions.ProtocolError: ('Connection broken: IncompleteRead(0 bytes read)', IncompleteRead(0 bytes read))
Вот мой код -
# Load the required tweepy and pandas libraries
from tweepy.streaming import StreamListener
from tweepy import OAuthHandler
from tweepy import Stream
import tweepy as tw
print("Loading google cloud library")
from google.cloud import bigquery
# override the Listener class
class StdOutListener(StreamListener):
# entry point of the code
def __init__(self, limit,api=None):
super(StdOutListener, self).__init__()
self.num_tweets = 0
self.limit = limit
# function called when stream starts
def on_status(self, status):
# record = {'Text': status.text, 'Created At': status.created_at}
#print(record) # printing some sample data
if self.num_tweets < self.limit:
self.num_tweets += 1
created_at = status.created_at
if status.truncated == False:
tweet = status.text
else:
tweet = status.extended_tweet.get('full_text')
timestamp_ms = status.timestamp_ms
if status.place is None:
country_code = ""
place_name = ""
place_full_name = ""
place_type = ""
lat = -1
long = -1
else:
country_code = status.place.country_code
place_name = status.place.name
place_full_name = status.place.full_name
place_type = status.place.place_type
bounding_box1_long = status.place.bounding_box.coordinates[0][0][0]
bounding_box1_lat = status.place.bounding_box.coordinates[0][0][1]
bounding_box2_long = status.place.bounding_box.coordinates[0][1][0]
bounding_box2_lat = status.place.bounding_box.coordinates[0][1][1]
bounding_box3_long = status.place.bounding_box.coordinates[0][2][0]
bounding_box3_lat = status.place.bounding_box.coordinates[0][2][1]
bounding_box4_long = status.place.bounding_box.coordinates[0][3][0]
bounding_box4_lat = status.place.bounding_box.coordinates[0][3][1]
long = (bounding_box1_long + bounding_box2_long + bounding_box3_long + bounding_box4_long) / 4
lat = (bounding_box1_lat + bounding_box2_lat + bounding_box3_lat + bounding_box4_lat) / 4
tweet_id = status.id
user_id = status.user.id
my_tuple = ( str(created_at)[0:10] ,str(tweet_id), str(user_id), str(created_at), str(tweet), str(timestamp_ms), str(country_code), str(place_name), str(place_full_name), str(lat), str(long), str(place_type))
my_list = []
#print("my tuple" + str(my_tuple))
my_list.append(my_tuple)
#print("my list" + str(my_list))
#TODO(developer): Import the client library.
#TODO(developer): Construct a BigQuery client object.
#print("loading BQ client")
client = bigquery.Client()
#TODO(developer): Set table_id to the ID of the model to fetch.
table_id = "project.dataset.table"
#print("fetching table from BQ")
table = client.get_table(table_id) # Make an API request.
#print("inserting rows")
errors = client.insert_rows(table, my_list) # Make an API request.
if errors == []:
print("New rows have been added.")
return True
else:
return False
def on_error(self, status):
print('Error on status', status)
def on_limit(self, status):
print('Limit threshold exceeded', status)
def on_timeout(self, status):
print('Stream disconnected; continuing...')
def on_exception(self, exception):
#print("inside exception !!!")
print(exception)
return
consumer_key = 'blah'
consumer_secret = 'blah'
access_token = 'blah'
access_token_secret = 'blah'
auth = tw.OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_token, access_token_secret)
# create a bounding box for the state of california
GEOBOX_CALIFORNIA = [-126.81,32.04,-114.12,42.95]
tweet_limit = "100000000"
# create a stream object from the class with the required filter
stream = Stream(auth, StdOutListener(limit = int(tweet_limit)) )
print("starting stream")
# start the stream
# the stream will automatically end when limit is reached
stream.filter(locations=GEOBOX_CALIFORNIA , is_async=True)
print("stream ended")
Как мне справиться с этими ошибками? Я хочу убедиться, что я продолжаю потоковую передачу данных в BigQuery с помощью Twitter API. Правильно ли я здесь обрабатываю потоковое соединение?
Я новичок в этом API и не могу следить за тем, как поддерживать соединение, пока вставка происходит в bigquery.