Что я понимаю, так это медленная скорость записи в hbase.У меня есть несколько предложений для вас.
1) hbase.client.write.buffe r .
следующее свойство может помочь вам.
hbase.client.write.buffer
Описание Размер по умолчанию буфера записи BufferedMutator в байтах.Больший буфер требует больше памяти - как на стороне клиента, так и на стороне сервера, так как сервер создает экземпляр переданного буфера записи для его обработки - но больший размер буфера уменьшает количество созданных RPC.Для оценки использования памяти на стороне сервера оцените hbase.client.write.buffer * hbase.regionserver.handler.count
По умолчанию 2097152 (около 2 МБ)
Iпредпочитайте foreachBatch
см. документацию по искрам (своего рода foreachPartition в искровом ядре), а не foreach
Также в вашем hbase Writer расширяется ForeachWriter
open
метод intialize список массивов put в process
, добавьте put в массив списков put в close
table.put(listofputs);
и затем сбросьте массив, когда вы обновите таблицу ...
что он делает в основном для вашего буфераРазмер, упомянутый выше, заполняется 2 МБ, после чего он помещается в таблицу hbase.до этого записи не будут попадать в таблицу hbase.
вы можете увеличить это до 10 МБ и т. д. Таким образом, количество RPC будет уменьшено.и огромный кусок данных будет сброшен и будет находиться в таблице hbase.
, когда буфер записи заполнен и flushCommits
в таблицу hbase запущена.
Пример кода: в моем ответ
2) отключить WAL вы можете отключить WAL (запись в журнал регистрации - Опасность не восстанавливается), но она ускоритсяпишет ... если не хочешь восстановить данные.
Примечание : если вы используете поиск по solr или cloudera в таблицах hbase, вам не следует его отключать, так как Solr будет работать в WAL.если вы отключите его, индексирование Solr не сработает .. это одна из распространенных ошибок, которые делают многие из нас.
Как отключить: https://hbase.apache.org/1.1/apidocs/org/apache/hadoop/hbase/client/Put.html#setWriteToWAL(boolean)
, как я уже упоминал, список путов - это хороший способ ... это старый способ (foreachPartition со списком путов) делать до структурированной потоковой передачи, как показано ниже ... где foreachPartition
действует для каждого раздела, а не для каждого ряда.
def writeHbase(mydataframe: DataFrame) = {
val columnFamilyName: String = "c"
mydataframe.foreachPartition(rows => {
val puts = new util.ArrayList[ Put ]
rows.foreach(row => {
val key = row.getAs[ String ]("rowKey")
val p = new Put(Bytes.toBytes(key))
val columnV = row.getAs[ Double ]("x")
val columnT = row.getAs[ Long ]("y")
p.addColumn(
Bytes.toBytes(columnFamilyName),
Bytes.toBytes("x"),
Bytes.toBytes(columnX)
)
p.addColumn(
Bytes.toBytes(columnFamilyName),
Bytes.toBytes("y"),
Bytes.toBytes(columnY)
)
puts.add(p)
})
HBaseUtil.putRows(hbaseZookeeperQuorum, hbaseTableName, puts)
})
}
Подводя итог:
Я считаю, что нам необходимо понять психологию искры и основы, чтобы создать эффективную пару.