Как остановить клиентского клиента концентратора событий Azure в Python - PullRequest
0 голосов
/ 07 августа 2020

У меня проблемы с Azure Event Bub с Python. Ниже приведен мой код Strater для подключения (Взято из microsoft docs)

import asyncio
from azure.eventhub.aio import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore


async def on_event(partition_context, event):
    # Print the event data.
    print("Received the event: \"{}\" from the partition with ID: \"{}\"".format(event.body_as_str(encoding='UTF-8'), partition_context.partition_id))

    # Update the checkpoint so that the program doesn't read the events
    # that it has already read when you run it next time.
    await partition_context.update_checkpoint(event)

async def main():
    # Create an Azure blob checkpoint store to store the checkpoints.
    checkpoint_store = BlobCheckpointStore.from_connection_string("AZURE STORAGE CONNECTION STRING", "BLOB CONTAINER NAME")

    # Create a consumer client for the event hub.
    client = EventHubConsumerClient.from_connection_string("EVENT HUBS NAMESPACE CONNECTION STRING", consumer_group="$Default", eventhub_name="EVENT HUB NAME", checkpoint_store=checkpoint_store)
    async with client:
        # Call the receive method. Read from the beginning of the partition (starting_position: "-1")
        await client.receive(on_event=on_event,  starting_position="-1")

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    # Run the main method.
    loop.run_until_complete(main()) 

Здесь получатель / потребитель продолжает слушать. Если я удалю любое из ожиданий, потребитель выдает ошибку. Кто-нибудь знает, как остановить потребителя после некоторого времени работы вроде тайм-аута).

1 Ответ

2 голосов
/ 10 августа 2020

@ Abhishek

Здесь есть 2 варианта:

  1. Вы можете прекратить прослушивание, если в течение определенного периода времени неактивно.
  2. Вы можете перестать слушать по истечении фиксированной продолжительности.

Подробно изложите оба в шагах ниже.

ВАРИАНТ 1

Вы можете использовать параметр max_wait_time, чтобы остановить прослушивание в случае отсутствия активности в течение определенного времени.

enter image description here

I did spin up a simple use case of the above. But you could optimize this further.

import asyncio
from azure.eventhub.aio import EventHubConsumerClient

event_hub_connection_str = ''
eventhub_name = ''


consumer = EventHubConsumerClient.from_connection_string(
     conn_str=event_hub_connection_str,
       consumer_group='$Default',
       eventhub_name=eventhub_name  # EventHub name should be specified if it doesn't show up in connection string.
   )

#this event gets called when the message is received or Max_Wait_time is clocked
async def on_event(partition_context, event):
       print(event) #Optional - to see output
       #Checks whether there is any event returned None. None is returned when this event is called after the Max_Wait_time is crossed
       if(event !=None):
            print("Received the event: \"{}\" from the partition with ID: \"{}\"".format(event.body_as_str(encoding='UTF-8'), partition_context.partition_id))
            #you can update other code like updating blob store
                
       else:
           print("Timeout is Hit")
           #updating the 
           global receive
           receive = False

  
async def close():
    print("Closing the client.")
    await consumer.close()
    print("Closed")
    
async def main():
    recv_task = asyncio.ensure_future(consumer.receive(on_event=on_event,max_wait_time=15))
    while(True): # keep receiving for 3 seconds
        await asyncio.sleep(3) 
        if(receive != True):
            print("Cancelling the Task")
            recv_task.cancel()  # stop receiving by cancelling the task
            break;
  
   

receive = True
asyncio.run(main())
asyncio.run(close())#closing the Client

With regards to the above code. If there is no activity 15 seconds the async task gets cancelled and the consumer clients gets closed. The program is eventually exited gracefully.

OPTION 2

If you are looking for a code in which the you would like to make client to listen for fixed time like 1 hour or some thing. You could refer the below code

Код ссылки

event_hub_connection_str = '<>'
eventhub_name = '<>'
import asyncio

from azure.eventhub.aio import EventHubConsumerClient

consumer = EventHubConsumerClient.from_connection_string(
       conn_str=event_hub_connection_str,
       consumer_group='$Default',
       eventhub_name=eventhub_name  # EventHub name should be specified if it doesn't show up in connection string.
   )
async def on_event(partition_context, event):
       # Put your code here.
       # If the operation is i/o intensive, async will have better performance.
       print("Received event from partition: {}".format(partition_context.partition_id))

   # The receive method is a coroutine which will be blocking when awaited.
   # It can be executed in an async task for non-blocking behavior, and combined with the 'close' method.
   
async def main():
        recv_task = asyncio.ensure_future(consumer.receive(on_event=on_event))
        await asyncio.sleep(15)  # keep receiving for 3 seconds
        recv_task.cancel()  # stop receiving

async def close():
    print("Closing.....")
    await consumer.close()
    print("Closed") 

asyncio.run(main())
asyncio.run(close())#closing the Client

Приведенный ниже код, который отвечает за прослушивание клиентом определенное время:

recv_task =
asyncio.ensure_future(consumer.receive(on_event=on_event))    
await asyncio.sleep(3)  # keep receiving for 3 seconds    
recv_task.cancel()

Вы можете увеличить время в соответствии с вашими потребностями.

...