Объем памяти исполнителя постоянно растет при использовании Apache Beam 2.4.0 с Spark Streaming 2.3.1. Что просходит? - PullRequest
0 голосов
/ 10 июня 2019

Я использую Apache Beam 2.4.0 с Spark Streaming 2.3.1, чтобы просто прочитать тему kafka и написать другую тему kafka. Я считаю, что объем памяти Spark executor продолжает расти в веб-интерфейсе Spark. Кто-нибудь сталкивался с этим явлением? Что вызвало это?

public class BeamKafkaSparkDemo {
    public static void main(String[] args) {

        Map<String, String> runnerParametersSpark = new HashMap<>();
        runnerParametersSpark.put("storageLevel", "MEMORY_ONLY_SER");
        runnerParametersSpark.put("streaming", "true");
        runnerParametersSpark.put("batchIntervalMillis", "2000");
        runnerParametersSpark.put("maxRecordsPerBatch", "5000");
        System.setProperty("HADOOP_USER_NAME", "root");
        runnerParametersSpark.put("checkpointDir", "hdfs://10.1.240.110:19000/uyun/udap/example00000000010/");
        runnerParametersSpark.put("checkpointDurationMillis", "2000");
        runnerParametersSpark.put("runner", "SparkRunner");
        runnerParametersSpark.put("enableSparkMetricSinks", "false");

        int i = 0;
        String[] newArgs = new String[runnerParametersSpark.keySet().size()];
        for (Map.Entry<String, String> entry : runnerParametersSpark.entrySet()) {
            newArgs[i] = "--" + entry.getKey() + "=" + entry.getValue();
            i++;
        }
        PipelineOptions options = PipelineOptionsFactory.fromArgs(newArgs).create();

        Pipeline pipeline = Pipeline.create(options);
        Map<String, Object> consumerProps = new HashMap<>();
        consumerProps.put("auto.offset.reset", "earliest");
        consumerProps.put("group.id", "consumerA000000001010");
        consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        PCollection<String> lines = //这里kV后说明kafka中的key和value均为String类型
                pipeline.apply(KafkaIO.<String, String>read()
                                .withBootstrapServers("10.1.61.141:9292,10.1.61.143:9292,10.1.61.144:9292")//必需,设置kafka的服务器地址和端口
                                .withTopic("metrics")//必需,设置要读取的kafka的topic名称
                                .withKeyDeserializer(StringDeserializer.class)//必需
                                .withValueDeserializer(StringDeserializer.class)//必需
                                .updateConsumerProperties(consumerProps)
//                        .withReadCommitted()
                                .commitOffsetsInFinalize()
                                .withoutMetadata()
                ).apply(Values.<String>create());

        lines.apply(KafkaIO.<KV<String, String>, String>write()
                .withBootstrapServers("10.1.61.141:9292,10.1.61.143:9292,10.1.61.144:9292")
                .withTopic("results")
                .withValueSerializer(StringSerializer.class) // just need serializer for value
                .values()
        );

        pipeline.run().waitUntilFinish();

    }
}

...