Как интегрировать пользовательский источник таблицы и пользовательский приемник таблицы с клиентом SQL? - PullRequest
0 голосов
/ 05 сентября 2018

Предположим, мы определили пользовательский TableSource и TableSink, тогда как интегрировать с SQL Client? Нужно ли вручную регистрировать пользовательское имя TableSource \ Sink, как показано ниже? Если не регистрировать вручную, как тип коннектора custom1 map \ связан с custom1TableSource?

StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
TableSource custom1TableSource  = new custom1TableSource ( );
tableEnv.registerTableSource("custom1", custom1TableSource);

Затем настроить файл среды ниже?

   tables:
      - name: custom1TableSource
        type: Source
        update-mode: append
        connector:
          property-version: 1
          type: ***custom1****

Источник и приемник, которые я объявил:

package com.abc;
public static class custom1TableSource implements StreamTableSource<Row>, DefinedRowtimeAttributes, DefinedProctimeAttribute {


package com.abc;
public static class custom1TableSink implements TableSink<Row>, AppendStreamTableSink<Row> {

https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/table/sqlClient.html#configuration

Обновление:

После некоторой проверки из исходного кода я обнаружил, что Flink создает экземпляр приемника и источника с помощью реализаций StreamTableSinkFactory и фабрики, созданной ServiceLoader, однако как зарегистрировать имя приемника и источника в классах TableSource и TabSink?

Ответы [ 2 ]

0 голосов
/ 06 сентября 2018

Пожалуйста, ознакомьтесь с документацией для пользовательских источников и приемников .

Как клиент SQL, так и API таблиц и SQL используют так называемые TableFactory s, которые обнаруживаются с помощью интерфейсов поставщика услуг (SPI) Java.

0 голосов
/ 05 сентября 2018

Я получил ответ, он должен переопределить requiredContext () и написать файл connector.type вручную, взяв в качестве примера kafka, ему нужно назначить "kafka" для connector.type:

public abstract class KafkaTableSourceSinkFactoryBase implements
        StreamTableSourceFactory<Row>,
        StreamTableSinkFactory<Row> {

@Override
public Map<String, String> requiredContext() {
    Map<String, String> context = new HashMap<>();
    context.put(UPDATE_MODE(), UPDATE_MODE_VALUE_APPEND()); // append mode
    **context.put(CONNECTOR_TYPE(), CONNECTOR_TYPE_VALUE_KAFKA); // kafka**
    context.put(CONNECTOR_VERSION(), kafkaVersion()); // version
    context.put(CONNECTOR_PROPERTY_VERSION(), "1"); // backwards compatibility
    return context;
}
...