Искра Оптимизация производительности Структурированная потоковая Kafka - PullRequest
1 голос
/ 13 апреля 2020

два сервера (4core + 16GRAM) для тестового приложения

Моя идея - получить данные из kafka, обработать с многопоточностью и сохранить в Elasticsearch

spark-submit --class com.yizhisec.bigdata.TrafficEs 
--master yarn 
--deploy-mode cluster 
--executor-memory 512M 
--executor-cores 2 
--conf spark.streaming.concurrentJobs=5 
--num-executors 5 
--supervise bigdata-1.0.jar

enter image description here но только одно задание у исполнителей enter image description here

код

Я использую numPartitions для получения данных

spark.readStream().format("kafka")
                .option("kafka.bootstrap.servers", prop.getProperty("kafka.broker.list"))
                .option("kafka.ssl.truststore.location", prop.getProperty("kafka.jks.path"))
                .option("kafka.ssl.truststore.password", prop.getProperty("kafka.jks.passwd"))
                .option("kafka.security.protocol", "SSL")
                .option("kafka.ssl.endpoint.identification.algorithm", "")
                .option("startingOffsets", "earliest")
                .option("numPartitions", prop.getProperty("kafka.partition"))
                .option("subscribe", topic)
                .load()
                .selectExpr("CAST(topic AS STRING)", "CAST(value AS STRING)");

код процесса

Dataset<Traffic> df = StreamSparkUtils.steamToTraffic(rawDf);
String[] appProtoFilter = properties.getProperty("appproto").split(",");

Dataset<TrafficNode> nodeDataset = df
        .filter(df.col("appproto").isin(appProtoFilter))
        .map(new MapFunction<Traffic, TrafficNode>() {
            @Override
            public TrafficNode call(Traffic traffic) throws Exception {
                TrafficNode n = new TrafficNode();
                n.setDestport(traffic.getDestport());
                n.setSrcip(traffic.getSrcip());
                n.setDestip(traffic.getDestip());
                n.setAppproto(traffic.getAppproto());
                n.setEndtime(traffic.getEnd_time());
                return n;
            }
        }, Encoders.bean(TrafficNode.class));




StreamingQuery query = null;
try {
    query = StreamSparkUtils.streamSinkEs(nodeDataset, "loh_traffic");
    query.awaitTermination();
} catch (IOException | StreamingQueryException e) {
    e.printStackTrace();
}

Как его оптимизировать.

сохранить в es

public static StreamingQuery streamSinkEs(Dataset<?> dataSet, String index) throws IOException {
        Properties properties = readProp();
        return dataSet.writeStream()
                .option("es.nodes", properties.getProperty("es.nodes"))
                .option("es.port", properties.getProperty("es.port"))
                .option("checkpointLocation", properties.getProperty("es.checkpoint"))
                .format("es")
                .start(index);
    }

streamtoTraffi c

    public static Dataset<Traffic> steamToTraffic(Dataset<Row> df) {
        if (df == null) {
            return null;
        }

        StructType trafficSchema = new StructType()
                .add("guid", DataTypes.LongType)
                ...
                .add("downsize", DataTypes.LongType);

        Dataset<Row> ds = df.select(functions.from_json(df.col("value").cast(DataTypes.StringType), trafficSchema).as("data")).select("data.*");
        return ds.as(ExpressionEncoder.javaBean(Traffic.class));
    }

1 Ответ

0 голосов
/ 22 апреля 2020

Приложения, которые читают из Kafka, масштабируются с количеством разделов в топи c. Пока ваши данные хранятся только в одном разделе, ваше приложение может использовать только одного единственного потребителя, чтобы получить data.

В вашем случае не имеет значения, сколько исполнителей у вас есть в вашем приложении Spark, поскольку только один может читать из одного раздела. Это потому, что в Кафке у нас есть понятия Consumer Group . Если вы хотите улучшить производительность, вам следует увеличить количество разделов в вашей топике Kafka c.

Вот выдержка из документации Kafa , описывающей взаимодействие между группой потребителей и разделами:

Обладая понятием параллелизма - разделения - по темам, Kafka может обеспечить как гарантии упорядочения, так и балансировку нагрузки в пуле пользовательских процессов. Это достигается путем назначения разделов в topi c потребителям в группе потребителей, чтобы каждый раздел потреблялся ровно одним потребителем в группе. Делая это, мы гарантируем, что потребитель является единственным читателем этого раздела и использует данные по порядку. Поскольку существует много разделов, это по-прежнему балансирует нагрузку на множество пользовательских экземпляров. Однако обратите внимание, что в группе потребителей не может быть больше экземпляров потребителей, чем разделов.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...