Получение сообщений с использованием Asyncio (асинхронно) и получение переменной в качестве конечного результата - PullRequest
1 голос
/ 02 декабря 2019

В eventhub я пытаюсь получать данные одновременно с помощью модуля asyncio, и это обсуждается в Здесь .

Проблема, которую я рассматриваю, заключается в том, что когда яопределить переменную в цикле for, она просто исчезает, когда цикл останавливается с помощью loop.stop ()

Код практически идентичен приведенному выше.

Классопределены следующие:

global list_
list_ = []

class EventProcessor(AbstractEventProcessor):

    def __init__(self, params=None):       
        super().__init__(params)
        self._msg_counter = 0

    async def open_async(self, context):        
        print("Connection established {}".format(context.partition_id))

    async def close_async(self, context, reason):

        print("Connection closed (reason {}, id {}, offset {}, sq_number {})".format(
            reason,
            context.partition_id,
            context.offset,
            context.sequence_number))

    async def process_events_async(self, context, messages):

         for event_data in messages:
            last_offset = event_data.offset.value
            last_sn = event_data.sequence_number
            data = event_data.body_as_str(encoding= 'UTF-8')
            list_.append(data)
            print("Received data: {}, Num:{}".format(last_sn, len(list_))

            if len(list_) == 10:
               self.loop.close()   ## <- it does not stop at len(list_) == 10
              #self.loop.stop()    ## <- it does stop but the "list_" is dissapeared.  


    async def process_error_async(self, context, error):

        print("Event Processor Error {!r}".format(error))

Как я уже говорил выше, использование self.loop.close () и self.loop.stop () не сработало так, как мне хотелось.

Для следующего кода он просто делает цикл и работает до завершения задач

loop = asyncio.get_event_loop()

# Storage Account Credentials
STORAGE_ACCOUNT_NAME = "xxx"
STORAGE_KEY = "xxxx"
LEASE_CONTAINER_NAME = "xxx"
NAMESPACE = "xxx"
EVENTHUB = "xxx"
USER = "RootManageSharedAccessKey"
KEY = "xxxx"

# Eventhub config and storage manager 
eh_config = EventHubConfig(NAMESPACE, EVENTHUB, USER, KEY, consumer_group="$default")
eh_options = EPHOptions()
eh_options.release_pump_on_timeout = True
eh_options.debug_trace = False
storage_manager = AzureStorageCheckpointLeaseManager(STORAGE_ACCOUNT_NAME, STORAGE_KEY, LEASE_CONTAINER_NAME)

# Event loop and host
host = EventProcessorHost(
    EventProcessor,
    eh_config,
    storage_manager,
    ep_params=["param1","param2"],
    eph_options=eh_options,
    loop=loop)



tasks = asyncio.gather(
    host.open_async(),
    wait_and_close(host))
loop.run_until_complete(tasks)

Последняя переменная, из которой я хотел бы экспортировать, - это list_ "

1 Ответ

0 голосов
/ 02 декабря 2019

На данный момент я не могу найти решение, чтобы остановить его при выполнении условия, но в качестве обходного пути мы можем контролировать, сколько данных о событиях можно добавить в список _.

Вот пример:

import logging
import asyncio
import os
import sys
import signal
import functools

from azure.eventprocessorhost import (
    AbstractEventProcessor,
    AzureStorageCheckpointLeaseManager,
    EventHubConfig,
    EventProcessorHost,
    EPHOptions
)

global list_
list_ = []

class EventProcessor(AbstractEventProcessor):
    """
        Example Implmentation of AbstractEventProcessor
    """
    def __init__(self, params=None):
        """
        Init Event processor
        """

        super().__init__(params)
        self._msg_counter = 0

    async def open_async(self, context):
        """
        Called by processor host to initialize the event processor.
        """
        print("Connection established {}".format(context.partition_id))

    async def close_async(self, context, reason):
        """
        Called by processor host to indicate that the event processor is being stopped.
        :param context: Information about the partition
        :type context: ~azure.eventprocessorhost.PartitionContext
        """
        print("Connection closed (reason {}, id {}, offset {}, sq_number {})".format(
            reason,
            context.partition_id,
            context.offset,
            context.sequence_number))

    async def process_events_async(self, context, messages):
        """
        Called by the processor host when a batch of events has arrived.
        This is where the real work of the event processor is done.
        :param context: Information about the partition
        :type context: ~azure.eventprocessorhost.PartitionContext
        :param messages: The events to be processed.
        :type messages: list[~azure.eventhub.common.EventData]
        """

        for m in messages:
            data = m.body_as_str()          
            print("Received data: {}".format(data))
            if len(list_) < 10:
                list_.append(data)


    async def process_error_async(self, context, error):
        """
        Called when the underlying client experiences an error while receiving.
        EventProcessorHost will take care of recovering from the error and
        continuing to pump messages,so no action is required from
        :param context: Information about the partition
        :type context: ~azure.eventprocessorhost.PartitionContext
        :param error: The error that occured.
        """
        print("Event Processor Error {!r}".format(error))

async def wait_and_close(host):
    """
    Run EventProcessorHost for 2 minutes then shutdown.
    """
    await asyncio.sleep(10)
    await host.close_async()


loop = asyncio.get_event_loop()

# Storage Account Credentials
STORAGE_ACCOUNT_NAME = "xxx"
STORAGE_KEY = "xxx"
LEASE_CONTAINER_NAME = "xxx"
NAMESPACE = "xxxx"
EVENTHUB = "xxx"
USER = "RootManageSharedAccessKey"
KEY = "xxx"

# Eventhub config and storage manager 
eh_config = EventHubConfig(NAMESPACE, EVENTHUB, USER, KEY, consumer_group="$default")
eh_options = EPHOptions()
eh_options.release_pump_on_timeout = True
eh_options.debug_trace = False
storage_manager = AzureStorageCheckpointLeaseManager(
STORAGE_ACCOUNT_NAME, STORAGE_KEY, LEASE_CONTAINER_NAME)

# Event loop and host
host = EventProcessorHost(
    EventProcessor,
    eh_config,
    storage_manager,
    ep_params=["param1","param2"],
    eph_options=eh_options,
    loop=loop)

tasks = asyncio.gather(
    host.open_async(),
    wait_and_close(host))
loop.run_until_complete(tasks)   

print("*the length of list_*")
print(len(list_))
print("***the value from list_ variable***")
print(list_)

Результат:

enter image description here

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...