Intro
Я использую Apache Flink для построения довольно сложной сети потоков данных.Идея состоит в том, чтобы реализовать механизм правил с flink.
В качестве базового описания приложения, оно должно работать так:
Данные полученыисточником-потребителем kafka и обрабатывается несколькими потоками данных, пока, наконец, не будет отправлено в приемник производителя kafka.Входящие данные содержат объекты с логическим ключом («идентификатор объекта»), и входящие сообщения могут ссылаться на один и тот же идентификатор объекта.Для каждого данного идентификатора объекта порядок входящих сообщений должен быть сохранен во всем приложении.Порядок общих сообщений может быть произвольным.
Это означает, что сообщения a , b и c объекта1 должны обрабатываться в порядкеОднако сообщение x объекта 2 может быть обработано между a1 / b1 / c1, до или после, это не имеет значения.
Для моего текущего понимания это означает, что я должен keyBy(_.objectID)
, чтобы сообщения одного и того же объекта обрабатывались в порядке поступления.
Текущий подход
Для реализации фактического механизма правил , сеть потоков создано.Идея заключается в следующем:
- каждое правило будет иметь 1- n условия
- для каждого условия каждого правила создать подпоток исходного потокас
.filter(_.matches(rule.condition))
- объединить все подпотоки, которые соответствуют одному и тому же правилу, с помощью
substream1.connect(substream2).flatMap(new CombineFunction[MyObject](...))
connect
можно объединить только 2 потока, поэтому правило с 3 условиями приведет кпоследующие 2 объединения - правила, использующие то же условие, будут повторно использовать тот же подпоток, созданный на втором шаге.
Это приведет к присоединению n потоки, где n соответствует количеству правил.К присоединенным потокам будет добавлена функция map
, которая помечает сообщение, так что мы знаем, что правило соответствует.
Каждый объединенный поток / поток результатов может публиковать свой результат («правило соответствует xyz»)к производителю кафки независимо от других результатов, поэтому на данный момент я могу присоединить приемник к потокам.
Детали подключения
Поскольку .connect
двух потоков («условие» -Подпотоки) должны только передать сообщение, если оно было получено в обоих потоках (^ = оба условия совпадают), мне нужно RichCoFlatMapFunction
с состоянием с ключом, которое может позаботиться о «проходе, только если оно было получено уже вдругая сторона ".
Однако , проблема в том, что поток имеет ключ object-id .Так что же происходит, если через сеть проходят 2 сообщения одного и того же объекта и достигают .connect().map(new RichCoFlatMapFunction...)
?Это приведет к неправильному выводу.Мне нужно было бы назначить каждому входящему сообщению уникальный идентификатор (UUID) при входе в сеть, чтобы я мог использовать этот ключ (вместо идентификатора объекта) в соединении .connect().map()..
.Но в то же время мне нужно, чтобы поток был идентифицирован по идентификатору объекта, чтобы сообщения одного и того же объекта обрабатывались по порядку.Что делать?
Чтобы решить эту проблему, я сохранил поток ввода с keyBy(_.objectID)
, но RichCoFlatMapFunction
в объединении потоков больше не использует состояние ключа.Вместо этого я использую простое операторное состояние, которое сохраняет карту переданных объектов, но реализует ту же логику, только с ручным поиском ключа / значения.
Этот , кажется, работает, однакоЯ не знаю, вызывает ли это больше проблем.
Визуализация
Графический интерфейс Flink отобразит это изображение для списка из 14 правил с 23 условиями (в некоторых есть только одно правило).условие):
Код
Создание сети осуществляется с помощью этого кода:
val streamCache = mutable.Map[Int,DataStream[WorkingMemory]]()
val outputNodesCache = ListBuffer[DataStream[WorkingMemory]]()
if (rules.isEmpty)
return
// create partial streams for all conditions (first level)
// cache the sub-stream with the hashcode of its condition as key (for re-use)
for (rule <- rules if rule.checks.nonEmpty ;
cond <- rule.checks if !streamCache.contains(cond.hashCode()))
streamCache += cond.hashCode -> sourceStream.filter(cond.matches _)
// create joined streams for combined conditions (sub-levels)
for (rule <- rules if rule.checks.nonEmpty)
{
val ruleName = rule.ruleID
// for each rule, starting with the rule with the least conditions ...
if (rule.checks.size == 1)
{
// ... create exit node if single-condition rule
// each exit node applies the rule-name to the objects set of matched rules.
outputNodesCache += streamCache(rule.checks.head.hashCode).map(obj => { obj.matchedRule = ListBuffer((ruleName, rule.objectType.mkString(":"), rule.statement)) ; obj })
}
else
{
// ... iterate all conditions, and join nodes into full rule-path (reusing existing intermediate paths)
var sourceStream:DataStream[WorkingMemory] = streamCache(rule.checks.head.hashCode)
var idString = rule.checks.head.idString
for (i <- rule.checks.indices)
{
if (i == rule.checks.size-1)
{
// reached last condition of rule, create exit-node
// each exit node applies the rule-name to the objects set of matched rules.
val rn = ruleName
val objectType = rule.objectType.mkString(":")
val statement = rule.statement
outputNodesCache += sourceStream.map(obj => { obj.matchedRule = ListBuffer((rn, objectType, statement)) ; obj })
}
else
{
// intermediate condition, create normal intermediate node
val there = rule.checks(i+1)
val connectStream = streamCache(there.hashCode)
idString += (":" + there.idString)
// try to re-use existing tree-segments
if (streamCache.contains(idString.hashCode))
sourceStream = streamCache(idString.hashCode)
else
sourceStream = sourceStream.connect(connectStream).flatMap(new StatefulCombineFunction(idString))
}
}
}
}
// connect each output-node to the sink
for (stream <- outputNodesCache)
{
stream.map(wm => RuleEvent.toXml(wm, wm.matchedRule.headOption)).addSink(sink)
}
StatefulCombineFunction
, использованный в предыдущем фрагменте:
class StatefulCombineFunction(id:String) extends RichCoFlatMapFunction[WorkingMemory, WorkingMemory, WorkingMemory] with CheckpointedFunction
{
@transient
private var leftState:ListState[(String, WorkingMemory)] = _
private var rightState:ListState[(String, WorkingMemory)] = _
private var bufferedLeft = ListBuffer[(String, WorkingMemory)]()
private var bufferedRight = ListBuffer[(String, WorkingMemory)]()
override def flatMap1(xmlObject: WorkingMemory, out: Collector[WorkingMemory]): Unit = combine(bufferedLeft, bufferedRight, xmlObject, out, "left")
override def flatMap2(xmlObject: WorkingMemory, out: Collector[WorkingMemory]): Unit = combine(bufferedRight, bufferedLeft, xmlObject, out, "right")
def combine(leftState: ListBuffer[(String, WorkingMemory)], rightState: ListBuffer[(String, WorkingMemory)], xmlObject:WorkingMemory, out: Collector[WorkingMemory], side:String): Unit =
{
val otherIdx:Int = leftState.indexWhere(_._1 == xmlObject.uuid)
if (otherIdx > -1)
{
out.collect(leftState(otherIdx)._2)
leftState.remove(otherIdx)
}
else
{
rightState += ((xmlObject.uuid, xmlObject))
}
}
override def initializeState(context:FunctionInitializationContext): Unit = ???
override def snapshotState(context:FunctionSnapshotContext):Unit = ???
}
Я знаю, что очистка частичных совпадений из состояния оператора отсутствует (время жизни), но это не важно для текущего состояния разработки и будет добавлено позже.
Справочная информация
В этом приложении должен быть реализован алгоритм rete для сопоставления правил с использованием flink (https://en.wikipedia.org/wiki/Rete_algorithm).
. Другой подход заключается в том, чтобы просто зациклить все правила для каждого входящего сообщения, иприкрепить результат. У меня есть рабочая реализация для этого подхода с использованием flink, поэтому, пожалуйста, не советуйте это как решение.
Проблемы
Проблема в том, что приложение испортило порядоквходящих сообщений на уровне идентификатора объекта. То есть он не достигает того, что мне требовалось во вступлении. Для каждого идентификатора объекта входящие сообщения должны сохранять порядок. Но это не так.
Я не знаю, в какой момент кода нарушен порядок, или как эти операции распределяются между потоками, поэтому я не знаю, как решить эту проблему.