В eventhub я пытаюсь получать данные одновременно с помощью модуля asyncio, и это обсуждается в Здесь .
Проблема, которую я рассматриваю, заключается в том, что когда яопределить переменную в цикле for, она просто исчезает, когда цикл останавливается с помощью loop.stop ()
Код практически идентичен приведенному выше.
Классопределены следующие:
global list_
list_ = []
class EventProcessor(AbstractEventProcessor):
def __init__(self, params=None):
super().__init__(params)
self._msg_counter = 0
async def open_async(self, context):
print("Connection established {}".format(context.partition_id))
async def close_async(self, context, reason):
print("Connection closed (reason {}, id {}, offset {}, sq_number {})".format(
reason,
context.partition_id,
context.offset,
context.sequence_number))
async def process_events_async(self, context, messages):
for event_data in messages:
last_offset = event_data.offset.value
last_sn = event_data.sequence_number
data = event_data.body_as_str(encoding= 'UTF-8')
list_.append(data)
print("Received data: {}, Num:{}".format(last_sn, len(list_))
if len(list_) == 10:
self.loop.close() ## <- it does not stop at len(list_) == 10
#self.loop.stop() ## <- it does stop but the "list_" is dissapeared.
async def process_error_async(self, context, error):
print("Event Processor Error {!r}".format(error))
Как я уже говорил выше, использование self.loop.close () и self.loop.stop () не сработало так, как мне хотелось.
Для следующего кода он просто делает цикл и работает до завершения задач
loop = asyncio.get_event_loop()
# Storage Account Credentials
STORAGE_ACCOUNT_NAME = "xxx"
STORAGE_KEY = "xxxx"
LEASE_CONTAINER_NAME = "xxx"
NAMESPACE = "xxx"
EVENTHUB = "xxx"
USER = "RootManageSharedAccessKey"
KEY = "xxxx"
# Eventhub config and storage manager
eh_config = EventHubConfig(NAMESPACE, EVENTHUB, USER, KEY, consumer_group="$default")
eh_options = EPHOptions()
eh_options.release_pump_on_timeout = True
eh_options.debug_trace = False
storage_manager = AzureStorageCheckpointLeaseManager(STORAGE_ACCOUNT_NAME, STORAGE_KEY, LEASE_CONTAINER_NAME)
# Event loop and host
host = EventProcessorHost(
EventProcessor,
eh_config,
storage_manager,
ep_params=["param1","param2"],
eph_options=eh_options,
loop=loop)
tasks = asyncio.gather(
host.open_async(),
wait_and_close(host))
loop.run_until_complete(tasks)
Последняя переменная, из которой я хотел бы экспортировать, - это list_ "