Я искал, как обновить боковой ввод в задании потока данных без перезапуска конвейера.Я нашел этот пример https://github.com/spotify/scio/issues/865, но я не могу заставить его работать локально или развернуто.Вот пример моего кода:
final PCollectionView<Map<String, String>> versionCoreMapping =
pipeline.apply("Read from spanner for active versions for different cores",
SpannerIO.read()
.withSpannerConfig(spannerConfig)
.withInstanceId(options.getInstanceId())
.withDatabaseId(options.getDatabaseId())
.withQuery("SELECT version, sys, action FROM table"))
.apply("Window Core Version Map",
Window.into(FixedWindows.of(Duration.standardSeconds(5))))
.apply("Core Version Map into Global Window",
Window.into(new GlobalWindows()))
.apply("Refresh Cache for Versioning",
ParDo.of(new RefreshCache("sys", "version", "action")))
.apply("convert to version table to map", View.asMap());
RefreshCache
public class RefreshCache extends DoFn<Struct, KV<String, String>> {
private static final long serialVersionUID = 1;
private String key;
private String value;
private String secondaryPrimaryKey;
private static final Logger LOGGER = LoggerFactory.getLogger(RefreshCache.class);
public RefreshCache(String key, String value, String secondaryPrimaryKey) {
this.key = key;
this.value = value;
this.secondaryPrimaryKey = secondaryPrimaryKey;
}
@ProcessElement
public void processElement(ProcessContext c) {
try {
LOGGER.info("Refreshing cache..." + key);
Struct row = c.element();
KV<String, String> returnMap;
if(null != secondaryPrimaryKey) {
returnMap = KV.of(row.getString(key).concat(",").concat(row.getString(secondaryPrimaryKey)), row.getString(value));
} else {
returnMap = KV.of(row.getString(key), row.getString(value));
}
c.output(returnMap);
}catch (Exception e) {
LOGGER.error("Exception: " + e.getMessage());
}
}
}
Когда я запускаю это, я не вижу журнал внутри RefreshCache, повторяющийся с 5-секундным интервалом.
Любая помощь с этим была бы великолепна
Спасибо,
Крейг