Я тестирую с использованием foreach сток структурированной потоковой передачи Spark.
close()
метод никогда не вызывается.Я хочу, чтобы метод close()
вызывался каждые 2 минуты.
dataset.foreach(
new ForeachWriter<Row>() {
@Override
public void process(Row row) {}
@Override
public boolean open(long partition, long epoch) {
System.out.println("Opening");
return true;
}
@Override
public void close(Throwable ex) {
System.out.println("Closing");
}
})
.start()
.awaitTermination();
, когда я ставлю .trigger(Trigger.Continuous("5 seconds")
, тогда это хорошо в первые 30 минут.затем через 20 минут все внезапно .. ввод уменьшается на 1/3.