Почему Pymongo закрывает соединение в середине процесса? - PullRequest
0 голосов
/ 26 октября 2019

Я запускаю скрипт на Python в Jupyter Notebook, который подключается к базе данных MongoDB для обработки текстовых документов (новостных статей). Скрипт нормально отрабатывает первые несколько пакетов данных, но затем завершается со следующей ошибкой:

AutoReconnect: localhost:27017: [WinError 10054] An existing connection was forcibly closed by the remote host (полный текст ошибки приведён ниже).

Я уже пробовал удалить файл mongod.lock, как советуют здесь (Pymongo продолжает отказывать в соединении на 27017), однако это не решило проблему.

Вот моя функция (некоторые вспомогательные функции, которые она вызывает, здесь не приведены):

    # Body of collect_df_by_source (its `def` line is not part of this excerpt).
    # For every collection in mongo_collections_dir it queries the documents
    # whose "file_info.source" equals `source` (optionally restricted by
    # publication year), builds one dict per document that has body content,
    # and returns all of those dicts as a single pandas DataFrame.
    data_list = []
    for collection in mongo_collections_dir:
        mongo_collection = mongo_collections_dir[collection]
        # Base query: match on the source; a year condition may be added below.
        filter_dict = {"file_info.source": source}
        if filter_year:
            # An exact-year filter takes precedence over min_year.
            filter_dict["extracted.publication_date.year"] = filter_year
        elif min_year:
            print('filter year not found') # - note added by dror for debug 24/10/2019
            # "$gt" is a strict comparison: only years greater than min_year match.
            filter_dict["extracted.publication_date.year"] = {"$gt": min_year}
        # This count() call is the frame in which the AutoReconnect from the
        # traceback is raised.
        source_count = mongo_collection.count(filter_dict)
        print("{} articles found in collection {} {}".format(source_count, collection, filter_year))
        # NOTE(review): this compares against the string "0"; count() appears to
        # return an integer, so this guard presumably never fires -- confirm.
        if source_count == "0":
            continue
        # The cursor is opened with no_cursor_timeout=True and nothing in this
        # excerpt closes it explicitly.
        # NOTE(review): confirm the cursor is released once iteration finishes.
        docs = mongo_collection.find(filter_dict, no_cursor_timeout=True)
        # NOTE(review): `docs` is the object returned by find(), presumably a
        # cursor rather than a list, so this truthiness test may not detect an
        # empty result set -- verify.
        if not docs:
            print("source {} was not found in collection {}".format(source, collection))
            continue
        for pos, doc in enumerate(docs):
            # Progress report every 100000 documents (also printed for pos == 0).
            if pos % 100000 == 0:
                print("processed {} articles out of {} from {}".format(pos, source_count, source))
            # Documents without body.content are skipped entirely.
            try:
                text = doc["body"]["content"]
            except KeyError:
#                 print('no body')
                continue  
            # NOTE(review): clean_text is tested and then reassigned here, so on
            # later iterations the test sees the previous document's result
            # (cleaned text or '') rather than the value it had before the loop.
            # Looks unintended -- confirm.
            if clean_text:
                clean_text = mpd_clean_text(text, stop_words)
            else:
                clean_text = ''
            # Build the record; "title" is included only when
            # doc['body']['head']['hedline'] exists. If the first dict raises
            # KeyError for any key, the except branch builds the same record
            # without "title".
            try:
                title = doc['body']['head']['hedline']
#                 author = doc['body']['head']['byline']
                temp_dir = {"collection": mongo_collection.name, "source": doc["file_info"]["source"], 
                            "urn": doc["urn"],  "title": title,
                            'unit_text': text, 'clean text': clean_text, 
                           }
            except KeyError:
                temp_dir = {"collection": mongo_collection.name, "source": doc["file_info"]["source"], "urn": doc["urn"],
                           'unit_text': text, 'clean text': clean_text}  

            # Optional fields: each one is added only if present in the document.
            try:
                publication_date = get_dt(doc["extracted"]["publication_date"])
                temp_dir['publication_date'] = publication_date
            except KeyError:
                print('no extracted')
            try:
                temp_dir['section'] = doc['extracted']['section']
            except KeyError:
                pass
            try:
                temp_dir['publication_name'] = doc['extracted']['publication_name']
            except KeyError:
                pass

            # temp_dir is always a non-empty dict at this point, so this test
            # always passes and every record built above is appended.
            if temp_dir:
#                 temp_dir['section'] = section
                data_list.append(temp_dir)
#     df = pd.DataFrame(data_list)
#     df['section'] = section
    # One DataFrame row per collected record, across all collections.
    return pd.DataFrame(data_list)

Полная ошибка:

