Есть ли способ интегрировать Spark-cassandra с SpringBoot 2? - PullRequest
0 голосов
/ 20 февраля 2019

Я использую JHipster для генерации приложения.Я пытаюсь интегрировать искру в весеннюю загрузку.Но как-то это не работает.Я новичок в весенних сапогах и свечах.Я не получаю никаких исключений или ошибок, но не получаю вывод тоже.Это работает хорошо, если я использовал Java-Spark-Cassandra.Может кто-нибудь сказать мне, что не так с моим кодом?

//SparkService.java

import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;

import java.util.HashMap;
import java.util.Map;

@Service
public class SparkService{

    @Autowired
    private JavaSparkContext javaSparkContext;

    @Autowired
    private SparkSession sparkSession;

    @Value("${spring.data.cassandra.keyspace-name}")
    private String CassandraKeyspace;

    @Value("${cassandra.table}")
    private String CassandraTable;

        public void getAllOrders() {
            try{
            Map<String, String> options = new HashMap<String, String>();
            options.put("keyspace", CassandraKeyspace);
            options.put("table", CassandraTable);

            sparkSession
                .read()
                .format("org.apache.spark.sql.cassandra")
                .options(options)
                .load()
                .createOrReplaceTempView(CassandraTable);
            
            Dataset<Row> df = sparkSession.sql("select * from instruments");
            df.show();

        }
        catch(Exception ex){
        ex.printStackTrace();
    }
    }


}
//SparkServiceImpl.java

package com.celeritio.sparkgateway.spark;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;

@Component
public class SparkServiceImpl {
    @Autowired
    SparkService sparkService;

    @EventListener(ContextRefreshedEvent.class)
    public void myMain(){
        System.out.println("myMain");
        sparkService.getAllOrders();
    }
}
//SparkConfiguration.java

@Configuration
@PropertySource("classpath:sparkconfig.properties")

public class SparkConfiguration {

    private static Logger log = LoggerFactory.getLogger(SparkConfiguration.class.getName());

    @Value("${spark.master}")
    private String sparkMaster;

    @Value("${spring.data.cassandra.keyspace-name}")
    private String cassandraKeyspace;

    @Value("${cassandra.table}")
    private String cassandraTable;

    @Value("${spring.data.cassandra.contact-points}")
    private String cassandraHost;

    @Value("${spring.data.cassandra.port}")
    private String cassandraPort;

    @Bean
    public SparkConf sparkConf() {
        SparkConf conf = new SparkConf(true)
            .set("spark.cassandra.connection.host",cassandraHost)
            .set("spark.cassandra.connection.port", cassandraPort)
            .setMaster(sparkMaster)
            .setAppName("SparkConfiguration");
        System.out.println("SparkConf"+conf.isTraceEnabled());
        return conf;
    }

    @Bean
    public JavaSparkContext javaSparkContext() {
        log.info("Connecting to spark with master Url: {}, and cassandra host: {}",
            sparkMaster, cassandraHost);

        JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf());

        log.debug("spark context created");

        return javaSparkContext;
    }


    @Bean
    public SparkSession sparkSession() {
        return SparkSession
            .builder()
            .config(sparkConf())
            .sparkContext(javaSparkContext().sc())
            .appName("SparkConfiguration")
            .getOrCreate();

    }

}

1 Ответ

0 голосов
/ 21 февраля 2019

Я думаю, что проблема только в каркасе ведения журналов, Spark использует log4j, а Spring использует logback.

Я помню, что нам нужно было удалить зависимость logback от Spring при интеграции с Spark.

...