Чтобы быстрее писать в Cassandra с помощью saveToCassandra (), я делаю repartitionByCassandraReplica () перед этим. Я вижу, что запись выполняется на каждом узле Cassandra отдельно. Я имею в виду, что в течение 25 минут записи выполняются на узле 1. В течение следующих 25 минут записи выполняются на узле 2 и т. Д. Это поведение кажется странным, поскольку я ожидаю, что Spark будет выполнять запись на все узлы одновременно.
Я не уверен, что причиной является код или конфигурация кластера.
Я использую:
- 28 узлов Кассандры
- 1 аппарат Spark Driver
- 1 Вспомогательный аппарат Spark, который запускает 10 исполнителей по 3 ядра в каждом
- spark-2.4.0-bin-hadoop2.7 (автономно)
- Кассандра 2.1.18.1463 (я знаю, немного староват)
- spark-cassandra-connector_2.11 2.4.0
Код задания Spark (извините, это Java API):
CassandraTableScanJavaRDD<CassandraRow> rdd1 =
javaFunctions(context).cassandraTable("keyspace", "profile_attribute");
JavaRDD<ProfileAttribute> rdd2 = rdd1.mapPartitions(
new FlatMapFunction<Iterator<CassandraRow>, ProfileAttribute>() {
@Override
public Iterator<ProfileAttribute> call(Iterator<CassandraRow> iter) throws Exception {
List<ProfileAttribute> l = new LinkedList<>();
while (iter.hasNext()) {
CassandraRow row = iter.next();
ProfileAttribute att = rowToProfileAttribute(row);
l.add(att);
}
return l.iterator();
}
}, true);
JavaRDD<EntityAlias> rdd3 = rdd2.mapPartitions(
new FlatMapFunction<Iterator<ProfileAttribute>, EntityAlias>() {
@Override
public Iterator<EntityAlias> call(Iterator<ProfileAttribute> iter) throws Exception {
List<EntityAlias> l = new LinkedList<>();
while (iter.hasNext()) {
ProfileAttribute attr = iter.next();
EntityAlias alias = new EntityAlias(attr.getTerm(), attr.getPid(),
attr.getAssociation_type_id(), attr.getCreate_date(),
attr.getPartner_id(), attr.getNumeric_value(), attr.getAttribute_type());
l.add(alias);
}
return l.iterator();
}
}, true);
JavaRDD<EntityAlias> rdd4 =
CassandraJavaUtil.javaFunctions(rdd3).repartitionByCassandraReplica(
"keyspace", "entity_alias", 2000,
CassandraJavaUtil.someColumns(alias_key),
CassandraJavaUtil.mapToRow(EntityAlias.class));
JavaPairRDD<EntityAliasPK, EntityAlias> rdd5 = rdd4.mapPartitionsToPair(
new PairFlatMapFunction<Iterator<EntityAlias>, EntityAliasPK, EntityAlias>() {
@Override
public Iterator<Tuple2<EntityAliasPK, EntityAlias>> call(Iterator<EntityAlias> iter) throws Exception {
List<Tuple2<EntityAliasPK, EntityAlias>> l = new LinkedList<>();
while (iter.hasNext()) {
EntityAlias alias = iter.next();
EntityAliasPK pk = new EntityAliasPK(
alias.getAlias_key(), alias.getEntity_id(), alias.getComposition_key());
Tuple2<EntityAliasPK, EntityAlias> t = new Tuple2<EntityAliasPK, EntityAlias>(pk, alias);
l.add(t);
}
return l.iterator();
}
}, true);
JavaPairRDD<EntityAliasPK, EntityAlias> rdd6 = rdd5.reduceByKey(
new Function2<EntityAlias, EntityAlias, EntityAlias>() {
@Override
public EntityAlias call(EntityAlias e1, EntityAlias e2) throws Exception {
_duplicate.add(1l);
return e1.getDate() >= e2.getDate() ? e1 : e2;
}
});
JavaRDD<EntityAlias> rdd7 = rdd6.values();
RDDAndDStreamCommonJavaFunctions<T>.WriterBuilder wb =
javaFunctions(rdd7).writerBuilder("keyspace", "entity_alias", mapToRow(EntityAlias.class));
WriteConf wc = new WriteConf(
wb.writeConf.batchSize(),
wb.writeConf.batchGroupingBufferSize(),
wb.writeConf.batchGroupingKey(),
wb.writeConf.consistencyLevel(),
wb.writeConf.ifNotExists(),
wb.writeConf.ignoreNulls(),
wb.writeConf.parallelismLevel(),
wb.writeConf.throughputMiBPS(),
TTLOption.constant(5000),
TimestampOption.defaultValue(),
false);
wb.withWriteConf(wc).saveToCassandra();
Классы сущностей:
class ProfileAttribute {
private String pid; // partition key
private Integer partner_id; // clustering column
private Integer attribute_key; // clustering column
private Integer attribute_type;
private Integer numeric_value;
private Long create_date;
private String term;
private Integer site_id;
private Integer ttl_seconds;
private Integer uahash;
private Integer association_type_id;
private Long ualong_hash;
private String xpartner_advertiser_id;
private Integer ttl;
private Long writetime;
}
class EntityAlias {
private String alias_key; // partition key
private String entity_id; // clustering column
private long composition_key; // clustering column
private int association_type;
private Long date;
private int partner_id;
private int numeric;
private int attribute_type;
}
class EntityAliasPK {
private String alias_key;
private String entity_id;
private long composition_key;
}