У меня есть это приложение во Flink, которое использует Table API для печати данных из источника. Официальная документация Flink гласит, что Table API использует Calcite по своей сути для перевода и оптимизации планов запросов. Они не очень подробно описывают это, поэтому я пошел к исходному коду и попытался скопировать некоторые коды оттуда. Но, насколько я видел, они также используют правила кальцита.
Что если я хочу реализовать свое собственное правило? Является ли это возможным? Как реализовать простое правило в Calcite для изменения параметра фильтра, например?
Вот мой код
public class HelloWorldCalcitePlanTableAPI {
private static final Logger logger = LoggerFactory.getLogger(HelloWorldCalcitePlanTableAPI.class);
private static final String TICKETS_STATION_01_PLATFORM_01 = "TicketsStation01Plat01";
public static void main(String[] args) throws Exception {
new HelloWorldCalcitePlanTableAPI("127.0.0.1", "127.0.0.1");
}
public HelloWorldCalcitePlanTableAPI(String ipAddressSource01, String ipAddressSink) throws Exception {
// Start streaming from fake data source sensors
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, tableConfig);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// Calcite configuration file to change the query execution plan
// CalciteConfig cc = tableEnv.getConfig().getCalciteConfig();
CalciteConfig cc = new CalciteConfigBuilder()
.addNormRuleSet(RuleSets.ofList(MyFilterReduceExpressionRule.FILTER_INSTANCE))
.replaceDecoRuleSet(RuleSets.ofList(MyDataStreamRule.INSTANCE))
.build();
tableEnv.getConfig().setCalciteConfig(cc);
// obtain query configuration from TableEnvironment
StreamQueryConfig qConfig = tableEnv.queryConfig();
qConfig.withIdleStateRetentionTime(Time.minutes(30), Time.hours(2));
// Register Data Source Stream tables in the table environment
tableEnv.registerTableSource(TICKETS_STATION_01_PLATFORM_01,
new MqttSensorTableSource(ipAddressSource01, TOPIC_STATION_01_PLAT_01_TICKETS));
Table result = tableEnv.scan(TICKETS_STATION_01_PLATFORM_01)
.filter(VALUE + " >= 50 && " + VALUE + " <= 100 && " + VALUE + " >= 50")
;
tableEnv.toAppendStream(result, Row.class).print();
System.out.println("Execution plan ........................ ");
System.out.println(env.getExecutionPlan());
System.out.println("Plan explaination ........................ ");
System.out.println(tableEnv.explain(result));
System.out.println("........................ ");
System.out.println("NormRuleSet: " + cc.getNormRuleSet().isDefined());
System.out.println("LogicalOptRuleSet: " + cc.getLogicalOptRuleSet().isDefined());
System.out.println("PhysicalOptRuleSet: " + cc.getPhysicalOptRuleSet().isDefined());
System.out.println("DecoRuleSet: " + cc.getDecoRuleSet().isDefined());
// @formatter:on
env.execute("HelloWorldCalcitePlanTableAPI");
}
}
public class MyDataStreamRule extends RelOptRule {
public static final MyDataStreamRule INSTANCE = new MyDataStreamRule(operand(DataStreamRel.class, none()), "MyDataStreamRule");
public MyDataStreamRule(RelOptRuleOperand operand, String description) {
super(operand, "MyDataStreamRule:" + description);
}
public MyDataStreamRule(RelBuilderFactory relBuilderFactory) {
super(operand(DataStreamRel.class, any()), relBuilderFactory, null);
}
public void onMatch(RelOptRuleCall call) {
DataStreamRel dataStreamRel = (DataStreamRel) call.rel(0);
System.out.println("======================= MyDataStreamRule.onMatch ====================");
}
}
public class MyFilterReduceExpressionRule extends RelOptRule {
public static final MyFilterReduceExpressionRule FILTER_INSTANCE = new MyFilterReduceExpressionRule(
operand(LogicalFilter.class, none()), "MyFilterReduceExpressionRule");
public MyFilterReduceExpressionRule(RelOptRuleOperand operand, String description) {
super(operand, "MyFilterReduceExpressionRule:" + description);
}
public MyFilterReduceExpressionRule(RelBuilderFactory relBuilderFactory) {
super(operand(LogicalFilter.class, any()), relBuilderFactory, null);
}
public MyFilterReduceExpressionRule(RelOptRuleOperand operand) {
super(operand);
}
@Override
public void onMatch(RelOptRuleCall arg0) {
System.out.println("======================= MyFilterReduceExpressionRule.onMatch ====================");
}
}