У меня есть процесс потоковой передачи, который использует потоковую передачу данных из раздела kafka в объект JavaPairDStream
, и я хочу сохранить эти данные в HDFS. Я испробовал два подхода, оба из которых работают, но также представляли для меня проблемы, и я не уверен, что моя проблема заключается в записи данных в HDFS или при чтении из HDFS.
Я запускаю все это на своей локальной машине, в Windows, в среде, используя Java.
Моей первой попыткой была итерация по потоку и запись каждого rdd в hdf с использованием jPairRDD.saveAsTextFile("hdfs://localhost:9000/test/");
. Я не получаю ошибок при запуске, но когда я пытаюсь прочитать файлы обратно из каталога «test», ничего не возвращается. И когда я сам захожу в каталог, чтобы попытаться просмотреть файлы, используя hdfs dfs -ls /test/
, файлы отображаются как /test/_SUCCESS
и /test/part-0000
или похожие.
Моя вторая попытка (и текущая рабочая) использовала jPairDStream.dstream().saveAsTextFiles("hdfs://localhost:9000/test/", "txt");
. Это работает, и я вижу файлы, перечисленные в каталоге «test» в HDFS, но когда я пытаюсь прочитать их обратно, данных в два раза больше, чем должно быть.
Я нашел причину этого в том, что данные почему-то заключены в круглые скобки, а закрывающая скобка находится на отдельной строке в rdd, когда я читаю их из HDFS.
В примере 2 мои входные данные выглядят так
key, <message msgTime="07-9-2018 15:49:13" mountPoint="HKWS_32" msgLength="107" msgType="1115">0wBrRbAAc7iiQgAg8AAAAAAAAAAgAQEAf95g3v/8fXDoL+FAif+0ENAhcEqGww2lGzsbmjfscJgMbwA6sAEI6h8CoIbU4ikng4eYDrYAQGxf/////gDHLS3K+y6sgdkDwiTBXUK5hgj7R/aP5ggAAIA=</message>
А потом возвращенные данные из hdfs выглядят так:
(key, <message msgTime="07-9-2018 15:49:13" mountPoint="HKWS_32" msgLength="107" msgType="1115">0wBrRbAAc7iiQgAg8AAAAAAAAAAgAQEAf95g3v/8fXDoL+FAif+0ENAhcEqGww2lGzsbmjfscJgMbwA6sAEI6h8CoIbU4ikng4eYDrYAQGxf/////gDHLS3K+y6sgdkDwiTBXUK5hgj7R/aP5ggAAIA=</message>
)
Таким образом, у rdd есть дополнительная запись, содержащая )
. Я понятия не имею, почему это так. Я не могу сказать, является ли это проблемой с тем, как я записываю данные в HDFS, или как я читаю их оттуда. Мой код ниже для всех моих классов. Может кто-нибудь помочь мне определить, почему это так? Мой код ниже будет содержать обе мои попытки.
Класс Spark, который использует потоковые данные из kafka и записывает их в hdfs
public class FatStreamProcessing implements Runnable, Serializable {
private static final long serialVersionUID = 1L;
private static Logger logger = Logger.getLogger(FatStreamProcessing.class);
static Map<String, Object> kafkaParams = new HashMap<>();
private static final String inTopic = "fatTopicIn";
private static final Streamery streamery = new Streamery();
@Override
public void run() {
//Set logging level for console
Logger.getLogger("org").setLevel(Level.ERROR);
//Set spark context to use all cores and batch interval of 1 second and the job name
SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("SampleSparkKafkaStreamApp");
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));
setupConsumerProperties();
//Topic the process listens to
Collection<String> topics = Arrays.asList(inTopic);
//Create DStream that subscribes to the list of Kafka topics
final JavaInputDStream<ConsumerRecord<String, String>> stream =
KafkaUtils.createDirectStream(
jssc,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
);
logger.info("Direct Stream created for Fat Stream Consumer");
//Map kafka input to key value tuple
JavaPairDStream<String, String> jPairDStream = stream.mapToPair(
new PairFunction<ConsumerRecord<String, String>, String, String>() {
@Override
public Tuple2<String, String> call(ConsumerRecord<String, String> record) throws Exception {
return new Tuple2<>(record.key(), record.value());
}
});
//Save files to hdfs
//This is the first attempt (see streamery class for implementation)
streamery.saveToHDFS(jPairDStream);
//This is the second attempt
jPairDStream.dstream().saveAsTextFiles("hdfs://localhost:9000/test/", "txt");
try {
jssc.start();
jssc.awaitTermination();
}
catch (InterruptedException e) {
e.printStackTrace();
}
}
/**
* Configure kafka consumer parameters
* Example taken from KafkaConsumer documentation
*/
private static void setupConsumerProperties() {
kafkaParams.put("bootstrap.servers", "localhost:9092");
kafkaParams.put("group.id", "test");
kafkaParams.put("enable.auto.commit", "true");
kafkaParams.put("auto.commit.interval.ms", "1000");
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("key.serializer", StringSerializer.class);
kafkaParams.put("value.serializer", StringSerializer.class);
}
}
Игнорируйте это имя класса, оно изначально предназначалось для потоковой передачи данных большему количеству потребителей kafka. Теперь он просто сохраняет данные в формате hdfs
Класс, который пытается сохранить файлы в HDFS для 1-й попытки.
public class Streamery {
private static Logger logger = Logger.getLogger(Streamery.class);
private static Map<String, Object> kafkaParams = new HashMap<>();
private static int ack_sum = 0;
private static int brs_sum = 0;
static int totalInHDFS = 0;
public Streamery() {
}
public void saveToHDFS(JavaPairDStream<String, String> jPairDStream) {
jPairDStream.foreachRDD(jPairRDD -> {
jPairRDD.saveAsTextFile("hdfs://localhost:9000/test/");
totalInHDFS += jPairRDD.count();
logger.info("Saved " + totalInHDFS + " files to hdfs");
});
}
/*
public void saveToHDFS2(JavaPairDStream<String, String> javaPairDStream) {
javaPairDStream.foreachRDD(jPairRDD -> {
jPairRDD.foreach(rdd -> {
totalInHDFS++;
System.out.println("RDD conatins: " + rdd._2);
System.out.println("Total saved to HDFS: " + totalInHDFS);
});
});
}*/
}
Класс для чтения данных из HDFS
public class ReadHDFS {
private static final SimpleDateFormat DATE_FORMAT = new SimpleDateFormat("dd-M-yyyy HH:mm:ss");
private static final SimpleDateFormat FILE_FORMAT = new SimpleDateFormat("yyyyMdd_HHmmss");
static int count = 0;
public static void main(String[] args) throws IOException {
//FileWriter fileWriter = new FileWriter("hdfsMessages" + FILE_FORMAT.format(new Date()) + ".xml");
SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("TempReadHDFS");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<String> textFile = sc.textFile("hdfs://localhost:9000/test/*");
List<String> temp = textFile.collect();
temp.forEach(s -> System.out.println(s));
System.out.println("Total number of files in HDFS: " + textFile.count());
}
}