Storm simple jdbc mapper записывать массив в Phoenix db не поддерживается? - PullRequest
0 голосов
/ 10 декабря 2018

У меня есть топология Storm (дистрибутив hortonworks, версия 1.1.0.2.6.2), которая записывает в базу данных Phoenix (версия 4.7.0.2.6.2), используя класс JdbcInsertBolt.Я успешно записал в стандартные столбцы, например: varchar и int, но мне нужно записать в столбец массива и наткнулся на это:

java.lang.RuntimeException: We do not support tables with SqlType: ARRAY
    at org.apache.storm.jdbc.common.Util.getJavaType(Util.java:72) ~[stormjar.jar:?]
    at org.apache.storm.jdbc.mapper.SimpleJdbcMapper.getColumns(SimpleJdbcMapper.java:60) ~[stormjar.jar:?]
    at org.apache.storm.jdbc.bolt.JdbcInsertBolt.process(JdbcInsertBolt.java:87) [stormjar.jar:?]
    at org.apache.storm.topology.base.BaseTickTupleAwareRichBolt.execute(BaseTickTupleAwareRichBolt.java:38) [storm-core-1.1.0.2.6.2.14-5.jar:1.1.0.2.6.2.14-5]
    at org.apache.storm.daemon.executor$fn__10454$tuple_action_fn__10456.invoke(executor.clj:730) [storm-core-1.1.0.2.6.2.14-5.jar:1.1.0.2.6.2.14-5]
    at org.apache.storm.daemon.executor$mk_task_receiver$fn__10375.invoke(executor.clj:462) [storm-core-1.1.0.2.6.2.14-5.jar:1.1.0.2.6.2.14-5]
    at org.apache.storm.disruptor$clojure_handler$reify__9889.onEvent(disruptor.clj:40) [storm-core-1.1.0.2.6.2.14-5.jar:1.1.0.2.6.2.14-5]
    at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:472) [storm-core-1.1.0.2.6.2.14-5.jar:1.1.0.2.6.2.14-5]
    at org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:451) [storm-core-1.1.0.2.6.2.14-5.jar:1.1.0.2.6.2.14-5]

Массив, который я пытаюсь записать, является массивом строк,обойти эту проблему?

1 Ответ

0 голосов
/ 19 декабря 2018

Написал мою собственную реализацию JdbcInsertBolt, JdbcClient и JdbcMapper

Код, соответствующий Mapper:

 else if(getJavaType(columnSqlType).equals(Array.class)) {
            String[] value = (String[]) tuple.getValueByField(columnName);
            columns.add(new Column(columnName, PDataType.instantiatePhoenixArray(PDataType.arrayBaseType(PVarcharArray.INSTANCE),value) , columnSqlType));

Код, соответствующий клиенту:

 else if (columnJavaType.equals(Array.class)) {
            preparedStatement.setArray(index, (PhoenixArray) column.getVal());
...