JavaStreamingContext.union не имеет желаемого эффекта - PullRequest
0 голосов
/ 15 марта 2019

Я пытаюсь объединить два JavaPairDStream в JavaPairDStream с помощью метода JavaStreamingContext.union, но это не имеет никакого эффекта.

Ниже приведен пример кода:

List<JavaPairDStream<String, String>> kafkaStreams = new ArrayList<JavaPairDStream<String, String>>(topicArrLen);

for (int i = 0; i < topicArrLen; i++)
{
    String num = topicArr[i];
    String zk = pubTool.get("kafka.zk.address" + num);
    String topic = pubTool.get("kafka.original.topic" + num);
    String groupId = pubTool.get("kafka.original.groupid" + num);
    Integer partitions = Integer.valueOf(pubTool.get("kafka.original.partitions" + num));
    Preconditions.checkArgument(StringUtils.isNotBlank(zk) 
        && StringUtils.isNotBlank(groupId)
        && StringUtils.isNotBlank(topic) 
        && partitions > 0,
        "kafka params zk|groupid|topic|partitions is illegal");
    Map<String, Integer> topics = Maps.newHashMap();
    topics.put(topic, partitions);
    kafkaStreams.add(KafkaUtils.createStream(jssc, zk, groupId, topics));
}

JavaPairDStream<String, String> kafkaStream = null;
final int topicType;
if(1 == kafkaStreams.size())
{
    kafkaStream = kafkaStreams.get(0);
    topicType = Integer.valueOf(topicArr[0]);
}
else
{
    kafkaStream = jssc.union(kafkaStreams.get(0), kafkaStreams.subList(1, kafkaStreams.size()));
    topicType = 0;
}

JavaDStream<DataEntity> entities = 
kafkaStream.filter((Function<Tuple2<String, String>, Boolean>) arg -> {
    return true; }).map((Function<Tuple2<String, String>, DataEntity>) arg -> {
        DataEntity dataEntity = new DataEntity();
        ...
        return dataEntity;
    }
);

CleanLogService.parse(entities);
jssc.start();
jssc.awaitTermination();
...