Как использовать таблицу hbase в спарк - PullRequest
0 голосов
/ 07 января 2020

Имейте emp, таблицу отделов в HBASE. Как объединить эти таблицы, и результат снова будет сохранен в базе данных. Как я могу сделать это используя spark scala? есть ссылки.

1 Ответ

1 голос
/ 07 января 2020

Hbase - это хранилище ключей и значений. Обычно они не поддерживают join операции.

Единственный вариант - прочитать обе таблицы hbase (используя scan), а затем выполнить spark join.

. посмотрите java пример здесь .

Ниже приведен тот же код, что и в ссылке, на всякий случай.

try (Connection connection = BigtableConfiguration.connect(projectId, instanceId)) {

  // The admin API lets us create, manage and delete tables
  Admin admin = connection.getAdmin();
  // [END bigtable_hw_connect]

  // [START bigtable_hw_create_table]
  // Create a table with a single column family
  HTableDescriptor descriptor = new HTableDescriptor(TableName.valueOf(TABLE_NAME));
  descriptor.addFamily(new HColumnDescriptor(COLUMN_FAMILY_NAME));

  print("Create table " + descriptor.getNameAsString());
  admin.createTable(descriptor);
  // [END bigtable_hw_create_table]

  // [START bigtable_hw_write_rows]
  // Retrieve the table we just created so we can do some reads and writes
  Table table = connection.getTable(TableName.valueOf(TABLE_NAME));

  // Write some rows to the table
  print("Write some greetings to the table");
  for (int i = 0; i < GREETINGS.length; i++) {
    // Each row has a unique row key.
    //
    // Note: This example uses sequential numeric IDs for simplicity, but
    // this can result in poor performance in a production application.
    // Since rows are stored in sorted order by key, sequential keys can
    // result in poor distribution of operations across nodes.
    //
    // For more information about how to design a Bigtable schema for the
    // best performance, see the documentation:
    //
    //     https://cloud.google.com/bigtable/docs/schema-design
    String rowKey = "greeting" + i;

    // Put a single row into the table. We could also pass a list of Puts to write a batch.
    Put put = new Put(Bytes.toBytes(rowKey));
    put.addColumn(COLUMN_FAMILY_NAME, COLUMN_NAME, Bytes.toBytes(GREETINGS[i]));
    table.put(put);
  }
  // [END bigtable_hw_write_rows]

  // [START bigtable_hw_get_by_key]
  // Get the first greeting by row key
  String rowKey = "greeting0";
  Result getResult = table.get(new Get(Bytes.toBytes(rowKey)));
  String greeting = Bytes.toString(getResult.getValue(COLUMN_FAMILY_NAME, COLUMN_NAME));
  System.out.println("Get a single greeting by row key");
  System.out.printf("\t%s = %s\n", rowKey, greeting);
  // [END bigtable_hw_get_by_key]

  // [START bigtable_hw_scan_all]
  // Now scan across all rows.
  Scan scan = new Scan();

  print("Scan for all greetings:");
  ResultScanner scanner = table.getScanner(scan);
  for (Result row : scanner) {
    byte[] valueBytes = row.getValue(COLUMN_FAMILY_NAME, COLUMN_NAME);
    System.out.println('\t' + Bytes.toString(valueBytes));
  }
  // [END bigtable_hw_scan_all]

  // [START bigtable_hw_delete_table]
  // Clean up by disabling and then deleting the table
  print("Delete the table");
  admin.disableTable(table.getName());
  admin.deleteTable(table.getName());
  // [END bigtable_hw_delete_table]

} catch (IOException e) {
  System.err.println("Exception while running HelloWorld: " + e.getMessage());
  e.printStackTrace();
  System.exit(1);
}

System.exit(0);


 }
...