Проблемы spark + kafka в распределенной среде - PullRequest
0 голосов
/ 26 января 2019

Spark потребляет данные в кафке.Я получил объект операции с базой данных (beCalTaskTaskDao) с аннотацией @resource.когда он был запущен в первый раз, beCalTaskTaskDao обычно был ценным, но beCalTaskTaskDao показывал ноль, когда в kakfa было сообщение, которое начало потреблять, что меня очень смутило.Ниже приведен скриншот программы.Я надеюсь, что кто-то может дать мне несколько советов.

Нет проблем при развертывании Windows на одной машине.

@Repository
public class RapidCalculateTask implements Serializable{
    private static Logger log = LoggerFactory.getLogger(RapidCalculateTask.class);
    /**
     * 
     */
    private static final long serialVersionUID = -7664074371811203339L;

    @Resource
    BeCalculateTaskDao beCalTask; 

    private String servers_kafka = "127.0.0.1:9092";
    private String subscribe = "topic";
    private static BeCalculateTaskDao beCalTaskTaskDao; 


    public void startStruStram() {
        setBeCalTaskTaskDao(beCalTask);
        try {
            //SparkConf conf = new SparkConf().setMaster("local[3]").setAppName("SparkA");
            //SparkSession spark = SparkSession.builder().config(conf).getOrCreate(); 
            CalculateService calService = new CalculateService();

            Properties prop = calService.getProperties();
            servers_kafka = prop.getProperty("spark.servers_kf");
            subscribe = prop.getProperty("spark.subscribe"); 

            SparkConf conf = calService.getSparkConfig(prop);

            SparkSession spark = SparkSession.builder().config(conf).getOrCreate(); 

            Dataset<Row> lines = spark.readStream()
                    .format("kafka")
                    .option("kafka.bootstrap.servers", servers_kafka)
                    .option("subscribe", subscribe)
                    .load().selectExpr("CAST(value AS STRING)");


            StreamingQuery query = lines.writeStream()
                    .trigger(Trigger.ProcessingTime(processingtime))
                    .foreach(new RapidForeachWriter<Row>(getBeCalTaskTaskDao()))
                    .start(); 

            query.awaitTermination();
        } catch (StreamingQueryException e) {
            e.printStackTrace();
        }
    }


    public static BeCalculateTaskDao getBeCalTaskTaskDao() {
        return beCalTaskTaskDao;
    }

    public static void setBeCalTaskTaskDao(BeCalculateTaskDao beCalTaskTaskDao) {
        RapidCalculateTask.beCalTaskTaskDao = beCalTaskTaskDao;
    }


}

public class RapidForeachWriter<Rows> extends ForeachWriter<Rows> implements Serializable{
    /**
     * 
     */
    private static final long serialVersionUID = 5549303376220035143L;
    private static Logger log = LoggerFactory.getLogger(RapidForeachWriter.class);

    private static BeCalculateTaskDao beCalTask;    
    private long num;   
    private JavaSparkContext javaCont;

    public RapidForeachWriter (BeCalculateTaskDao beCalTasks){
        RapidForeachWriter.beCalTask = beCalTasks;
        num = 500L;
    }

    @Override
    public void close(Throwable errorOrNull) {
        // TODO Auto-generated method stub 
    }

    @Override
    public boolean open(long partitionId, long version) {
        log.info("check beCalTask is null"+num); 
        log.info("check beCalTask si null"+beCalTask); 
        try { 
            if(beCalTask.equals(null)) {
                return false;
            }
            return true;
        } catch (Exception e) { 
            return false;
        }
    }

    @Override
    public void process(Rows value){

    }
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...