Соединитель источника Kafka не извлекает запись, как ожидалось, когда записи вставляются в тему источника из нескольких источников - PullRequest
0 голосов
/ 23 марта 2019

В одном из моих вариантов использования я пытаюсь создать конвейер

всякий раз, когда я отправлял сообщение из пользовательского раздела, я отправлял метку времени в миллисекундах с типом данных LONG, поскольку в схеме столбец метки времени был определен как long.

Код, который я имел ранее в пользовательском разделе:

Date date = new Date();
long timeMilli = date.getTime();
System.out.println("date = " + date.toString() + " , time in millis = " + timeMilli);

Показать результат до того, как я отправил запись:

date = вт 26 марта 22:02:04 EDT 2019, время в миллисекундах = 1553652124063

значение вставлено в столбец отметки времени в таблице2:

3/27/2019 2: 02: 04.063000 AM

Поскольку он принимает британский часовой пояс (я полагаю), я поставил временное исправление на время, чтобы вычесть 4 часа из текущей временной метки, чтобы я мог сопоставить ее с американской временной меткой EST.

Date date = new Date();
Date adj_date = DateUtils.addHours(date,-4);
long timeMilli = adj_date.getTime();
System.out.println("date = " + date.toString() + " , time in millis = " + timeMilli);

Показать результат:

дата = вторник, 26 марта 22:04:43 ПО ВОСТОЧНОМУ ВРЕМЕНИ 2019, время в миллисекундах = 1553637883826

значение вставлено в столбец отметки времени в таблице2:

3/26/2019 10: 04: 43.826000 PM

Пожалуйста, дайте мне знать, если я что-то упустил, так как я не уверен, почему это происходит, когда я отправлял сообщение из пользовательского раздела.

1 Ответ

1 голос
/ 24 марта 2019

Под капотом Jdbc Source Connector используйте следующий запрос:

SELECT * FROM someTable
WHERE
someTimestampColumn < $endTimetampValue
AND (
    (someTimestampColumn = $beginTimetampValue AND someIncrementalColumn > $lastIncrementedValue)
    OR someTimestampColumn > $beginTimetampValue)
ORDER BY someTimestampColumn, someIncrementalColumn ASC

Обобщение : запрос извлекает строки, если значение их столбца отметки времени ранее текущая отметка времени и позже , чем последняя проверка.

Вышеуказанные параметры:

  1. beginTimetampValue - значение столбца метки времени последней импортированной записи
  2. endTimetampValue - текущая метка времени в соответствии с базой данных
  3. lastIncrementedValue - значение добавочного столбца последней импортированной записи

Я думаю, что в вашем случае Producer поместите в записи таблиц с более высокой отметкой времени , чем вы позже вставите вручную (используя запрос).

Когда Jdbc Connector проверяет наличие новых записей для импорта в Kafka, он пропускает их (поскольку они не заполнены someTimestampColumn < $endTimetampValue условие отметки времени )

Вы также можете изменить уровень журнала на DEBUG и посмотреть, что происходит в журналах

...