Spark Cassandra Connector Java API добавить / удалить данные в сбое коллекции - PullRequest
0 голосов
/ 30 мая 2018

Я пытаюсь добавить значения в столбец набора типов через JAVA API.

Кажется, что соединитель игнорирует тип устанавливаемой мной CollectionBehavior и всегда переопределяет предыдущую коллекцию.

Даже когда я использую CollectionRemove, удаляемое значение добавляется в коллекцию.

Я следую примеру, как показано в:

https://datastax -oss.atlassian.net / browse / SPARKC-340? page = com.atlassian.jira.plugin.system.issuetabpanels% 3Achangehistory-tabpanel

Я использую:

  • spark-core_2.11 2.2.0
  • spark-cassandra-connector_2.11 2.0.5
  • Cassandra 2.1.17

Может быть, эта функцияне поддерживается в этих версиях?

Вот код реализации:

// CASSANDRA TABLE
CREATE TABLE test.profile (
    id text PRIMARY KEY,
    dates set<bigint>,
)

// ENTITY
public class ProfileRow {
    public static final Map<String, String> namesMap;
    static {
        namesMap = new HashMap<>();
        namesMap.put("id", "id");
        namesMap.put("dates", "dates");
    }
    private String id;
    private Set<Long> dates;
    public ProfileRow() {}
    public String getId() {
        return id;
    }
    public void setId(String id) {
        this.id = id;
    }
    public Set<Long> getDates() {
        return dates;
    }
    public void setDates(Set<Long> dates) {
        this.dates = dates;
    }
}


public void execute(JavaSparkContext context) {
    List<ProfileRow> elements = new LinkedList<>();
    ProfileRow profile = new ProfileRow();
    profile.setId("fGxTObQIXM");
    Set<Long> dates = new HashSet<>();
    dates.add(1l);
    profile.setDates(dates);
    elements.add(profile);
    JavaRDD<ProfileRow> rdd = context.parallelize(elements);

    RDDAndDStreamCommonJavaFunctions<T>.WriterBuilder wb = javaFunctions(rdd)
        .writerBuilder("test", "profile", mapToRow(ProfileRow.class, ProfileRow.namesMap));
    CollectionColumnName appendColumn = new CollectionColumnName("dates", Option.empty(), CollectionAppend$.MODULE$);
    scala.collection.Seq<ColumnRef> columnRefSeq = JavaApiHelper.toScalaSeq(Arrays.asList(appendColumn));
    SomeColumns columnSelector = SomeColumns$.MODULE$.apply(columnRefSeq);

    wb.withColumnSelector(columnSelector);
    wb.saveToCassandra();
}

Спасибо,

Shai

1 Ответ

0 голосов
/ 31 мая 2018

Я нашел ответ.Мне нужно изменить 2 вещи:

  1. Добавить столбец первичного ключа в селектор столбцов.
  2. WriterBuilder.withColumnSelector () создает новый экземпляр WriterBuilder, поэтому мне пришлосьсохранить новый экземпляр.

:

RDDAndDStreamCommonJavaFunctions<T>.WriterBuilder wb = javaFunctions(rdd)
    .writerBuilder("test", "profile", mapToRow(ProfileRow.class, ProfileRow.namesMap));
ColumnName pkColumn = new ColumnName("id", Option.empty())
CollectionColumnName appendColumn = new CollectionColumnName("dates", Option.empty(), CollectionAppend$.MODULE$);
scala.collection.Seq<ColumnRef> columnRefSeq = JavaApiHelper.toScalaSeq(Arrays.asList(pkColumn, appendColumn));
SomeColumns columnSelector = SomeColumns$.MODULE$.apply(columnRefSeq);

wb = wb.withColumnSelector(columnSelector);
wb.saveToCassandra();
...