Я столкнулся с проблемой, из-за которой я не получаю результаты из своего запроса в Flink-SQL.
У меня есть некоторая информация, хранящаяся в двух темах Kafka, я хочу сохранить их в двух таблицах и выполнить потоковое соединение между ними.
Это мои инструкции по флинку:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
// configure Kafka consumer
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092"); // Broker default host:port
props.setProperty("group.id", "flink-consumer"); // Consumer group ID
FlinkKafkaConsumer011<Blocks> flinkBlocksConsumer = new FlinkKafkaConsumer011<>(args[0], new BlocksSchema(), props);
flinkBlocksConsumer.setStartFromEarliest();
FlinkKafkaConsumer011<Transactions> flinkTransactionsConsumer = new FlinkKafkaConsumer011<>(args[1], new TransactionsSchema(), props);
flinkTransactionsConsumer.setStartFromEarliest();
DataStream<Blocks> blocks = env.addSource(flinkBlocksConsumer);
DataStream<Transactions> transactions = env.addSource(flinkTransactionsConsumer);
tableEnv.registerDataStream("blocksTable", blocks);
tableEnv.registerDataStream("transactionsTable", transactions);
Вот мой SQL-запрос:
Table sqlResult
= tableEnv.sqlQuery(
"SELECT block_timestamp,count(tx_hash) " +
"FROM blocksTable " +
"JOIN transactionsTable " +
"ON blocksTable.block_hash=transactionsTable.tx_hash " +
"GROUP BY blocksTable.block_timestamp");
DataStream<Test> resultStream = tableEnv
.toRetractStream(sqlResult,Row.class)
.map(t -> {
Row r = t.f1;
String field2 = r.getField(0).toString();
long count = Long.valueOf(r.getField(1).toString());
return new Test(field2,count);
})
.returns(Test.class);
Затем я печатаю результаты:
resultStream.print();
Но я не получаю никаких ответов, моя программа застряла ...
Для схемы, используемой для сериализации и десериализации, вот мой тестовый класс, в котором хранится результат моего запроса (два поля - строка и long для соответственно block_timestamp и count):
public class TestSchema implements DeserializationSchema<Test>, SerializationSchema<Test> {
@Override
public Test deserialize(byte[] message) throws IOException {
return Test.fromString(new String(message));
}
@Override
public boolean isEndOfStream(Test nextElement) {
return false;
}
@Override
public byte[] serialize(Test element) {
return element.toString().getBytes();
}
@Override
public TypeInformation<Test> getProducedType() {
return TypeInformation.of(Test.class);
}
}
Это тот же принцип для BlockSchema
и TransactionsSchema
классов.
Знаете ли вы, почему я не могу получить результат моего запроса? Должен ли я проверить с BatchExecutionEnvironment
?