два сервера (4core + 16GRAM) для тестового приложения
Моя идея - получить данные из kafka, обработать с многопоточностью и сохранить в Elasticsearch
spark-submit --class com.yizhisec.bigdata.TrafficEs
--master yarn
--deploy-mode cluster
--executor-memory 512M
--executor-cores 2
--conf spark.streaming.concurrentJobs=5
--num-executors 5
--supervise bigdata-1.0.jar
но только одно задание у исполнителей
код
Я использую numPartitions
для получения данных
spark.readStream().format("kafka")
.option("kafka.bootstrap.servers", prop.getProperty("kafka.broker.list"))
.option("kafka.ssl.truststore.location", prop.getProperty("kafka.jks.path"))
.option("kafka.ssl.truststore.password", prop.getProperty("kafka.jks.passwd"))
.option("kafka.security.protocol", "SSL")
.option("kafka.ssl.endpoint.identification.algorithm", "")
.option("startingOffsets", "earliest")
.option("numPartitions", prop.getProperty("kafka.partition"))
.option("subscribe", topic)
.load()
.selectExpr("CAST(topic AS STRING)", "CAST(value AS STRING)");
код процесса
Dataset<Traffic> df = StreamSparkUtils.steamToTraffic(rawDf);
String[] appProtoFilter = properties.getProperty("appproto").split(",");
Dataset<TrafficNode> nodeDataset = df
.filter(df.col("appproto").isin(appProtoFilter))
.map(new MapFunction<Traffic, TrafficNode>() {
@Override
public TrafficNode call(Traffic traffic) throws Exception {
TrafficNode n = new TrafficNode();
n.setDestport(traffic.getDestport());
n.setSrcip(traffic.getSrcip());
n.setDestip(traffic.getDestip());
n.setAppproto(traffic.getAppproto());
n.setEndtime(traffic.getEnd_time());
return n;
}
}, Encoders.bean(TrafficNode.class));
StreamingQuery query = null;
try {
query = StreamSparkUtils.streamSinkEs(nodeDataset, "loh_traffic");
query.awaitTermination();
} catch (IOException | StreamingQueryException e) {
e.printStackTrace();
}
Как его оптимизировать.
сохранить в es
public static StreamingQuery streamSinkEs(Dataset<?> dataSet, String index) throws IOException {
Properties properties = readProp();
return dataSet.writeStream()
.option("es.nodes", properties.getProperty("es.nodes"))
.option("es.port", properties.getProperty("es.port"))
.option("checkpointLocation", properties.getProperty("es.checkpoint"))
.format("es")
.start(index);
}
streamtoTraffi c
public static Dataset<Traffic> steamToTraffic(Dataset<Row> df) {
if (df == null) {
return null;
}
StructType trafficSchema = new StructType()
.add("guid", DataTypes.LongType)
...
.add("downsize", DataTypes.LongType);
Dataset<Row> ds = df.select(functions.from_json(df.col("value").cast(DataTypes.StringType), trafficSchema).as("data")).select("data.*");
return ds.as(ExpressionEncoder.javaBean(Traffic.class));
}