Flink: осталось присоединение к потоку со списком stati c - PullRequest
1 голос
/ 09 апреля 2020

Я хочу присоединить поток попыток к списку заблокированных писем c и сгруппировать результаты по IP, чтобы впоследствии я мог посчитать пачку соответствующих статистических данных. Результат должен быть представлен в виде скользящего окна через 30 минут после каждых 10 секунд. Ниже приведен один из нескольких способов, с помощью которых я пытался добиться этого:

override fun performQuery(): Table {
    val query = "SELECT ip, " +
        "COUNT(CASE WHEN success IS false THEN 1 END) AS fails, " +
        "COUNT(CASE WHEN success IS true THEN 1 END) AS successes, " +
        "COUNT(DISTINCT id) accounts, " +
        "COUNT(CASE WHEN id = 0 THEN 1 END) AS non_existing_accounts, " +
        "COUNT(CASE WHEN blockedEmail IS NOT NULL THEN 1 END) AS blocked_accounts " +
        "FROM Attempts " +
        "LEFT JOIN LATERAL TABLE(blockedEmailsList()) AS T(blockedEmail) ON TRUE " +
        "WHERE Attempts.email <> '' AND Attempts.createdAt < CURRENT_TIMESTAMP " +
        "GROUP BY HOP(Attempts.createdAt, INTERVAL '10' SECOND, INTERVAL '30' MINUTE), ip"

    return runQuery(query)
        .select("ip, accounts, fails, successes, non_existing_accounts, blocked_accounts")
}

При этом используется описанная ниже пользовательская табличная функция, которая уже зарегистрирована в моем tableEnv как blockedEmailsList:

public class BlockedEmailsList extends TableFunction<Row> {
    private Collection<String> emails;

    public BlockedEmailsList(Collection<String> emails) {
        this.emails = emails;
    }

    public Row read(String email) {
        return Row.of(email);
    }

    public void eval() {
        this.emails.forEach(email -> collect(read(email)));
    }
}

Тем не менее, он возвращает ошибку ниже:

Caused by: org.apache.flink.table.api.TableException: Rowtime attributes must not be in the input rows of a regular join. As a workaround you can cast the time attributes of input tables to TIMESTAMP before.

Если я сделаю то, что предлагает и произнесу created_at в TIMESTAMP, я получу это вместо:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Window can only be defined over a time attribute column.

Я нашел здесь другие вопросы о переполнении стека, связанные с этими исключениями, но они касаются потоков и временных таблиц, и ни один из них не решает случай объединения потока в список c.

Любой идеи?

РЕДАКТИРОВАТЬ: Похоже, есть открытый вопрос в проекте Flink для моего случая использования: https://cwiki.apache.org/confluence/display/FLINK/FLIP-17+Side+Inputs+for+DataStream+API

Итак, я Я также принимаю предложения обойти.

1 Ответ

0 голосов
/ 14 апреля 2020

Мне удалось реализовать обходной путь, который решил мою проблему!

Вместо того, чтобы объединять потоковые попытки со списком писем stati c, я заранее сопоставлял каждую попытку с новой * добавлением blockedEmail атрибут. Если список c stati

содержит текущий адрес электронной почты для попытки, для его атрибута blockedEmail устанавливается true.
DataStream<Attempt> attemptsStream = sourceApi.<Attempt>startStream().map(new MapFunction<Attempt, Attempt>() {
    @Override
    public Attempt map(Attempt attempt) throws Exception {
        if (blockedEmails.contains(attempt.getEmail())) {
            attempt.setBlockedEmail(true);
        }
        return attempt;
    }
});

Список c stati blockedEmails относится к типу HashSet, поэтому поиск будет O (1).

Наконец, запрос на группировку был скорректирован на: *1014*

override fun performQuery(): Table {
    val query = "SELECT ip, " +
        "COUNT(CASE WHEN success IS false THEN 1 END) AS fails, " +
        "COUNT(CASE WHEN success IS true THEN 1 END) AS successes, " +
        "COUNT(DISTINCT id) accounts, " +
        "COUNT(CASE WHEN id = 0 THEN 1 END) AS non_existing_accounts, " +
        "COUNT(CASE WHEN blockedEmail IS true THEN 1 END) AS blocked_accounts " +
        "FROM Attempts " +
        "WHERE Attempts.email <> '' " +
        "GROUP BY HOP(Attempts.createdAt, INTERVAL '10' SECOND, INTERVAL '30' MINUTE), ip"

    return runQuery(query)
        .select("ip, accounts, fails, successes, non_existing_accounts, blocked_accounts")
}

Пока что проблема объединения потоков и данных c списки, кажется, еще не решены, но в моем случае вышеупомянутое обходное решение прекрасно его решило.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...