Как создать очень простое правило с использованием Apache Calcite и использовать его на Apache Flink? - PullRequest
0 голосов
/ 04 июля 2019

У меня есть это приложение во Flink, которое использует Table API для печати данных из источника. Официальная документация Flink гласит, что Table API использует Calcite по своей сути для перевода и оптимизации планов запросов. Они не очень подробно описывают это, поэтому я пошел к исходному коду и попытался скопировать некоторые коды оттуда. Но, насколько я видел, они также используют правила кальцита.

Что если я хочу реализовать свое собственное правило? Является ли это возможным? Как реализовать простое правило в Calcite для изменения параметра фильтра, например?

Вот мой код

public class HelloWorldCalcitePlanTableAPI {
    private static final Logger logger = LoggerFactory.getLogger(HelloWorldCalcitePlanTableAPI.class);
    private static final String TICKETS_STATION_01_PLATFORM_01 = "TicketsStation01Plat01";

    public static void main(String[] args) throws Exception {
        new HelloWorldCalcitePlanTableAPI("127.0.0.1", "127.0.0.1");
    }

    public HelloWorldCalcitePlanTableAPI(String ipAddressSource01, String ipAddressSink) throws Exception {
        // Start streaming from fake data source sensors
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        // StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, tableConfig);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Calcite configuration file to change the query execution plan
        // CalciteConfig cc = tableEnv.getConfig().getCalciteConfig();
        CalciteConfig cc = new CalciteConfigBuilder()
                .addNormRuleSet(RuleSets.ofList(MyFilterReduceExpressionRule.FILTER_INSTANCE))
                .replaceDecoRuleSet(RuleSets.ofList(MyDataStreamRule.INSTANCE))
                .build();
        tableEnv.getConfig().setCalciteConfig(cc);

        // obtain query configuration from TableEnvironment
        StreamQueryConfig qConfig = tableEnv.queryConfig();
        qConfig.withIdleStateRetentionTime(Time.minutes(30), Time.hours(2));

        // Register Data Source Stream tables in the table environment
        tableEnv.registerTableSource(TICKETS_STATION_01_PLATFORM_01,
                new MqttSensorTableSource(ipAddressSource01, TOPIC_STATION_01_PLAT_01_TICKETS));
        Table result = tableEnv.scan(TICKETS_STATION_01_PLATFORM_01)
                .filter(VALUE + " >= 50 && " + VALUE + " <= 100 && " + VALUE + " >= 50")
                ;
        tableEnv.toAppendStream(result, Row.class).print();

        System.out.println("Execution plan ........................ ");
        System.out.println(env.getExecutionPlan());
        System.out.println("Plan explaination ........................ ");
        System.out.println(tableEnv.explain(result));
        System.out.println("........................ ");
        System.out.println("NormRuleSet: " + cc.getNormRuleSet().isDefined());
        System.out.println("LogicalOptRuleSet: " + cc.getLogicalOptRuleSet().isDefined());
        System.out.println("PhysicalOptRuleSet: " + cc.getPhysicalOptRuleSet().isDefined());
        System.out.println("DecoRuleSet: " + cc.getDecoRuleSet().isDefined());
        // @formatter:on

        env.execute("HelloWorldCalcitePlanTableAPI");
    }
}

public class MyDataStreamRule extends RelOptRule {
    public static final MyDataStreamRule INSTANCE = new MyDataStreamRule(operand(DataStreamRel.class, none()), "MyDataStreamRule");

    public MyDataStreamRule(RelOptRuleOperand operand, String description) {
        super(operand, "MyDataStreamRule:" + description);
    }

    public MyDataStreamRule(RelBuilderFactory relBuilderFactory) {
        super(operand(DataStreamRel.class, any()), relBuilderFactory, null);
    }

    public void onMatch(RelOptRuleCall call) {
        DataStreamRel dataStreamRel = (DataStreamRel) call.rel(0);
        System.out.println("======================= MyDataStreamRule.onMatch ====================");
    }
}
public class MyFilterReduceExpressionRule extends RelOptRule {

    public static final MyFilterReduceExpressionRule FILTER_INSTANCE = new MyFilterReduceExpressionRule(
            operand(LogicalFilter.class, none()), "MyFilterReduceExpressionRule");

    public MyFilterReduceExpressionRule(RelOptRuleOperand operand, String description) {
        super(operand, "MyFilterReduceExpressionRule:" + description);
    }

    public MyFilterReduceExpressionRule(RelBuilderFactory relBuilderFactory) {
        super(operand(LogicalFilter.class, any()), relBuilderFactory, null);
    }

    public MyFilterReduceExpressionRule(RelOptRuleOperand operand) {
        super(operand);
    }

    @Override
    public void onMatch(RelOptRuleCall arg0) {
        System.out.println("======================= MyFilterReduceExpressionRule.onMatch ====================");
    }
}
...