KSQL: Могу ли я использовать потоки в функциях UDF KSQL для ускорения процесса? - PullRequest
1 голос
/ 10 апреля 2019

Я использую независимый ksql-server в 3-узлах, разговаривая с Kafka кластером из 3-х узлов. Создан Stream из Topic с 15 разделами и данные в потоке для некоторого обогащения. Получил фрагмент кода как UDF для поиска в файле IP2Location.bin и класс UDF выглядит следующим образом:

import java.io.IOException;
import java.util.Map;

import com.google.gson.Gson;

import io.confluent.common.Configurable;
import io.confluent.ksql.function.udf.Udf;
import io.confluent.ksql.function.udf.UdfDescription;

@UdfDescription(name = "Ip2Lookup", description = "Lookup class for IP2Location database.")
public class Ip2Lookup implements Configurable {

    private IP2Location loc = null;
    private Gson gson = null;

    @Udf(description = "fetches the geoloc of the ipaddress.")
    public synchronized String ip2lookup(String ip) {

        String json = null;
        if (loc != null) {
            IP2LocationResult result = null;
            try {
                result = loc.query(ip);
                System.out.println(result);
                json = gson.toJson(result);
            } catch (IOException e) {
                e.printStackTrace();
            }
            return json;
        }
        return ip;
    }

    @Override
    public void configure(Map<String, ?> arg0) {

        try {
            String db_path = null;
            String os = System.getProperty("os.name").toLowerCase();

            db_path = "/data/md0/ip2loc/ipv4-bin/IP-COUNTRY-REGION-CITY-LATITUDE-LONGITUDE-ZIPCODE-TIMEZONE-ISP-DOMAIN-NETSPEED-AREACODE-WEATHER-MOBILE-ELEVATION-USAGETYPE.BIN";

            loc = new IP2Location(db_path);
            gson = new Gson();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

Данные в Topic и в Stream довольно быстрые (может быть миллион записей в секунду). При использовании метода synchronized скорость составляет 3000 записей / сообщений в секунду в каждом узле ksql-server. С этой скоростью вы знаете, сколько времени потребуется, чтобы наверстать упущенное. Без метода synchronized я вижу поврежденные данные, поскольку один объект / метод используется несколькими потоками или около того.

Вопрос1: Как именно вызов udf будет вызываться / вызываться KSQL?

Вопрос2: Могу ли я использовать потоки, обрабатывающие запросы в udf?

Вопрос 3: Поскольку тема / поток состоит из 15 разделов, я должен раскрутить 15 узлов ksql-servers?

Спасибо.

1 Ответ

1 голос
/ 10 апреля 2019

Вопрос1: Как именно вызов udf будет вызываться / вызываться KSQL?

Не уверен, что ты имеешь в виду. Как только ваш UDF сделан доступным для KSQL (см. https://docs.confluent.io/current/ksql/docs/developer-guide/udf.html#deploying),, вы можете вызывать UDF в ваших операторах KSQL как IP2LOOKUP. Вы также можете запустить SHOW FUNCTIONS в KSQL, чтобы подтвердить, что ваш UDF доступен для использования.

Возможно, вы спрашиваете из-за вашего следующего вопроса? KSQL будет вызывать ваш UDF по одному сообщению за раз.

Вопрос2: Могу ли я использовать потоки, обрабатывающие запросы в udf?

Почему вы хотите это сделать? Вы обеспокоены тем, что KSQL с вашим текущим кодом UDF не сможет обрабатывать объем входящих данных? Говоря о том, что является ожидаемым объемом данных, который вы пытаетесь обработать, потому что, возможно, вы пытаетесь провести преждевременную оптимизацию?

Кроме того, не зная больше подробностей, я не думаю, что многопоточная установка для вашей UDF даст какие-либо преимущества, потому что UDF, при вызове, будет по-прежнему обрабатывать только одно сообщение за раз (для сервера KSQL или более точно, для каждой задачи потока, которых может быть много для каждого сервера KSQL, я упомяну это, чтобы прояснить, что пользовательские функции в KSQL не ограничивают вашу обработку, обрабатывая только одно сообщение на всех серверах; обработка, конечно, распределена и происходит параллельно).

Вопрос 3: Поскольку тема / поток состоит из 15 разделов, я должен развернуть 15 узлов ksql-серверов?

Это зависит от вашего объема данных. Вы можете вращать столько, сколько хотите серверов KSQL. Если объем данных невелик, может быть достаточно одного сервера KSQL. Если объем данных выше, вы можете запустить дополнительные серверы KSQL максимум до 15 серверов (поскольку в теме ввода 15 разделов). Любые дополнительные серверы KSQL будут работать вхолостую.

В сценарии, когда 15 серверов KSQL будет недостаточно, вы должны увеличить количество разделов для вашей входной темы с 15 до большего числа, а затем вы также можете запустить больше серверов KSQL (что, таким образом, увеличивает вычислительную мощность вашей настройки).

...