Преобразование таблицы MySQL в семейство ColumnFamily в Кассандре: медленные пакетные мутации с помощью Гектора - PullRequest
1 голос
/ 15 декабря 2011

У меня очень большая таблица MySQL (миллиарды строк с десятками столбцов), которую я хотел бы преобразовать в семейство ColumnFamily в Кассандре. Я пользуюсь Гектором.

Сначала я создаю свою схему как таковую:

    String clusterName = "Test Cluster";
    String host = "cassandra.lanhost.com:9160";
    String newKeyspaceName = "KeyspaceName";
    String newColumnFamilyName = "CFName";

    ThriftCluster cassandraCluster;
    CassandraHostConfigurator cassandraHostConfigurator;

    cassandraHostConfigurator = new CassandraHostConfigurator(host);
    cassandraCluster = new ThriftCluster(clusterName, cassandraHostConfigurator);

    BasicColumnFamilyDefinition columnFamilyDefinition = new BasicColumnFamilyDefinition();
    columnFamilyDefinition.setKeyspaceName(newKeyspaceName);
    columnFamilyDefinition.setName(newColumnFamilyName);    
    columnFamilyDefinition.setDefaultValidationClass("UTF8Type");
    columnFamilyDefinition.setKeyValidationClass(ComparatorType.UTF8TYPE.getClassName());
    columnFamilyDefinition.setComparatorType(ComparatorType.UTF8TYPE);

    BasicColumnDefinition columnDefinition = new BasicColumnDefinition();
    columnDefinition.setName(StringSerializer.get().toByteBuffer("id"));
    columnDefinition.setIndexType(ColumnIndexType.KEYS);
    columnDefinition.setValidationClass(ComparatorType.INTEGERTYPE.getClassName());
    columnDefinition.setIndexName("id_index");
    columnFamilyDefinition.addColumnDefinition(columnDefinition);

    columnDefinition = new BasicColumnDefinition();
    columnDefinition.setName(StringSerializer.get().toByteBuffer("status"));
    columnDefinition.setIndexType(ColumnIndexType.KEYS);
    columnDefinition.setValidationClass(ComparatorType.ASCIITYPE.getClassName());
    columnDefinition.setIndexName("status_index");
    columnFamilyDefinition.addColumnDefinition(columnDefinition);

        .......

    ColumnFamilyDefinition cfDef = new ThriftCfDef(columnFamilyDefinition);

    KeyspaceDefinition keyspaceDefinition = 
        HFactory.createKeyspaceDefinition(newKeyspaceName, "org.apache.cassandra.locator.SimpleStrategy", 1, Arrays.asList(cfDef));

    cassandraCluster.addKeyspace(keyspaceDefinition);

Как только это будет сделано, я загружаю свои данные, хранящиеся в списке, так как я извлекаю данные MySQL с помощью namedParametersJdbcTemplate, как показано:

String clusterName = "Test Cluster";
String host = "cassandra.lanhost.com:9160";
String KeyspaceName = "KeyspaceName";
String ColumnFamilyName = "CFName";
final StringSerializer serializer = StringSerializer.get();

public void insert(List<SqlParameterSource> dataToInsert) throws ExceptionParserInterrupted {

    Keyspace workingKeyspace = null;
    Cluster cassandraCluster = HFactory.getOrCreateCluster(clusterName, host);
    workingKeyspace = HFactory.createKeyspace(KeyspaceName, cassandraCluster);
    Mutator<String> mutator = HFactory.createMutator(workingKeyspace, serializer);

    ColumnFamilyTemplate<String, String> template = new ThriftColumnFamilyTemplate<String, String>(workingKeyspace, ColumnFamilyName, serializer, serializer);

    long t1 = System.currentTimeMillis();

    for (SqlParameterSource data : dataToInsert) {

        String keyId = "id" + (Integer) data.getValue("id");

    mutator.addInsertion(keyId, ColumnFamilyName, HFactory.createColumn("id", (Integer) data.getValue("id"), StringSerializer.get(), IntegerSerializer.get()));
    mutator.addInsertion(keyId,ColumnFamilyName, HFactory.createStringColumn("status", data.getValue("status").toString()));

          ...............

    }

    mutator.execute();

    System.out.println(t1 - System.currentTimeMillis());

Я вставляю 100 000 строк примерно за 1 час, что очень медленно. Я слышал о многопоточности своих вставок, но в данном конкретном случае я не знаю, что делать. Должен ли я использовать BatchMutate?

Ответы [ 2 ]

1 голос
/ 16 декабря 2011

Да, вы должны запустить код вставки из нескольких потоков.Взгляните на следующий код стресс-тестирования для примера того, как эффективно сделать это с помощью hector: https://github.com/zznate/cassandra-stress

Дополнительным источником проблемы производительности вставки может быть число вторичных индексов, которые вы применяете ксемейство столбцов (каждый вторичный индекс создает дополнительное семейство столбцов «под капотом»).

Правильно спроектированные модели данных на самом деле не должны нуждаться в большом количестве вторичных индексов.Следующая статья дает хороший обзор моделирования данных в Кассандре: http://www.datastax.com/docs/1.0/ddl/index

1 голос
/ 16 декабря 2011

Существует один альтернативный способ достижения этого.Вы можете попробовать исследовать https://github.com/impetus-opensource/Kundera. Вам понравится.

Kundera является JPA 2.0-совместимой библиотекой отображения объектов-хранилищ данных для хранилищ данных NoSQL и в настоящее время поддерживает Cassandra, HBase, MongoDB и все реляционные хранилища данных (Kundera внутренне использует Hibernate для всех реляционных хранилищ данных).

InВ вашем случае вы можете использовать существующие объекты вместе с аннотациями JPA для их хранения в Cassandra.Поскольку Kundera поддерживает постоянство полиглотов, вы также используете комбинацию MySQL + Cassandra, в которой вы можете использовать MySQL для большинства ваших данных и Cassandra для транзакционных данных. И поскольку все, что вам нужно, - это объекты и аннотации JPA, ваша работа будет намного проще.

Для производительности вы можете взглянуть на https://github.com/impetus-opensource/Kundera/wiki/Kundera-Performance

...