У меня сложное приложение Kafka Stream с двумя потоками, полностью находящимися в одном и том же потоке:
- , оно использует
Execution
topi c в качестве источника, улучшило сообщение и переиздало обратно в тот же Execution
топи c. - присоединиться к другой топи c
WorkerTaskResult
, добавить результат к Execution
и опубликовать обратно к Execution
Топи c.
Основная цель состоит в том, чтобы обеспечить систему рабочего процесса.
Подробные логи c:
- Выполнение - это список TaskRun
Execution
посмотрите на все текущее состояние всех TaskRun
и найдите следующее для выполнения - Если таковое будет найдено, Выполнение измените их
TaskRunsList
, добавьте следующее и опубликуйте sh обратно в Kafka, также он отправляет в другую очередь задачу, которую нужно выполнить (WorkerTask
) - *
WorkerTask
выполняется вне потока Kafka и публикует sh обратно в другую очередь (WorkerTaskResult
) с помощью простого Kafka Consumer & Producer WorkerTaskResult
, изменяющего ток TaskRun
в текущем Execution
и меняющего статус (в основном, RUNNING / SUCCEED / FAILED) и также опубликовано обратно в Execution
очередь (с Kafka Stream)
Как видите, Execution
(со списком TaskRun
) - это состояние cu rrent application.
Поток работает хорошо, когда все сообщения являются последовательными (нет параллелизма, у меня может быть только один список изменений TaskRun
одновременно). Когда рабочий процесс стал параллельным (одновременное WorkerTaskResult
может быть объединено), кажется, что мое состояние выполнения переопределено и производит своего рода откат.
Пример вывода журнала:
2020-04-20 08:05:44,830 INFO reamThread-1 afkaExecutor Stream in with 3264792750: (
state=RUNNING
taskRunList=
[
TaskRun(id=6FiJ3US6jqZbtU3JL2AZD6, taskId=parent, value=null, state=RUNNING),
TaskRun(id=75mtoz5KVRydOo3VJnX68s, taskId=t1, value=null, state=CREATED) # >>>>> t1 is created
]
)
2020-04-20 08:05:44,881 INFO reamThread-1 afkaExecutor WorkerTaskResult: TaskRun(id=75mtoz5KVRydOo3VJnX68s, taskId=t1, value=null, state=RUNNING) # >>>>> worker send running state
2020-04-20 08:05:44,882 INFO reamThread-1 afkaExecutor Stream out with 1805535461 : (
state=RUNNING
taskRunList=
[
TaskRun(id=6FiJ3US6jqZbtU3JL2AZD6, taskId=parent, value=null, state=RUNNING),
TaskRun(id=75mtoz5KVRydOo3VJnX68s, taskId=t1, value=null, state=RUNNING) # >>>>> t1 save the running state
]
)
2020-04-20 08:05:45,047 INFO reamThread-1 afkaExecutor WorkerTaskResult: TaskRun(id=75mtoz5KVRydOo3VJnX68s, taskId=t1, value=null, state=SUCCESS) # >>>>> worker send success
2020-04-20 08:05:45,047 INFO reamThread-1 afkaExecutor Stream out with 578845055 : (
state=RUNNING
taskRunList=
[
TaskRun(id=6FiJ3US6jqZbtU3JL2AZD6, taskId=parent, value=null, state=RUNNING),
TaskRun(id=75mtoz5KVRydOo3VJnX68s, taskId=t1, value=null, state=SUCCESS)
]
)
2020-04-20 08:05:45,153 INFO reamThread-1 afkaExecutor Stream in with 1805535461: (
state=RUNNING
taskRunList=
[
TaskRun(id=6FiJ3US6jqZbtU3JL2AZD6, taskId=parent, value=null, state=RUNNING),
TaskRun(id=75mtoz5KVRydOo3VJnX68s, taskId=t1, value=null, state=RUNNING) # >>>>> OUT OF ORDER AND ROLLBACK TO PREVIOUS VERSION
]
)
2020-04-20 08:05:45,157 INFO reamThread-1 afkaExecutor Stream out with 1889889916 : (
state=RUNNING
taskRunList=
[
TaskRun(id=6FiJ3US6jqZbtU3JL2AZD6, taskId=parent, value=null, state=RUNNING),
TaskRun(id=75mtoz5KVRydOo3VJnX68s, taskId=t1, value=null, state=RUNNING),
TaskRun(id=6k23oBXy9cD0uCJeZ20SpB, taskId=t2, value=null, state=CREATED)
]
)
2020-04-20 08:05:45,209 WARN reamThread-1 KTableSource Detected out-of-order KTable update for execution at offset 10, partition 2.
2020-04-20 08:05:45,313 INFO reamThread-1 afkaExecutor Stream in with 1889889916: (
state=RUNNING
taskRunList=
[
TaskRun(id=6FiJ3US6jqZbtU3JL2AZD6, taskId=parent, value=null, state=RUNNING),
TaskRun(id=75mtoz5KVRydOo3VJnX68s, taskId=t1, value=null, state=RUNNING),
TaskRun(id=6k23oBXy9cD0uCJeZ20SpB, taskId=t2, value=null, state=CREATED)
]
)
2020-04-20 08:05:45,350 INFO reamThread-1 afkaExecutor WorkerTaskResult: TaskRun(id=6k23oBXy9cD0uCJeZ20SpB, taskId=t2, value=null, state=RUNNING)
2020-04-20 08:05:45,350 INFO reamThread-1 afkaExecutor Stream out with 3651399223 : (
state=RUNNING
taskRunList=
[
TaskRun(id=6FiJ3US6jqZbtU3JL2AZD6, taskId=parent, value=null, state=RUNNING),
TaskRun(id=75mtoz5KVRydOo3VJnX68s, taskId=t1, value=null, state=RUNNING),
TaskRun(id=6k23oBXy9cD0uCJeZ20SpB, taskId=t2, value=null, state=RUNNING)
]
)
На консоли появляется предупреждение с сообщением Detected out-of-order KTable update for execution at offset 10, partition 7.
Полный источник можно найти здесь .
Если вы попробуете также со многими другими подходами, как этот здесь :
- поместите
Execution
и WorkerTaskResult
на одном топи c, чтобы быть уверенным, что обрабатывается одно и то же только 1 сообщение одновременно - и хранить последние
Execution
самостоятельно в StateStore (чтобы присоединиться к WorkerTaskResult
& Execution
) - но звучит так, будто я реинвенчу KTable, и это не работает лучше
или этот здесь :
- в основном то же самое, что и предыдущий (оставьте последние
Execution
в StateStore самостоятельно) - , но используя 2 KStream для KStream (удаляя KTable).
Мой вопрос таков:
- Поддерживается ли KafkaStreams этот шаблон (который не является потоком Дага, когда мы погружаемся в одну и ту же топику c)?
- Каков хороший способ сделать этот поток безопасным для параллелизма?
Любая подсказка действительно ценится, она застряла с давних времен, спасибо
РЕДАКТИРОВАТЬ 1:
Вот некоторая дополнительная информация:
- Только приложение KStream публикует sh новое событие для
Execution
, нет внешнего приложения, которое публикует sh в этой топике c, единственный случай, когда внешнее приложение, опубликованное для выполнения, является первым событием ( aka создание исполнения). - Существует
WorkerApp
(внешнее приложение, простой потребитель / производитель), который потребляет от WorkerTask
(работа, которую нужно выполнить) и публикует sh результат на WorkerTaskResult
(в основном текущее состояние приложения).
Вот упрощенная версия фактического потока:
Builder
-> Stream 1
- from KStream<WorkerTaskResult>
- join KTable<Execution>
- to Execution topic
-> Stream 2
- from KTable<Execution> (same than previous)
- multiple output
- to WorkerTaskResult topic (if found an end)
- to Execution & to WorkerTask topic (if found a next task)
- to Execution topic (if detect an Execution end)
KStream - это, в основном, приложение Executor State, которое находит то, что это следующий WorkerTask
, который необходимо выполнить и оценить, завершен ли поток, поэтому приложение может:
- создать новый
TaskRun
- изменить текущее состояние
TaskRun
- присоединение
WorkerTaskResult
или - оценивать весь исполнитель и обнаружил, что задача не выполнена (на основе зависимостей)
- изменить состояние выполнения и опубликовать sh конечное состояние SUCCEED или FAILED, которое прервет "бесконечное l oop"
В этой актуальной версии для меня действительно неясно, в чем смысл Detected out-of-order KTable update
в реальном мире? Означает ли это, что таблица KTable должна иметь одного производителя на раздел и на ключ, чтобы поддерживать порядок на топи c?
РЕДАКТИРОВАТЬ 2:
А пока я нашел новый способ думать, что потоковое приложение, кажется, работает. Модульные испытания проходят и не более Detected out-of-order
. Вот новый упрощенный поток:
Builder
- from KTable<Execution>
- leftJoin KTable<WorkerTaskResult>
- Branch
- If Join > to Execution topic
- If not joint > continue the flow
- Multiple output (same than previous)
- to WorkerTaskResult topic (if found an end)
- to Execution & to WorkerTask topic (if found a next task)
- to Execution topic (if detect an Execution end)
Я думаю, что имеет смысл:
-
WorkerTaskResult
теперь является таблицей KTable, поэтому я сохраняю только последнюю версию результат - У меня есть один поток пути (и не более 2 путей), который выводится в
Execution
(я думаю, что это самая важная часть, которая была решена не в порядке) - Кажется, что целое имеет только один выход на каждый вход (1 новое значение на
Execution
даст 1 новое значение на Execution
topi c)
здесь новая топология:
Topologies:
Sub-topology: 0
Source: KSTREAM-SOURCE-0000000000 (topics: [kestra_execution])
--> KTABLE-SOURCE-0000000001
Processor: KTABLE-SOURCE-0000000001 (stores: [execution])
--> KTABLE-TOSTREAM-0000000002, KTABLE-JOINTHIS-0000000007
<-- KSTREAM-SOURCE-0000000000
Source: KSTREAM-SOURCE-0000000004 (topics: [kestra_workertaskresult])
--> KTABLE-SOURCE-0000000005
Processor: KTABLE-SOURCE-0000000005 (stores: [workertaskresult])
--> KTABLE-JOINOTHER-0000000008
<-- KSTREAM-SOURCE-0000000004
Processor: KTABLE-JOINOTHER-0000000008 (stores: [execution])
--> KTABLE-MERGE-0000000006
<-- KTABLE-SOURCE-0000000005
Processor: KTABLE-JOINTHIS-0000000007 (stores: [workertaskresult])
--> KTABLE-MERGE-0000000006
<-- KTABLE-SOURCE-0000000001
Processor: KTABLE-MERGE-0000000006 (stores: [])
--> KTABLE-TOSTREAM-0000000009
<-- KTABLE-JOINTHIS-0000000007, KTABLE-JOINOTHER-0000000008
Processor: KTABLE-TOSTREAM-0000000009 (stores: [])
--> KSTREAM-FILTER-0000000010, KSTREAM-FILTER-0000000015
<-- KTABLE-MERGE-0000000006
Processor: KSTREAM-FILTER-0000000015 (stores: [])
--> KSTREAM-MAPVALUES-0000000016
<-- KTABLE-TOSTREAM-0000000009
Processor: KSTREAM-MAPVALUES-0000000016 (stores: [])
--> KSTREAM-MAPVALUES-0000000017
<-- KSTREAM-FILTER-0000000015
Processor: KSTREAM-MAPVALUES-0000000017 (stores: [])
--> KSTREAM-FLATMAPVALUES-0000000018, KSTREAM-FILTER-0000000024, KSTREAM-FILTER-0000000019, KSTREAM-MAPVALUES-0000000067
<-- KSTREAM-MAPVALUES-0000000016
Processor: KSTREAM-FLATMAPVALUES-0000000018 (stores: [])
--> KSTREAM-FILTER-0000000042, KSTREAM-FILTER-0000000055, KSTREAM-FILTER-0000000030
<-- KSTREAM-MAPVALUES-0000000017
Processor: KSTREAM-FILTER-0000000042 (stores: [])
--> KSTREAM-MAPVALUES-0000000043
<-- KSTREAM-FLATMAPVALUES-0000000018
Processor: KSTREAM-FILTER-0000000030 (stores: [])
--> KSTREAM-MAPVALUES-0000000031
<-- KSTREAM-FLATMAPVALUES-0000000018
Processor: KSTREAM-FILTER-0000000055 (stores: [])
--> KSTREAM-MAPVALUES-0000000056
<-- KSTREAM-FLATMAPVALUES-0000000018
Processor: KSTREAM-MAPVALUES-0000000043 (stores: [])
--> KSTREAM-FILTER-0000000044, KSTREAM-FILTER-0000000050
<-- KSTREAM-FILTER-0000000042
Processor: KSTREAM-MAPVALUES-0000000031 (stores: [])
--> KSTREAM-FILTER-0000000032, KSTREAM-FILTER-0000000038
<-- KSTREAM-FILTER-0000000030
Processor: KSTREAM-MAPVALUES-0000000056 (stores: [])
--> KSTREAM-FILTER-0000000063, KSTREAM-FILTER-0000000057
<-- KSTREAM-FILTER-0000000055
Processor: KSTREAM-FILTER-0000000024 (stores: [])
--> KSTREAM-MAPVALUES-0000000025
<-- KSTREAM-MAPVALUES-0000000017
Processor: KSTREAM-FILTER-0000000032 (stores: [])
--> KSTREAM-MAPVALUES-0000000033
<-- KSTREAM-MAPVALUES-0000000031
Processor: KSTREAM-FILTER-0000000044 (stores: [])
--> KSTREAM-MAPVALUES-0000000045
<-- KSTREAM-MAPVALUES-0000000043
Processor: KSTREAM-FILTER-0000000057 (stores: [])
--> KSTREAM-MAPVALUES-0000000058
<-- KSTREAM-MAPVALUES-0000000056
Processor: KSTREAM-FILTER-0000000010 (stores: [])
--> KSTREAM-MAPVALUES-0000000011
<-- KTABLE-TOSTREAM-0000000009
Processor: KSTREAM-FILTER-0000000019 (stores: [])
--> KSTREAM-MAPVALUES-0000000020
<-- KSTREAM-MAPVALUES-0000000017
Processor: KSTREAM-FILTER-0000000050 (stores: [])
--> KSTREAM-MAPVALUES-0000000051
<-- KSTREAM-MAPVALUES-0000000043
Processor: KSTREAM-MAPVALUES-0000000025 (stores: [])
--> KSTREAM-FILTER-0000000026
<-- KSTREAM-FILTER-0000000024
Processor: KSTREAM-MAPVALUES-0000000033 (stores: [])
--> KSTREAM-MAPVALUES-0000000034
<-- KSTREAM-FILTER-0000000032
Processor: KSTREAM-MAPVALUES-0000000045 (stores: [])
--> KSTREAM-MAPVALUES-0000000046
<-- KSTREAM-FILTER-0000000044
Processor: KSTREAM-MAPVALUES-0000000058 (stores: [])
--> KSTREAM-MAPVALUES-0000000059
<-- KSTREAM-FILTER-0000000057
Processor: KSTREAM-FILTER-0000000026 (stores: [])
--> KSTREAM-FILTER-0000000027
<-- KSTREAM-MAPVALUES-0000000025
Processor: KSTREAM-FILTER-0000000038 (stores: [])
--> KSTREAM-MAPVALUES-0000000039
<-- KSTREAM-MAPVALUES-0000000031
Processor: KSTREAM-FILTER-0000000063 (stores: [])
--> KSTREAM-MAPVALUES-0000000064
<-- KSTREAM-MAPVALUES-0000000056
Processor: KSTREAM-MAPVALUES-0000000011 (stores: [])
--> KSTREAM-FILTER-0000000012
<-- KSTREAM-FILTER-0000000010
Processor: KSTREAM-MAPVALUES-0000000020 (stores: [])
--> KSTREAM-FILTER-0000000021
<-- KSTREAM-FILTER-0000000019
Processor: KSTREAM-MAPVALUES-0000000034 (stores: [])
--> KSTREAM-FILTER-0000000035
<-- KSTREAM-MAPVALUES-0000000033
Processor: KSTREAM-MAPVALUES-0000000046 (stores: [])
--> KSTREAM-FILTER-0000000047
<-- KSTREAM-MAPVALUES-0000000045
Processor: KSTREAM-MAPVALUES-0000000051 (stores: [])
--> KSTREAM-FILTER-0000000052
<-- KSTREAM-FILTER-0000000050
Processor: KSTREAM-MAPVALUES-0000000059 (stores: [])
--> KSTREAM-FILTER-0000000060
<-- KSTREAM-MAPVALUES-0000000058
Processor: KSTREAM-MAPVALUES-0000000067 (stores: [])
--> KSTREAM-FILTER-0000000068
<-- KSTREAM-MAPVALUES-0000000017
Processor: KSTREAM-FILTER-0000000012 (stores: [])
--> KSTREAM-PEEK-0000000013
<-- KSTREAM-MAPVALUES-0000000011
Processor: KSTREAM-FILTER-0000000021 (stores: [])
--> KSTREAM-PEEK-0000000022
<-- KSTREAM-MAPVALUES-0000000020
Processor: KSTREAM-FILTER-0000000027 (stores: [])
--> KSTREAM-PEEK-0000000028
<-- KSTREAM-FILTER-0000000026
Processor: KSTREAM-FILTER-0000000035 (stores: [])
--> KSTREAM-PEEK-0000000036
<-- KSTREAM-MAPVALUES-0000000034
Processor: KSTREAM-FILTER-0000000047 (stores: [])
--> KSTREAM-PEEK-0000000048
<-- KSTREAM-MAPVALUES-0000000046
Processor: KSTREAM-FILTER-0000000052 (stores: [])
--> KSTREAM-PEEK-0000000053
<-- KSTREAM-MAPVALUES-0000000051
Processor: KSTREAM-FILTER-0000000060 (stores: [])
--> KSTREAM-PEEK-0000000061
<-- KSTREAM-MAPVALUES-0000000059
Processor: KSTREAM-FILTER-0000000068 (stores: [])
--> KSTREAM-PEEK-0000000069
<-- KSTREAM-MAPVALUES-0000000067
Processor: KSTREAM-MAPVALUES-0000000039 (stores: [])
--> KSTREAM-FILTER-0000000040
<-- KSTREAM-FILTER-0000000038
Processor: KSTREAM-MAPVALUES-0000000064 (stores: [])
--> KSTREAM-TRANSFORM-0000000065
<-- KSTREAM-FILTER-0000000063
Processor: KSTREAM-FILTER-0000000040 (stores: [])
--> KSTREAM-SINK-0000000041
<-- KSTREAM-MAPVALUES-0000000039
Processor: KSTREAM-PEEK-0000000013 (stores: [])
--> KSTREAM-SINK-0000000014
<-- KSTREAM-FILTER-0000000012
Processor: KSTREAM-PEEK-0000000022 (stores: [])
--> KSTREAM-SINK-0000000023
<-- KSTREAM-FILTER-0000000021
Processor: KSTREAM-PEEK-0000000028 (stores: [])
--> KSTREAM-SINK-0000000029
<-- KSTREAM-FILTER-0000000027
Processor: KSTREAM-PEEK-0000000036 (stores: [])
--> KSTREAM-SINK-0000000037
<-- KSTREAM-FILTER-0000000035
Processor: KSTREAM-PEEK-0000000048 (stores: [])
--> KSTREAM-SINK-0000000049
<-- KSTREAM-FILTER-0000000047
Processor: KSTREAM-PEEK-0000000053 (stores: [])
--> KSTREAM-SINK-0000000054
<-- KSTREAM-FILTER-0000000052
Processor: KSTREAM-PEEK-0000000061 (stores: [])
--> KSTREAM-SINK-0000000062
<-- KSTREAM-FILTER-0000000060
Processor: KSTREAM-PEEK-0000000069 (stores: [])
--> KSTREAM-SINK-0000000070
<-- KSTREAM-FILTER-0000000068
Processor: KSTREAM-TRANSFORM-0000000065 (stores: [workertask_deduplication])
--> KSTREAM-SINK-0000000066
<-- KSTREAM-MAPVALUES-0000000064
Processor: KTABLE-TOSTREAM-0000000002 (stores: [])
--> log-executionStream
<-- KTABLE-SOURCE-0000000001
Sink: KSTREAM-SINK-0000000014 (topic: kestra_execution)
<-- KSTREAM-PEEK-0000000013
Sink: KSTREAM-SINK-0000000023 (topic: kestra_execution)
<-- KSTREAM-PEEK-0000000022
Sink: KSTREAM-SINK-0000000029 (topic: kestra_execution)
<-- KSTREAM-PEEK-0000000028
Sink: KSTREAM-SINK-0000000037 (topic: kestra_execution)
<-- KSTREAM-PEEK-0000000036
Sink: KSTREAM-SINK-0000000041 (topic: kestra_workertaskresult)
<-- KSTREAM-FILTER-0000000040
Sink: KSTREAM-SINK-0000000049 (topic: kestra_execution)
<-- KSTREAM-PEEK-0000000048
Sink: KSTREAM-SINK-0000000054 (topic: kestra_execution)
<-- KSTREAM-PEEK-0000000053
Sink: KSTREAM-SINK-0000000062 (topic: kestra_execution)
<-- KSTREAM-PEEK-0000000061
Sink: KSTREAM-SINK-0000000066 (topic: kestra_workertask)
<-- KSTREAM-TRANSFORM-0000000065
Sink: KSTREAM-SINK-0000000070 (topic: kestra_execution)
<-- KSTREAM-PEEK-0000000069
Processor: log-executionStream (stores: [])
--> none
<-- KTABLE-TOSTREAM-0000000002
На данный момент мне было неясно, будет ли решение устойчивым для любого параллелизма, и если я смогу ударить в другой раз вне очереди (это означает, что выполнение отката в предыдущий раз, что приводит к многократному выполнению одной и той же задачи).