Я хочу обновлять широковещательную переменную каждую минуту. Поэтому я использую пример кода, который вы дали Аасте в этом вопросе.
как я могу обновить широковещательную переменную в потоковой передаче Spark?
Но это не сработало. Функция updateAndGet()
работает только при запуске потокового приложения. Когда я отлаживал свой код, он не входил в fuction updateAndGet()
дважды. Таким образом, переменная широковещания не обновлялась каждую минуту.
Почему?
Вот мой пример кода.
public class BroadcastWrapper {
private Broadcast<List<String>> broadcastVar;
private Date lastUpdatedAt = Calendar.getInstance().getTime();
private static BroadcastWrapper obj = new BroadcastWrapper();
private BroadcastWrapper(){}
public static BroadcastWrapper getInstance() {
return obj;
}
public JavaSparkContext getSparkContext(SparkContext sc) {
JavaSparkContext jsc = JavaSparkContext.fromSparkContext(sc);
return jsc;
}
public Broadcast<List<String>> updateAndGet(JavaStreamingContext jsc) {
Date currentDate = Calendar.getInstance().getTime();
long diff = currentDate.getTime()-lastUpdatedAt.getTime();
if (broadcastVar == null || diff > 60000) { // Lets say we want to refresh every 1 min =
// 60000 ms
if (broadcastVar != null)
broadcastVar.unpersist();
lastUpdatedAt = new Date(System.currentTimeMillis());
// Your logic to refreshs
// List<String> data = getRefData();
List<String> data = new ArrayList<String>();
data.add("tang");
data.add("xiao");
data.add(String.valueOf(System.currentTimeMillis()));
broadcastVar = jsc.sparkContext().broadcast(data);
}
return broadcastVar;}}
//Here is the computing code submit to spark streaming.
lines.transform(new Function<JavaRDD<String>, JavaRDD<String>>() {
Broadcast<List<String>> blacklist =
BroadcastWrapper.getInstance().updateAndGet(jsc);
@Override
public JavaRDD<String> call(JavaRDD<String> rdd) {
JavaRDD<String> dd=rdd.filter(new Function<String, Boolean>() {
@Override
public Boolean call(String word) {
if (blacklist.getValue().contains(word)) {
return false;
} else {
return true;
}
}
});
return dd;
}});