Кафка моргать в улей - пишет не удается - PullRequest
0 голосов
/ 11 июня 2018

Я пытаюсь отправить данные в Hive через Kafka -> Flink -> Hive, используя следующий фрагмент кода:

Но я получаю следующую ошибку:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<GenericRecord> stream = readFromKafka(env);


private static final TypeInformation[] FIELD_TYPES = new TypeInformation[]{
        BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO
};

 JDBCAppendTableSink sink = JDBCAppendTableSink.builder()
            .setDrivername("org.apache.hive.jdbc.HiveDriver")
            .setDBUrl("jdbc:hive2://hiveconnstring")
            .setUsername("myuser")
            .setPassword("mypass")
            .setQuery("INSERT INTO testHiveDriverTable (key,value) VALUES (?,?)")
            .setBatchSize(1000)
            .setParameterTypes(FIELD_TYPES)
            .build();

    DataStream<Row> rows = stream.map((MapFunction<GenericRecord, Row>) st1 -> {
                Row row = new Row(2); // 
                row.setField(0, st1.get("SOME_ID")); 
                row.setField(1, st1.get("SOME_ADDRESS"));
                return row;
            });

    sink.emitDataStream(rows);
    env.execute("Flink101");


Caused by: java.lang.RuntimeException: Execution of JDBC statement failed.
at org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.flush(JDBCOutputFormat.java:219)
at org.apache.flink.api.java.io.jdbc.JDBCSinkFunction.snapshotState(JDBCSinkFunction.java:43)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:356)
... 12 more

Caused by: java.sql.SQLException: Method not supported
at org.apache.hive.jdbc.HiveStatement.executeBatch(HiveStatement.java:381)
at org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.flush(JDBCOutputFormat.java:216)
... 17 more

Я проверил Hive-Драйвер jdbc, и кажется, что этот метод не поддерживается драйвером hive-jdbc.

public class HiveStatement implements java.sql.Statement {
...

  @Override  
  public int[] executeBatch() throws SQLException {
        throw new SQLFeatureNotSupportedException("Method not supported");
  }

..
}

Есть ли какой-нибудь способ, которым мы можем добиться этого с помощью драйвера JDBC?

Дайте мне знать,

Заранее спасибо.

1 Ответ

0 голосов
/ 11 июня 2018

Реализация JDBC Hive еще не завершена.Ваша проблема отслеживается этой проблемой .

Вы можете попытаться пропатчить Флинка JDBCOutputFormat, чтобы не использовать пакетирование, заменив upload.addBatch на upload.execute в JDBCOutputFormat.java:202 и удалите вызов.до upload.executeBatch в JDBCOutputFormat.java:216.Недостатком будет то, что вы будете выдавать для каждой записи отдельный SQL-запрос, который может замедлить работу.

...