Можно ли внедрить зависимости в этапы конвейера Hazelcast Jet? - PullRequest
2 голосов
/ 14 июля 2020

Например, учитывая простой конвейер, такой как:

Pipeline p = Pipeline.create();
p.readFrom(TestSources.items("the", "quick", "brown", "fox"))
 .map(mapFn)
 .writeTo(Sinks.logger());

, я бы хотел, чтобы mapFn был чем-то, требующим несериализуемой зависимости для выполнения своей работы.

I знаю, что могу это сделать:

Pipeline p = Pipeline.create();
p.readFrom(TestSources.items("the", "quick", "brown", "fox"))
 .mapUsingService(JetSpringServiceFactories.bean("myDependencies"),
         MyDependencies::addDependencies);
 .map(mapFn)
 .writeTo(Sinks.logger());

Это обертывает строки, считываемые из источника, в другой объект, который включает зависимости, предоставляя mapFn доступ к этим зависимостям без их необходимости вводить в сам этот объект . Это будет работать, но я хочу использовать свою функцию сопоставления вне Jet, а также как часть конвейера, и в этом случае немного странно передавать зависимости вместе с сопоставляемыми элементами, а не просто инициализировать сопоставитель с зависимостями это нужно. Это также вынуждает меня бессмысленно создавать объект-оболочку для каждого элемента в моем потоке / пакете.

В документах говорится, что другой альтернативой является использование аннотации @SpringAware для Processor, но я думаю, что это означает использование Core API, о котором в документации говорится: «в основном предлагает вам множество способов сделать ошибки», поэтому я бы предпочел этого избежать.

В обычном Hazelcast IMDG все, что десериализовано, может использовать ManagedContext для инициализируется, и это, очевидно, относится и к Jet, но функции, фильтры и т. д. c. конвейера обернуты множеством слоев материала конвейера Jet, поэтому, похоже, нет никакого способа добраться до них.

Я что-то упускаю или перечислил все имеющиеся у меня варианты (кроме обращения к какой-то "глобальный" static объект зависимостей)?

Ответы [ 2 ]

2 голосов
/ 14 июля 2020

То, как вы описали, на самом деле довольно близко к тому, как это должно быть сделано. Вы можете упростить его, просто выполнив:

Pipeline p = Pipeline.create();
p.readFrom(TestSources.items("the", "quick", "brown", "fox"))
 .mapUsingService(bean("myDependencies"), (dep, item) -> mapFn.apply(dep, item));
 .writeTo(Sinks.logger());

Это позволяет избежать создания промежуточного элемента. Как вы уже сказали, для этого требуется, чтобы функция сопоставления также принимала зависимость в качестве параметра.

Если вы хотите этого избежать, другой вариант - написать собственный ServiceFactory, который будет выполнять сопоставление, а также принимать зависимость. Таким образом, вы можете переписать свою функцию сопоставления как услугу и ввести зависимость на уровне конструктора. ядро. То же самое было сделано для класса Metrics, который также работает в контексте stati c. Здесь также есть связанная с этим проблема:

https://github.com/hazelcast/hazelcast-jet/issues/954

Если вы заинтересованы в участии, я могу дать вам несколько указателей.

1 голос
/ 14 июля 2020

Если ваш mapFn находится внутри bean-компонента, вы можете просто использовать его как свою службу:

Pipeline p = Pipeline.create();
p.readFrom(TestSources.items("the", "quick", "brown", "fox"))
 .mapUsingService(JetSpringServiceFactories.bean(MyService.class), MyService::mapFn);
 .writeTo(Sinks.logger());

Полный автономный пример, где конвейер вызывает mapFn на MyService, который использует свою зависимость для сопоставления:

@SpringBootApplication
public class TutorialApplication implements InitializingBean {

    @Autowired
    JetInstance jetInstance;

    public static void main(String[] args) {
        SpringApplication.run(TutorialApplication.class, args);
    }

    @Override
    public void afterPropertiesSet() {

        Pipeline p = Pipeline.create();
        p.readFrom(TestSources.items("the", "quick", "brown", "fox"))
         .mapUsingService(JetSpringServiceFactories.bean(MyService.class), MyService::mapFn)
         .writeTo(Sinks.logger());

        jetInstance.newJob(p);
    }

    @Component
    public static class MyService {

        @Autowired
        MyDependency foo;

        public String mapFn(String s) {
            return foo.bar(s);
        }
    }

    @Component
    public static class MyDependency {

        public String bar(String s) {
            return "mod: " + s;
        }
    }
}
...