спарк-кассандра-коннектор saveToCassandra после переразделения ByCassandraReplica странное поведение - PullRequest
0 голосов
/ 23 мая 2019

Чтобы быстрее писать в Cassandra с помощью saveToCassandra (), я делаю repartitionByCassandraReplica () перед этим. Я вижу, что запись выполняется на каждом узле Cassandra отдельно. Я имею в виду, что в течение 25 минут записи выполняются на узле 1. В течение следующих 25 минут записи выполняются на узле 2 и т. Д. Это поведение кажется странным, поскольку я ожидаю, что Spark будет выполнять запись на все узлы одновременно. Я не уверен, что причиной является код или конфигурация кластера.

Я использую:

  • 28 узлов Кассандры
  • 1 аппарат Spark Driver
  • 1 Вспомогательный аппарат Spark, который запускает 10 исполнителей по 3 ядра в каждом
  • spark-2.4.0-bin-hadoop2.7 (автономно)
  • Кассандра 2.1.18.1463 (я знаю, немного староват)
  • spark-cassandra-connector_2.11 2.4.0

Код задания Spark (извините, это Java API):

CassandraTableScanJavaRDD<CassandraRow> rdd1 = 
    javaFunctions(context).cassandraTable("keyspace", "profile_attribute");

JavaRDD<ProfileAttribute> rdd2 = rdd1.mapPartitions(
        new FlatMapFunction<Iterator<CassandraRow>, ProfileAttribute>() {
    @Override
    public Iterator<ProfileAttribute> call(Iterator<CassandraRow> iter) throws Exception {
        List<ProfileAttribute> l = new LinkedList<>();
        while (iter.hasNext()) {
            CassandraRow row = iter.next();
            ProfileAttribute att = rowToProfileAttribute(row);
            l.add(att);
        }
        return l.iterator();
    }
}, true);


JavaRDD<EntityAlias> rdd3 = rdd2.mapPartitions(
        new FlatMapFunction<Iterator<ProfileAttribute>, EntityAlias>() {
    @Override
    public Iterator<EntityAlias> call(Iterator<ProfileAttribute> iter) throws Exception {
        List<EntityAlias> l = new LinkedList<>();
        while (iter.hasNext()) {
            ProfileAttribute attr = iter.next();
            EntityAlias alias = new EntityAlias(attr.getTerm(), attr.getPid(), 
                attr.getAssociation_type_id(), attr.getCreate_date(),
                attr.getPartner_id(), attr.getNumeric_value(), attr.getAttribute_type());
            l.add(alias);
        }
        return l.iterator();
    }
}, true);


JavaRDD<EntityAlias> rdd4 = 
    CassandraJavaUtil.javaFunctions(rdd3).repartitionByCassandraReplica(
        "keyspace", "entity_alias", 2000,
            CassandraJavaUtil.someColumns(alias_key),
            CassandraJavaUtil.mapToRow(EntityAlias.class));


JavaPairRDD<EntityAliasPK, EntityAlias> rdd5 = rdd4.mapPartitionsToPair(
        new PairFlatMapFunction<Iterator<EntityAlias>, EntityAliasPK, EntityAlias>() {
    @Override
    public Iterator<Tuple2<EntityAliasPK, EntityAlias>> call(Iterator<EntityAlias> iter) throws Exception {
        List<Tuple2<EntityAliasPK, EntityAlias>> l = new LinkedList<>();
        while (iter.hasNext()) {
            EntityAlias alias = iter.next();
            EntityAliasPK pk = new EntityAliasPK(
                alias.getAlias_key(), alias.getEntity_id(), alias.getComposition_key());
            Tuple2<EntityAliasPK, EntityAlias> t = new Tuple2<EntityAliasPK, EntityAlias>(pk, alias);
            l.add(t);
        }
        return l.iterator();
    }
}, true);


JavaPairRDD<EntityAliasPK, EntityAlias> rdd6 = rdd5.reduceByKey(
        new Function2<EntityAlias, EntityAlias, EntityAlias>() {
    @Override
    public EntityAlias call(EntityAlias e1, EntityAlias e2) throws Exception {
        _duplicate.add(1l);
        return e1.getDate() >= e2.getDate() ? e1 : e2;
    }
});


JavaRDD<EntityAlias> rdd7 = rdd6.values();


RDDAndDStreamCommonJavaFunctions<T>.WriterBuilder wb = 
    javaFunctions(rdd7).writerBuilder("keyspace", "entity_alias", mapToRow(EntityAlias.class));
WriteConf wc = new WriteConf(
        wb.writeConf.batchSize(),
        wb.writeConf.batchGroupingBufferSize(),
        wb.writeConf.batchGroupingKey(),
        wb.writeConf.consistencyLevel(),
        wb.writeConf.ifNotExists(),
        wb.writeConf.ignoreNulls(),
        wb.writeConf.parallelismLevel(), 
        wb.writeConf.throughputMiBPS(),
        TTLOption.constant(5000),
        TimestampOption.defaultValue(),
        false);
wb.withWriteConf(wc).saveToCassandra();

Классы сущностей:

class ProfileAttribute {
    private String  pid;              // partition key
    private Integer partner_id;       // clustering column
    private Integer attribute_key;    // clustering column
    private Integer attribute_type;
    private Integer numeric_value;
    private Long    create_date;
    private String  term;
    private Integer site_id;
    private Integer ttl_seconds;
    private Integer uahash;
    private Integer association_type_id;
    private Long    ualong_hash;
    private String  xpartner_advertiser_id;
    private Integer ttl;
    private Long    writetime;
}

class EntityAlias {
    private String  alias_key;        // partition key
    private String  entity_id;        // clustering column 
    private long    composition_key;  // clustering column
    private int     association_type;
    private Long    date;
    private int     partner_id;
    private int     numeric;
    private int     attribute_type;
}

class EntityAliasPK {
    private String  alias_key;
    private String  entity_id; 
    private long    composition_key;
}
...