Как отправить данные в EventHub с установкой PartitionId, а не PartitionKey (Python) - PullRequest
0 голосов
/ 22 апреля 2019

Я видел в Microsoft Docs, что есть способ отправить данные в нужный раздел, установив PartitionId вместо PartitionKey (используя C #).

CreatePartitionSender (String) Создать PartitionSender, который может публиковать EventData напрямую в конкретный раздел EventHub.

Однако я не смог найти то же самое в Python.

Есть ли доступный способ?

Ответы [ 2 ]

0 голосов
/ 23 апреля 2019

Существует два способа отправки данных в концентраторы событий Azure: HTTP REST API и протокол AMQP 1.0.

Для использования HTTP REST API или Azure EventHub Python Client Library , толькоПараметр partitionId поддерживает отправку нового события в указанный раздел в концентраторе событий, как показано ниже.

  1. REST API Send partition event требуетpartitionId параметр в конечной точке https://{servicebusNamespace}.servicebus.windows.net/{eventHubPath}/partitions/{partitionId}/messages, и это единственный API REST, поддерживающий функцию отправки раздела partition

  2. Комментарий исходного кода Sender.py объясняет partition параметр, как показано ниже.

    :param partition: The specific partition ID to send to. Default is None, in which case the service
     will assign to all partitions using round-robin.
    :type partition: str
    

Таким образом, на самом деле вы не можете использовать значение partitionKey для отправки события в указанный раздел EventHub, за исключением использования AMQP1.0 в Python.Для использования AMQP 1.0 см. Официальный документ AMQP 1.0 in Azure Service Bus and Event Hubs protocol guide и найдите на странице слова partition-key, результат приведен ниже.

enter image description here

0 голосов
/ 23 апреля 2019

Я не совсем уверен, но с использованием Python, вот метод для открытия соединения

def open(self):
        """
        Open the Sender using the supplied conneciton.
        If the handler has previously been redirected, the redirect
        context will be used to create a new handler before opening it.
        :param connection: The underlying client shared connection.
        :type: connection: ~uamqp.connection.Connection
        """
        self.running = True
        if self.redirected:
            self.target = self.redirected.address
            self._handler = SendClient(
                self.target,
                auth=self.client.get_auth(),
                debug=self.client.debug,
                msg_timeout=self.timeout,
                error_policy=self.retry_policy,
                keep_alive_interval=self.keep_alive,
                client_name=self.name,
                properties=self.client.create_properties())
        self._handler.open()
        while not self._handler.client_ready():
            time.sleep(0.05)

, а вот Init

def __init__(self, client, target, partition=None, send_timeout=60, keep_alive=None, auto_reconnect=True):
        """
        Instantiate an EventHub event Sender handler.
        :param client: The parent EventHubClient.
        :type client: ~azure.eventhub.client.EventHubClient.
        :param target: The URI of the EventHub to send to.
        :type target: str
        :param partition: The specific partition ID to send to. Default is None, in which case the service
         will assign to all partitions using round-robin.
        :type partition: str
        :param send_timeout: The timeout in seconds for an individual event to be sent from the time that it is
         queued. Default value is 60 seconds. If set to 0, there will be no timeout.
        :type send_timeout: int
        :param keep_alive: The time interval in seconds between pinging the connection to keep it alive during
         periods of inactivity. The default value is None, i.e. no keep alive pings.
        :type keep_alive: int
        :param auto_reconnect: Whether to automatically reconnect the sender if a retryable error occurs.
         Default value is `True`.
        :type auto_reconnect: bool
        """
        self.running = False
        self.client = client
        self.target = target
        self.partition = partition
        self.timeout = send_timeout
        self.redirected = None
        self.error = None
        self.keep_alive = keep_alive
        self.auto_reconnect = auto_reconnect
        self.retry_policy = errors.ErrorPolicy(max_retries=3, on_error=_error_handler)
        self.reconnect_backoff = 1
        self.name = "EHSender-{}".format(uuid.uuid4())
        if partition:
            self.target += "/Partitions/" + partition
            self.name += "-partition{}".format(partition)
        self._handler = SendClient(
            self.target,
            auth=self.client.get_auth(),
            debug=self.client.debug,
            msg_timeout=self.timeout,
            error_policy=self.retry_policy,
            keep_alive_interval=self.keep_alive,
            client_name=self.name,
            properties=self.client.create_properties())
        self._outcome = None
        self._condition = None

Я считаю,нижняя строка функции создаст только отправителя раздела

if partition:
                self.target += "/Partitions/" + partition
                self.name += "-partition{}".format(partition)

Ссылка

https://github.com/Azure/azure-event-hubs-python/blob/master/azure/eventhub/sender.py

Надеюсь, это поможет.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...