@ Abhishek
Здесь есть 2 варианта:
- Вы можете прекратить прослушивание, если в течение определенного периода времени неактивно.
- Вы можете перестать слушать по истечении фиксированной продолжительности.
Подробно изложите оба в шагах ниже.
ВАРИАНТ 1
Вы можете использовать параметр max_wait_time, чтобы остановить прослушивание в случае отсутствия активности в течение определенного времени.
![enter image description here](https://i.stack.imgur.com/5fWA2.png)
I did spin up a simple use case of the above. But you could optimize this further.
import asyncio
from azure.eventhub.aio import EventHubConsumerClient
event_hub_connection_str = ''
eventhub_name = ''
consumer = EventHubConsumerClient.from_connection_string(
conn_str=event_hub_connection_str,
consumer_group='$Default',
eventhub_name=eventhub_name # EventHub name should be specified if it doesn't show up in connection string.
)
#this event gets called when the message is received or Max_Wait_time is clocked
async def on_event(partition_context, event):
print(event) #Optional - to see output
#Checks whether there is any event returned None. None is returned when this event is called after the Max_Wait_time is crossed
if(event !=None):
print("Received the event: \"{}\" from the partition with ID: \"{}\"".format(event.body_as_str(encoding='UTF-8'), partition_context.partition_id))
#you can update other code like updating blob store
else:
print("Timeout is Hit")
#updating the
global receive
receive = False
async def close():
print("Closing the client.")
await consumer.close()
print("Closed")
async def main():
recv_task = asyncio.ensure_future(consumer.receive(on_event=on_event,max_wait_time=15))
while(True): # keep receiving for 3 seconds
await asyncio.sleep(3)
if(receive != True):
print("Cancelling the Task")
recv_task.cancel() # stop receiving by cancelling the task
break;
receive = True
asyncio.run(main())
asyncio.run(close())#closing the Client
With regards to the above code. If there is no activity 15 seconds the async task gets cancelled and the consumer clients gets closed. The program is eventually exited gracefully.
OPTION 2
If you are looking for a code in which the you would like to make client to listen for fixed time like 1 hour or some thing. You could refer the below code
Код ссылки
event_hub_connection_str = '<>'
eventhub_name = '<>'
import asyncio
from azure.eventhub.aio import EventHubConsumerClient
consumer = EventHubConsumerClient.from_connection_string(
conn_str=event_hub_connection_str,
consumer_group='$Default',
eventhub_name=eventhub_name # EventHub name should be specified if it doesn't show up in connection string.
)
async def on_event(partition_context, event):
# Put your code here.
# If the operation is i/o intensive, async will have better performance.
print("Received event from partition: {}".format(partition_context.partition_id))
# The receive method is a coroutine which will be blocking when awaited.
# It can be executed in an async task for non-blocking behavior, and combined with the 'close' method.
async def main():
recv_task = asyncio.ensure_future(consumer.receive(on_event=on_event))
await asyncio.sleep(15) # keep receiving for 3 seconds
recv_task.cancel() # stop receiving
async def close():
print("Closing.....")
await consumer.close()
print("Closed")
asyncio.run(main())
asyncio.run(close())#closing the Client
Приведенный ниже код, который отвечает за прослушивание клиентом определенное время:
recv_task =
asyncio.ensure_future(consumer.receive(on_event=on_event))
await asyncio.sleep(3) # keep receiving for 3 seconds
recv_task.cancel()
Вы можете увеличить время в соответствии с вашими потребностями.