Данные неправильно записываются или читаются в HDFS из потоковой передачи с искрой - PullRequest
0 голосов
/ 07 сентября 2018

У меня есть процесс потоковой передачи, который использует потоковую передачу данных из раздела kafka в объект JavaPairDStream, и я хочу сохранить эти данные в HDFS. Я испробовал два подхода, оба из которых работают, но также представляли для меня проблемы, и я не уверен, что моя проблема заключается в записи данных в HDFS или при чтении из HDFS.

Я запускаю все это на своей локальной машине, в Windows, в среде, используя Java.

  1. Моей первой попыткой была итерация по потоку и запись каждого rdd в hdf с использованием jPairRDD.saveAsTextFile("hdfs://localhost:9000/test/");. Я не получаю ошибок при запуске, но когда я пытаюсь прочитать файлы обратно из каталога «test», ничего не возвращается. И когда я сам захожу в каталог, чтобы попытаться просмотреть файлы, используя hdfs dfs -ls /test/, файлы отображаются как /test/_SUCCESS и /test/part-0000 или похожие.

  2. Моя вторая попытка (и текущая рабочая) использовала jPairDStream.dstream().saveAsTextFiles("hdfs://localhost:9000/test/", "txt");. Это работает, и я вижу файлы, перечисленные в каталоге «test» в HDFS, но когда я пытаюсь прочитать их обратно, данных в два раза больше, чем должно быть. Я нашел причину этого в том, что данные почему-то заключены в круглые скобки, а закрывающая скобка находится на отдельной строке в rdd, когда я читаю их из HDFS.

В примере 2 мои входные данные выглядят так

key, <message msgTime="07-9-2018 15:49:13" mountPoint="HKWS_32" msgLength="107" msgType="1115">0wBrRbAAc7iiQgAg8AAAAAAAAAAgAQEAf95g3v/8fXDoL+FAif+0ENAhcEqGww2lGzsbmjfscJgMbwA6sAEI6h8CoIbU4ikng4eYDrYAQGxf/////gDHLS3K+y6sgdkDwiTBXUK5hgj7R/aP5ggAAIA=</message>

А потом возвращенные данные из hdfs выглядят так:

(key, <message msgTime="07-9-2018 15:49:13" mountPoint="HKWS_32" msgLength="107" msgType="1115">0wBrRbAAc7iiQgAg8AAAAAAAAAAgAQEAf95g3v/8fXDoL+FAif+0ENAhcEqGww2lGzsbmjfscJgMbwA6sAEI6h8CoIbU4ikng4eYDrYAQGxf/////gDHLS3K+y6sgdkDwiTBXUK5hgj7R/aP5ggAAIA=</message>

)

Таким образом, у rdd есть дополнительная запись, содержащая ). Я понятия не имею, почему это так. Я не могу сказать, является ли это проблемой с тем, как я записываю данные в HDFS, или как я читаю их оттуда. Мой код ниже для всех моих классов. Может кто-нибудь помочь мне определить, почему это так? Мой код ниже будет содержать обе мои попытки.

Класс Spark, который использует потоковые данные из kafka и записывает их в hdfs

public class FatStreamProcessing implements Runnable, Serializable {

    private static final long serialVersionUID = 1L;

    private static Logger logger = Logger.getLogger(FatStreamProcessing.class);

    static Map<String, Object> kafkaParams = new HashMap<>();
    private static final String inTopic = "fatTopicIn";
    private static final Streamery streamery = new Streamery();

    @Override
    public void run() {

        //Set logging level for console
        Logger.getLogger("org").setLevel(Level.ERROR);

        //Set spark context to use all cores and batch interval of 1 second and the job name
        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("SampleSparkKafkaStreamApp");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));

        setupConsumerProperties();

        //Topic the process listens to
        Collection<String> topics = Arrays.asList(inTopic);

        //Create DStream that subscribes to the list of Kafka topics
        final JavaInputDStream<ConsumerRecord<String, String>> stream =
                KafkaUtils.createDirectStream(
                        jssc,
                        LocationStrategies.PreferConsistent(),
                        ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
                );

        logger.info("Direct Stream created for Fat Stream Consumer");

        //Map kafka input to key value tuple
        JavaPairDStream<String, String> jPairDStream =  stream.mapToPair(
                new PairFunction<ConsumerRecord<String, String>, String, String>() {
                    @Override
                    public Tuple2<String, String> call(ConsumerRecord<String, String> record) throws Exception {
                        return new Tuple2<>(record.key(), record.value());
                    }
                });

        //Save files to hdfs
        //This is the first attempt (see streamery class for implementation)
        streamery.saveToHDFS(jPairDStream);

        //This is the second attempt  
       jPairDStream.dstream().saveAsTextFiles("hdfs://localhost:9000/test/", "txt");

        try {
            jssc.start();
            jssc.awaitTermination();
        }
        catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    /**
     * Configure kafka consumer parameters
     * Example taken from KafkaConsumer documentation
     */
    private static void setupConsumerProperties() {
        kafkaParams.put("bootstrap.servers", "localhost:9092");
        kafkaParams.put("group.id", "test");
        kafkaParams.put("enable.auto.commit", "true");
        kafkaParams.put("auto.commit.interval.ms", "1000");
        kafkaParams.put("key.deserializer", StringDeserializer.class);
        kafkaParams.put("value.deserializer", StringDeserializer.class);
        kafkaParams.put("key.serializer", StringSerializer.class);
        kafkaParams.put("value.serializer", StringSerializer.class);
    }
}

Игнорируйте это имя класса, оно изначально предназначалось для потоковой передачи данных большему количеству потребителей kafka. Теперь он просто сохраняет данные в формате hdfs

Класс, который пытается сохранить файлы в HDFS для 1-й попытки.

public class Streamery {

    private static Logger logger = Logger.getLogger(Streamery.class);

    private static Map<String, Object> kafkaParams = new HashMap<>();
    private static int ack_sum = 0;
    private static int brs_sum = 0;
    static int totalInHDFS = 0;

    public Streamery() {

    }

    public void saveToHDFS(JavaPairDStream<String, String> jPairDStream) {
        jPairDStream.foreachRDD(jPairRDD -> {
            jPairRDD.saveAsTextFile("hdfs://localhost:9000/test/");
            totalInHDFS += jPairRDD.count();
            logger.info("Saved " + totalInHDFS + " files to hdfs");
        });

    }

    /*
    public void saveToHDFS2(JavaPairDStream<String, String> javaPairDStream) {
        javaPairDStream.foreachRDD(jPairRDD -> {
            jPairRDD.foreach(rdd -> {
                totalInHDFS++;
               System.out.println("RDD conatins: " + rdd._2);
               System.out.println("Total saved to HDFS: " + totalInHDFS);
            });
        });
    }*/
}

Класс для чтения данных из HDFS

public class ReadHDFS {

    private static final SimpleDateFormat DATE_FORMAT = new SimpleDateFormat("dd-M-yyyy HH:mm:ss");
    private static final SimpleDateFormat FILE_FORMAT = new SimpleDateFormat("yyyyMdd_HHmmss");

    static int count = 0;

    public static void main(String[] args) throws IOException {

        //FileWriter fileWriter = new FileWriter("hdfsMessages" + FILE_FORMAT.format(new Date()) + ".xml");

        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("TempReadHDFS");

        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaRDD<String> textFile = sc.textFile("hdfs://localhost:9000/test/*");


        List<String> temp = textFile.collect();

        temp.forEach(s -> System.out.println(s));

        System.out.println("Total number of files in HDFS: " + textFile.count());

    }
}
...