Spark потребляет данные в кафке.Я получил объект операции с базой данных (beCalTaskTaskDao) с аннотацией @resource.когда он был запущен в первый раз, beCalTaskTaskDao обычно был ценным, но beCalTaskTaskDao показывал ноль, когда в kakfa было сообщение, которое начало потреблять, что меня очень смутило.Ниже приведен скриншот программы.Я надеюсь, что кто-то может дать мне несколько советов.
Нет проблем при развертывании Windows на одной машине.
@Repository
public class RapidCalculateTask implements Serializable{
private static Logger log = LoggerFactory.getLogger(RapidCalculateTask.class);
/**
*
*/
private static final long serialVersionUID = -7664074371811203339L;
@Resource
BeCalculateTaskDao beCalTask;
private String servers_kafka = "127.0.0.1:9092";
private String subscribe = "topic";
private static BeCalculateTaskDao beCalTaskTaskDao;
public void startStruStram() {
setBeCalTaskTaskDao(beCalTask);
try {
//SparkConf conf = new SparkConf().setMaster("local[3]").setAppName("SparkA");
//SparkSession spark = SparkSession.builder().config(conf).getOrCreate();
CalculateService calService = new CalculateService();
Properties prop = calService.getProperties();
servers_kafka = prop.getProperty("spark.servers_kf");
subscribe = prop.getProperty("spark.subscribe");
SparkConf conf = calService.getSparkConfig(prop);
SparkSession spark = SparkSession.builder().config(conf).getOrCreate();
Dataset<Row> lines = spark.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", servers_kafka)
.option("subscribe", subscribe)
.load().selectExpr("CAST(value AS STRING)");
StreamingQuery query = lines.writeStream()
.trigger(Trigger.ProcessingTime(processingtime))
.foreach(new RapidForeachWriter<Row>(getBeCalTaskTaskDao()))
.start();
query.awaitTermination();
} catch (StreamingQueryException e) {
e.printStackTrace();
}
}
public static BeCalculateTaskDao getBeCalTaskTaskDao() {
return beCalTaskTaskDao;
}
public static void setBeCalTaskTaskDao(BeCalculateTaskDao beCalTaskTaskDao) {
RapidCalculateTask.beCalTaskTaskDao = beCalTaskTaskDao;
}
}
public class RapidForeachWriter<Rows> extends ForeachWriter<Rows> implements Serializable{
/**
*
*/
private static final long serialVersionUID = 5549303376220035143L;
private static Logger log = LoggerFactory.getLogger(RapidForeachWriter.class);
private static BeCalculateTaskDao beCalTask;
private long num;
private JavaSparkContext javaCont;
public RapidForeachWriter (BeCalculateTaskDao beCalTasks){
RapidForeachWriter.beCalTask = beCalTasks;
num = 500L;
}
@Override
public void close(Throwable errorOrNull) {
// TODO Auto-generated method stub
}
@Override
public boolean open(long partitionId, long version) {
log.info("check beCalTask is null"+num);
log.info("check beCalTask si null"+beCalTask);
try {
if(beCalTask.equals(null)) {
return false;
}
return true;
} catch (Exception e) {
return false;
}
}
@Override
public void process(Rows value){
}
}