Невозможно получить результаты из запроса flink SQL - PullRequest
0 голосов
/ 28 августа 2018

Я столкнулся с проблемой, из-за которой я не получаю результаты из своего запроса в Flink-SQL.

У меня есть некоторая информация, хранящаяся в двух темах Kafka, я хочу сохранить их в двух таблицах и выполнить потоковое соединение между ними.

Это мои инструкции по флинку:

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
        // configure Kafka consumer
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // Broker default host:port
        props.setProperty("group.id", "flink-consumer"); // Consumer group ID

        FlinkKafkaConsumer011<Blocks> flinkBlocksConsumer = new FlinkKafkaConsumer011<>(args[0], new BlocksSchema(), props);
        flinkBlocksConsumer.setStartFromEarliest();

        FlinkKafkaConsumer011<Transactions> flinkTransactionsConsumer = new FlinkKafkaConsumer011<>(args[1], new TransactionsSchema(), props);
        flinkTransactionsConsumer.setStartFromEarliest();

        DataStream<Blocks> blocks = env.addSource(flinkBlocksConsumer);

        DataStream<Transactions> transactions = env.addSource(flinkTransactionsConsumer);

        tableEnv.registerDataStream("blocksTable", blocks);
        tableEnv.registerDataStream("transactionsTable", transactions);

Вот мой SQL-запрос:

Table sqlResult
   = tableEnv.sqlQuery(
       "SELECT block_timestamp,count(tx_hash) " +
       "FROM blocksTable " +
       "JOIN transactionsTable " +
       "ON blocksTable.block_hash=transactionsTable.tx_hash " +
       "GROUP BY blocksTable.block_timestamp");
DataStream<Test> resultStream = tableEnv
        .toRetractStream(sqlResult,Row.class)
        .map(t -> {
             Row r = t.f1;
             String field2 = r.getField(0).toString();
             long count = Long.valueOf(r.getField(1).toString());
             return new Test(field2,count);
             })
        .returns(Test.class);

Затем я печатаю результаты:

resultStream.print();

Но я не получаю никаких ответов, моя программа застряла ...

Для схемы, используемой для сериализации и десериализации, вот мой тестовый класс, в котором хранится результат моего запроса (два поля - строка и long для соответственно block_timestamp и count):

public class TestSchema implements DeserializationSchema<Test>, SerializationSchema<Test> {


    @Override
    public Test deserialize(byte[] message) throws IOException {
        return Test.fromString(new String(message));
    }

    @Override
    public boolean isEndOfStream(Test nextElement) {
        return false;
    }

    @Override
    public byte[] serialize(Test element) {
        return element.toString().getBytes();
    }

    @Override
    public TypeInformation<Test> getProducedType() {
        return TypeInformation.of(Test.class);
    }
}

Это тот же принцип для BlockSchema и TransactionsSchema классов.

Знаете ли вы, почему я не могу получить результат моего запроса? Должен ли я проверить с BatchExecutionEnvironment?

...