KSQL выберите удалить событие - PullRequest
0 голосов
/ 24 сентября 2019

У меня есть kafka KTable, который содержит идентификаторы пользователей.Я называю этот Ktable "UserWhitelist".Этот Ktable сделан из темы "Пользователи".Если новое сообщение вставляется / обновляется в теме «Пользователи», я получаю новую запись в Ktable, и событие вызывается.Если я отправляю ключ с надгробной плитой в тему пользователей, запись из Ktable удаляется, но я не получаю никаких событий для моего потребителя. Ниже приведен код, как я слушаю события Ktable.

using (var client = new HttpClient())
        {

            client.Timeout = TimeSpan.FromMilliseconds(Timeout.Infinite);

            var request = new HttpRequestMessage(HttpMethod.Post, url);
            request.Method = HttpMethod.Post;
            request.Content = new System.Net.Http.StringContent("{ \"ksql\": \"select * from UserWhitelist;\",\"streamsProperties\": { \"ksql.streams.auto.offset.reset\": \"earliest\"}}", Encoding.UTF8, "application/vnd.ksql.v1+json");
            //request.Content.Headers.Add("Accept", "application/vnd.ksql.v1+json");
            using (var response = await client.SendAsync(request, HttpCompletionOption.ResponseHeadersRead))
            {
                using (var body = await response.Content.ReadAsStreamAsync())
                using (var reader = new StreamReader(body))
                    while (!reader.EndOfStream)
                        Console.WriteLine(reader.ReadLine());
            }
        }

Doя правильно понимаю, что удаление не должно быть поднято?Если да, то какой подход вы могли бы предложить для моей ситуации, если бы я хотел иметь белый список и всегда информировать каждого потребителя, если какие-либо обновления / удаления / вставки были сделаны на Ktable.

1 Ответ

1 голос
/ 24 сентября 2019

Оператор выбора KSQL всегда будет отображать непрерывное состояние таблицы.Если вы уже распечатали событие, а затем удалите его, оно все равно будет напечатано, и вы не увидите никаких новых обновлений для ключа, пока его значение не будет добавлено обратно в таблицу.

В самой таблице строка будет удалена, но единственный способ получить уведомление об этом событии - это использовать фактического потребителя Kafka в основной теме таблицы, а затем проверить, когда значениезаписи равны нулю.

То, что вы запрашиваете, обычно делается путем предоставления вторичного API REST через интерактивные запросы от процессора Java KTable, и билет KSQL можно найти здесь - https://github.com/confluentinc/ksql/issues/530

Тем не менее, вам все равно придется отслеживать все значения между запросами, а затем сканировать их все, чтобы узнать, какие были удалены, по сравнению с использованием всех событий, а затем отфильтровывать ненулевые события

...