Динамически создавать функцию плоской карты (ключевые состояния) со значениями в потоке - PullRequest
0 голосов
/ 07 мая 2019

Я пишу потоковую программу Flink для извлечения функций для нашей обученной модели в автономном режиме, и мне было интересно узнать о дизайне программы. Я хочу, чтобы каждая логика извлечения объектов поддерживала свое собственное состояние в своем классе, чтобы добавление извлечения новой функции было бы эквивалентно добавлению нового класса.

Грубый дизайн высокого уровня выглядит следующим образом:

#data is the stream of relative paths to the feature extraction logic in our code e.g. com.xxx.FeatureExtraction1
val data:DataStream[String] = ...

#based on the relative path, use reflection to initiate the class
featureExtraction1 = method.getReflect("com.xxx.FeatureExtraction1")
data.keyBy(_).flatmap(featureExtraction1)

, где каждая логика извлечения объектов имеет собственное внутреннее отслеживание состояния

class FeatureExtraction1 extends RichFlatMapFunction[String, Double)] {

private var mystate: MapState = _

override def flatMap(input: String, out: Collector[Double]) = {
// access the state value
}

override def open(parameters: Configuration): Unit = {
   mystate = xxx
}

}

Я мог бы заставить это работать так, как только я добавлю новый класс извлечения объектов, например com.xxx.FeatureExtraction2, я добавляю это к потоку данных как

data.keyBy(_).flatmap(featureExtraction1).flatmap(featureExtraction2)...flatmap(featureExtractionN)

Тем не менее, я не знаю Флинка достаточно хорошо, чтобы быть уверенным, что если featureExtraction1, хотя featureExtractionN, будет выполняться одновременно (они должны быть в моей голове), если они будут связаны таким образом. Во-вторых, я хочу написать код, который автоматически создает новую логику извлечения функций, и я не добавляю ее в поток. В моей голове это может выглядеть так:

data.keyBy(_).foreachValueIntheStream.flatmap(new FeatureExtractionX based on the Value)

если бы я мог это сделать, добавление новой функции означало бы добавление нового класса извлечения объектов с собственным отслеживанием состояния

Пожалуйста, посоветуйте мое наивное мышление. Я благодарен за любое руководство.

1 Ответ

2 голосов
/ 07 мая 2019

Flink не может динамически добавлять функции. Но вы могли бы сделать что-то близкое, я думаю.

Я бы использовал широковещательный поток для путей к функциям и обычный поток для фактических данных, которые должны быть обработаны. Соедините их, чтобы создать связанный поток, затем запустите его в CoFlatMapFunction. Внутри этой функции вы будете вести список (динамически генерируемых) функций извлечения объектов, которые вы применяете к входящим данным. Для состояния используйте Map<feature extraction function id, value>, чтобы каждая функция извлечения объектов записывала свое состояние на одной и той же карте.

У вас есть типичная проблема - желание очистить широковещательный поток перед обработкой первого из элементов данных - см. Список рассылки для обсуждения того, как это сделать.

...