~\Anaconda3\envs\py35\lib\site-packages\pymongo\pool.py in command(self, dbname, spec, slave_ok, read_preference, codec_options, check, allowable_errors, check_keys, read_concern, write_concern, parse_write_concern_error, collation, session, client, retryable_write, publish_events)
    578                            use_op_msg=self.op_msg_enabled,
--> 579                            unacknowledged=unacknowledged)
    580         except OperationFailure:

~\Anaconda3\envs\py35\lib\site-packages\pymongo\network.py in command(sock, dbname, spec, slave_ok, is_mongos, read_preference, codec_options, session, client, check, allowable_errors, address, check_keys, listeners, max_bson_size, read_concern, parse_write_concern_error, collation, compression_ctx, use_op_msg, unacknowledged)
    140         else:
--> 141             reply = receive_message(sock, request_id)
    142             unpacked_docs = reply.unpack_response(codec_options=codec_options)

~\Anaconda3\envs\py35\lib\site-packages\pymongo\network.py in receive_message(sock, request_id, max_message_size)
    172     length, _, response_to, op_code = _UNPACK_HEADER(
--> 173         _receive_data_on_socket(sock, 16))
    174     # No request_id for exhaust cursor "getMore".

~\Anaconda3\envs\py35\lib\site-packages\pymongo\network.py in _receive_data_on_socket(sock, length)
    231             try:
--> 232                 chunk_length = sock.recv_into(mv[bytes_read:])
    233             except (IOError, OSError) as exc:

ConnectionResetError: [WinError 10054] An existing connection was forcibly closed by the remote host

During handling of the above exception, another exception occurred:

AutoReconnect                             Traceback (most recent call last)
<ipython-input-14-37a7effc9859> in <module>
----> 1 collect_df_by_sources(collections_dir, sources, newspapers_df, filter_year='2008')

<ipython-input-8-bbd9e966e95c> in collect_df_by_sources(collections_dir, sources, newspapers_df, return_df, filter_year, min_year, override)
     20 
     21         print("collect articles from: {}, {}".format(source_name, source_id))
---> 22         source_df = collect_df_by_source(collections_dir, source_id, filter_year=filter_year, min_year=min_year)
     23         source_df.to_csv(source_path, encoding='utf8')
     24         if return_df:

<ipython-input-7-82d129ff329c> in collect_df_by_source(mongo_collections_dir, source, clean_text, filter_year, min_year)
      9             print('filter year not found') # - note added by dror for debug 24/10/2019
     10             filter_dict["extracted.publication_date.year"] = {"$gt": min_year}
---> 11         source_count = mongo_collection.count(filter_dict)
     12         print("{} articles found in collection {} {}".format(source_count, collection, filter_year))
     13         if source_count == "0":

~\Anaconda3\envs\py35\lib\site-packages\pymongo\collection.py in count(self, filter, session, **kwargs)
   1764         collation = validate_collation_or_none(kwargs.pop('collation', None))
   1765         cmd.update(kwargs)
-> 1766         return self._count(cmd, collation, session)
   1767 
   1768     def create_indexes(self, indexes, session=None, **kwargs):

~\Anaconda3\envs\py35\lib\site-packages\pymongo\collection.py in _count(self, cmd, collation, session)
   1570                 read_concern=self.read_concern,
   1571                 collation=collation,
-> 1572                 session=session)
   1573         if res.get("errmsg", "") == "ns missing":
   1574             return 0

~\Anaconda3\envs\py35\lib\site-packages\pymongo\collection.py in _command(self, sock_info, command, slave_ok, read_preference, codec_options, check, allowable_errors, read_concern, write_concern, collation, session, retryable_write)
    242                 session=s,
    243                 client=self.__database.client,
--> 244                 retryable_write=retryable_write)
    245 
    246     def __create(self, options, collation, session):

~\Anaconda3\envs\py35\lib\site-packages\pymongo\pool.py in command(self, dbname, spec, slave_ok, read_preference, codec_options, check, allowable_errors, check_keys, read_concern, write_concern, parse_write_concern_error, collation, session, client, retryable_write, publish_events)
    582         # Catch socket.error, KeyboardInterrupt, etc. and close ourselves.
    583         except BaseException as error:
--> 584             self._raise_connection_failure(error)
    585 
    586     def send_message(self, message, max_doc_size):

~\Anaconda3\envs\py35\lib\site-packages\pymongo\pool.py in _raise_connection_failure(self, error)
    741         self.close()
    742         if isinstance(error, socket.error):
--> 743             _raise_connection_failure(self.address, error)
    744         else:
    745             raise error

~\Anaconda3\envs\py35\lib\site-packages\pymongo\pool.py in _raise_connection_failure(address, error, msg_prefix)
    281         raise NetworkTimeout(msg)
    282     else:
--> 283         raise AutoReconnect(msg)
    284 
    285 

AutoReconnect: localhost:27017: [WinError 10054] An existing connection was forcibly closed by the remote host


...