Обновление бокового ввода в Apache Beam / Dataflow - PullRequest
1 голос
/ 22 марта 2019

Я искал, как обновить боковой ввод в задании потока данных без перезапуска конвейера.Я нашел этот пример https://github.com/spotify/scio/issues/865, но я не могу заставить его работать локально или развернуто.Вот пример моего кода:

 final PCollectionView<Map<String, String>> versionCoreMapping = 
         pipeline.apply("Read from spanner for active versions for different cores",

         SpannerIO.read()
                    .withSpannerConfig(spannerConfig)
                    .withInstanceId(options.getInstanceId())
                    .withDatabaseId(options.getDatabaseId())
                    .withQuery("SELECT version, sys, action FROM table"))
                .apply("Window Core Version Map",
                    Window.into(FixedWindows.of(Duration.standardSeconds(5))))
                .apply("Core Version Map into Global Window",
                    Window.into(new GlobalWindows()))
                .apply("Refresh Cache for Versioning", 
                       ParDo.of(new RefreshCache("sys", "version", "action")))
                .apply("convert to version table to map", View.asMap());

RefreshCache

public class RefreshCache extends DoFn<Struct, KV<String, String>> {
private static final long serialVersionUID = 1;
   private String key;
   private String value;
   private String secondaryPrimaryKey;
   private static final Logger LOGGER = LoggerFactory.getLogger(RefreshCache.class);

public RefreshCache(String key, String value, String secondaryPrimaryKey) {
    this.key = key;
    this.value = value;
    this.secondaryPrimaryKey = secondaryPrimaryKey;
}

@ProcessElement
public void processElement(ProcessContext c) {
    try {
        LOGGER.info("Refreshing cache..." + key);
        Struct row = c.element();
        KV<String, String> returnMap;
        if(null != secondaryPrimaryKey) {
            returnMap = KV.of(row.getString(key).concat(",").concat(row.getString(secondaryPrimaryKey)), row.getString(value));
        } else {
            returnMap = KV.of(row.getString(key), row.getString(value));
        }

        c.output(returnMap);
    }catch (Exception e) {
        LOGGER.error("Exception: " + e.getMessage());
    }
  }
}

Когда я запускаю это, я не вижу журнал внутри RefreshCache, повторяющийся с 5-секундным интервалом.

Любая помощь с этим была бы великолепна

Спасибо,

Крейг

...