Kafka Streams не отправляет записи в темы вывода - PullRequest
0 голосов
/ 26 апреля 2018

У меня есть приложение Kafka Streams, которое получает записи из входной темы, выполняет некоторую потоковую обработку и отправляет обработанные записи в несколько выходных тем. Оно работало отлично, пока я его не остановил.

После остановки первого приложения Streams я создал другое приложение потоков, которое имеет другую тему ввода и вывода.

Затем я запустил оба приложения, и моё второе приложение работало отлично, но моё первое приложение вообще перестало выполнять какую-либо обработку.

Когда я запускаю консольный потребитель для входных тем, я вижу производимые записи, но когда я запускаю консольный потребитель для выходных тем, я не получаю никаких записей. Я не знаю, что могло пойти не так. Я думал, что это может быть из-за второго приложения Streams. Поэтому я остановил его, пересоздал все темы и снова перезапустил первое приложение, но оно по-прежнему ничего не делает.

Примечание: я делаю это на локальном сервере.

Затем я создал приложение Streams на моём персональном компьютере, и там оно работает так, как ожидалось.

Что могло пойти не так, что я вижу это неожиданно странное поведение?

Вот мой код:

Основной класс:

/**
 * Entry point of the "streams-pipe" Kafka Streams application.
 *
 * <p>The topology built by {@link #main(String[])} reads the topics
 * {@code o365_storage}, {@code scan_result_dlp}, {@code scan_result_malware},
 * {@code o365_activity_contenturl} and {@code o365_user_activity}. Every record of
 * {@code o365_activity_contenturl} is expanded into the results returned by
 * {@code FetchLogService}; results containing {@code Operation":} are written to
 * {@code o365_user_activity}, results containing {@code contentUri":} are wrapped in
 * {@code [ ]} and written back to {@code o365_activity_contenturl} (the topic this
 * step reads from). {@code o365_user_activity} is then split with several
 * {@code branch} calls and forwarded through {@code AppUtil.pushToTopic}.
 */
public class Pipe {

    static Logger log = Logger.getLogger(Pipe.class.getName());

    /**
     * Configures logging, builds the topology, starts it and blocks until the JVM is
     * asked to shut down.
     *
     * @param args command-line arguments (not used)
     * @throws Exception if building the topology fails before the streams are started
     */
    public static void main(String[] args) throws Exception {

        PropertyConfigurator.configure("log4j.properties");

        log.info("Starting application");

        // The environment is read but BROKER_URL is not applied yet; the broker
        // address below is still the fixed local one.
        Map<String, String> env = System.getenv();
        Properties props = new Properties();
        String BROKER_URL = env.get("BROKER_URL");

        // NOTE(review): these credentials are hard-coded in source. They should be
        // rotated and supplied through configuration or the environment instead.
        String appId = "98aff1c5-7a69-46b7-899c-186851054b43";
        String appSecret = "zVyS/V694ffWe99QpCvYqE1sqeqLo36uuvTL8gmZV0A=";
        String appTenant = "2f6cb1a6-ecb8-4578-b680-bf84ded07ff4";

        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // pass from env localhost:9092 | BROKER_URL + ":9092"
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        final StreamsBuilder builder = new StreamsBuilder();

        log.info("Creating stream: o365_storage");
        KStream<String, String> source_o365_storage = builder.stream("o365_storage");

        log.info("Creating stream: scan_result_dlp");
        KStream<String, String> source_scan_result_dlp = builder.stream("scan_result_dlp");

        log.info("Creating stream: scan_result_malware");
        KStream<String, String> source_scan_result_malware = builder.stream("scan_result_malware");

        log.info("Creating stream: source_o365_user_contenturl");
        KStream<String, String> source_o365_user_contenturl = builder.stream("o365_activity_contenturl");

        log.info("Creating stream: source_o365_contenturl_result");
        KStream<String, String> source_o365_contenturl_result = source_o365_user_contenturl.flatMapValues(new ValueMapper<String, Iterable<String>>() {
            @Override
            public Iterable<String> apply(String value) {
                return fetchLogsForContentUrls(value, appId, appSecret, appTenant);
            }
        });

        log.info("Creating stream: source_o365_user_activity_intermediate");
        KStream<String, String> source_o365_user_activity_intermediate = source_o365_contenturl_result.flatMapValues(new ValueMapper<String, Iterable<String>>() {
            @Override
            public Iterable<String> apply(String value) {
                // Keep only results that carry an "Operation" field.
                List<String> kept = new ArrayList<String>();
                if (value != null && value.contains("Operation\":")) {
                    kept.add(value);
                }
                return kept;
            }
        });

        source_o365_user_activity_intermediate.to("o365_user_activity");

        log.info("Creating stream: o365_contenturls");
        KStream<String, String> o365_contenturls = source_o365_contenturl_result.flatMapValues(new ValueMapper<String, Iterable<String>>() {
            @Override
            public Iterable<String> apply(String value) {
                // Results that carry a "contentUri" field are wrapped into a JSON array
                // so they have the shape the content-URL mapper parses.
                List<String> kept = new ArrayList<String>();
                if (value != null && value.contains("contentUri\":")) {
                    kept.add("[" + value + "]");
                }
                return kept;
            }
        });

        // Written back to the same topic that source_o365_user_contenturl reads.
        o365_contenturls.to("o365_activity_contenturl");

        log.info("Creating stream: o365_user_activity");
        KStream<String, String> source_o365_user_activity = builder.stream("o365_user_activity");

        log.info("Creating branch: branches_source_o365_user_activity");
        @SuppressWarnings("unchecked")
        KStream<String, String>[] branches_source_o365_user_activity = source_o365_user_activity.branch(
                (key, value) -> (value.contains("Operation\":\"SharingSet") && value.contains("ItemType\":\"File")),        // Sharing Set by Date
                (key, value) -> (value.contains("Operation\":\"AddedToSecureLink") && value.contains("ItemType\":\"File")), // Added to secure link
                (key, value) -> (value.contains("Operation\":\"AddedToGroup")),                                             // Added to group
                (key, value) -> (value.contains("Operation\":\"Add member to role.") || value.contains("Operation\":\"Remove member from role.")), // Role update by date
                (key, value) -> (value.contains("Operation\":\"FileUploaded") || value.contains("Operation\":\"FileDeleted")
                        || value.contains("Operation\":\"FileRenamed") || value.contains("Operation\":\"FileMoved")),       // Upload file by date
                (key, value) -> (value.contains("Operation\":\"UserLoggedIn")),                                             // User logged in by date
                // Manage user by date.
                // NOTE(review): && binds tighter than ||, so the success check applies
                // only to "Add user."; "Delete user." matches regardless of ResultStatus.
                // The parentheses make that existing evaluation explicit - confirm it is intended.
                (key, value) -> (value.contains("Operation\":\"Delete user.")
                        || (value.contains("Operation\":\"Add user.") && value.contains("ResultStatus\":\"success")))
                );

        log.info("Creating branch: branches1_source_o365_user_activity");
        @SuppressWarnings("unchecked")
        KStream<String, String>[] branches1_source_o365_user_activity = source_o365_user_activity.branch(
                (key, value) -> (value.contains("Operation\":\"FileUploaded") || value.contains("Operation\":\"FileModified")
                        || value.contains("Operation\":\"FileDeleted")),                                                    // File update by date
                (key, value) -> (value.contains("Operation\":\"FileAccessed"))                                              // File access by date
                );

        log.info("Creating branch: branches2_source_o365_user_activity");
        @SuppressWarnings("unchecked")
        KStream<String, String>[] branches2_source_o365_user_activity = source_o365_user_activity.branch(
                // File operation by date. The ItemType check applies to SharingSet only
                // (explicit parentheses, same evaluation as before).
                (key, value) -> (value.contains("Operation\":\"FileUploaded") || value.contains("Operation\":\"FileModified")
                        || value.contains("Operation\":\"FileDeleted")
                        || (value.contains("Operation\":\"SharingSet") && value.contains("ItemType\":\"File")))
                );

        log.info("Creating branch: branches3_source_o365_user_activity");
        @SuppressWarnings("unchecked")
        KStream<String, String>[] branches3_source_o365_user_activity = source_o365_user_activity.branch(
                (key, value) -> (value.contains("Workload\":\"AzureActiveDirectory") || value.contains("Workload\":\"OneDrive") || value.contains("Workload\":\"SharePoint")) // Activity log by date
                );

        log.info("Creating branch: branches4_source_o365_user_activity");
        @SuppressWarnings("unchecked")
        KStream<String, String>[] branches4_source_o365_user_activity = source_o365_user_activity.branch(
                (key, value) -> (value.contains("Operation\":\"FileUploaded") || value.contains("Operation\":\"FileModified"))  // Download file for scanning
                );

        // ---- DLP logs ----
        AppUtil.pushToTopic(source_scan_result_dlp, Constant.O365_GTB_BREACHED_POLICY_BY_DATE, "o365_gtb_dlp_breached_policy_by_date");

        // ---- Malware logs ----
        AppUtil.pushToTopic(source_scan_result_malware, Constant.O365_LAST_LINE_MALWARE, "o365_last_line_malware");

        // ---- All logs ----
        AppUtil.pushToTopic(source_o365_user_activity, Constant.O365_USER_ACTIVITY_BY_DATE, "o365_user_activity_by_date");

        // ---- Storage logs ----
        AppUtil.pushToTopic(source_o365_storage, Constant.O365_STORAGE_BY_DATE, "o365_storage_by_date");

        // ---- Branch logs ----
        AppUtil.pushToTopic(branches_source_o365_user_activity[0], Constant.O365_SHARING_SET_BY_DATE, "o365_sharing_set_by_date", Constant.O365_SHARING_SET_BY_DATE_EXCEP_KEYS);
        AppUtil.pushToTopic(branches_source_o365_user_activity[1], Constant.O365_ADDED_TO_SECURE_LINK_BY_DATE, "o365_added_to_secure_link_by_date");
        AppUtil.pushToTopic(branches_source_o365_user_activity[2], Constant.O365_ADDED_TO_GROUP_BY_DATE, "o365_added_to_group_by_date");
        AppUtil.pushToTopic(branches_source_o365_user_activity[3], Constant.O365_ROLE_UPDATE_BY_DATE, "o365_role_update_by_date");
        AppUtil.pushToTopic(branches_source_o365_user_activity[4], Constant.O365_UPLOAD_FILE_BY_DATE, "o365_upload_file_by_date", Constant.O365_UPLOAD_FILE_BY_DATE_EXCEP_KEYS);
        AppUtil.pushToTopic(branches_source_o365_user_activity[5], Constant.O365_USER_LOGGED_IN_BY_DATE, "o365_user_logged_in_by_date");
        AppUtil.pushToTopic(branches_source_o365_user_activity[6], Constant.O365_MANAGE_USER_BY_DATE, "o365_manage_user_by_date");

        // ---- Branch 1 logs ----
        AppUtil.pushToTopic(branches1_source_o365_user_activity[0], Constant.O365_FILE_UPDATE_BY_DATE, "o365_file_update_by_date");
        AppUtil.pushToTopic(branches1_source_o365_user_activity[1], Constant.O365_FILE_ACCESS_BY_DATE, "o365_file_access_by_date");

        // ---- Branch 2 logs ----
        AppUtil.pushToTopic(branches2_source_o365_user_activity[0], Constant.O365_FILE_OPERATION_BY_DATE, "o365_file_operation_by_date");

        // ---- Branch 3 logs ----
        AppUtil.pushToTopic(branches3_source_o365_user_activity[0], Constant.O365_ACTIVITY_LOG_BY_DATE, "o365_activity_log_by_date");

        // ---- Branch 4 logs ----
        branches4_source_o365_user_activity[0].to("download_file_for_scanning");

        final Topology topology = builder.build();
        final KafkaStreams streams = new KafkaStreams(topology, props);
        final CountDownLatch latch = new CountDownLatch(1);

        // Without a handler, a stream thread that dies from an exception thrown inside
        // the topology leaves main() blocked on the latch with nothing in the log.
        // NOTE(review): this is the Thread.UncaughtExceptionHandler overload of the
        // Kafka Streams 1.x API this file appears to target - confirm against the
        // Kafka version used by the project.
        streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
            @Override
            public void uncaughtException(Thread thread, Throwable error) {
                log.error("Stream thread " + thread.getName() + " terminated with an uncaught exception", error);
            }
        });

        // attach shutdown handler to catch control-c
        Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
            @Override
            public void run() {
                log.trace("Exiting application.");
                streams.close();
                latch.countDown();
            }
        });

        try {
            streams.start();
            latch.await();
        } catch (Throwable e) {
            // Report why the application is going down before exiting with an error code.
            log.error("Streams application terminated abnormally", e);
            System.exit(1);
        }
        System.exit(0);
    }

    /**
     * Expands one record of the content-URL topic into the results produced for each
     * of its entries.
     *
     * <p>The record value is parsed as a JSON array; one {@code FetchLogService} task
     * per element is submitted to a 4-thread pool that lives only for this call. A
     * task that fails, or whose result cannot be parsed, is logged and skipped so the
     * results of the remaining tasks are still returned.
     *
     * @param value     record value, expected to be a JSON array of objects
     * @param appId     application id passed to {@code O365Util.getAccessToken}
     * @param appSecret application secret passed to {@code O365Util.getAccessToken}
     * @param appTenant tenant id passed to {@code O365Util.getAccessToken} and to each task
     * @return the collected results as JSON strings; empty if the token could not be
     *         obtained or the value could not be parsed
     */
    private static List<String> fetchLogsForContentUrls(String value, String appId, String appSecret, String appTenant) {
        List<String> results = new ArrayList<String>();
        ExecutorService executor = new ThreadPoolExecutor(4, 4, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>());
        try {
            // The token is a credential and is deliberately not printed.
            String accessToken = O365Util.getAccessToken(appId, appSecret, appTenant);

            System.out.println("Creating futures..");
            List<Future<?>> futures = new ArrayList<Future<?>>();
            JSONArray contentUrlList = new JSONArray(value);
            for (int i = 0; i < contentUrlList.length(); i++) {
                JSONObject contentUri = contentUrlList.getJSONObject(i);
                futures.add(executor.submit(new FetchLogService(accessToken, contentUri, appTenant)));
            }

            System.out.println("futures size is : " + futures.size());
            for (Future<?> future : futures) {
                try {
                    appendFetchResult(future.get(), results);
                } catch (InterruptedException e) {
                    // Preserve the interrupt for the calling thread and stop waiting.
                    Thread.currentThread().interrupt();
                    log.error("Interrupted while waiting for fetch results", e);
                    break;
                } catch (Exception e) {
                    // One failed fetch must not discard the results of the others.
                    System.err.println("Unable to convert to json");
                    log.error("Unable to convert to json", e);
                }
            }
        } catch (Exception e) {
            System.err.println("Unable to convert to json");
            log.error("Unable to convert to json", e);
        } finally {
            executor.shutdownNow();
        }
        return results;
    }

    /**
     * Appends the JSON carried by one task result to {@code out}.
     *
     * <p>A result whose text starts with {@code [} is parsed as a JSON array and each
     * element is appended separately; any other text is parsed as a single JSON object.
     *
     * @param outcome value returned by the task's future, may be {@code null}
     * @param out     list receiving the serialized JSON objects
     */
    private static void appendFetchResult(Object outcome, List<String> out) {
        if (outcome == null) {
            System.out.println("future result is nullllllllllllllllllllllllllllllllllllllll");
            return;
        }
        String text = outcome.toString();
        if (text.startsWith("[")) {
            JSONArray logList = new JSONArray(text);
            for (int k = 0; k < logList.length(); k++) {
                out.add(logList.getJSONObject(k).toString());
            }
        } else {
            System.out.println("futureResult is JSONObject");
            out.add(new JSONObject(text).toString());
        }
    }
}

AppUtil:

/**
 * Static helpers used while building the topology: mapping incoming JSON records to
 * the field layout of a destination topic and forwarding them there.
 */
public final class AppUtil {

    static Logger log = Logger.getLogger(AppUtil.class.getName());

    /**
     * Builds a map that associates {@code keys[i]} with {@code values[i]}.
     *
     * @param keys   map keys; must contain at least as many elements as {@code values}
     * @param values map values
     * @return a new mutable map with one entry per element of {@code values}
     * @throws IllegalArgumentException if {@code keys} is shorter than {@code values}
     */
    public static HashMap<String, String> createHashMap(String[] keys, String[] values) {
        if (keys.length < values.length) {
            throw new IllegalArgumentException(
                    "createHashMap: got " + values.length + " values but only " + keys.length + " keys");
        }
        HashMap<String, String> hmap = new HashMap<String, String>();
        for (int i = 0; i < values.length; i++) {
            hmap.put(keys[i], values[i]);
        }
        return hmap;
    }

    /**
     * Maps every record of {@code sourceTopic} to the layout described by {@code hmap}
     * and writes it to {@code destTopicName}. A record is dropped when any mapped
     * source field is missing.
     *
     * @param sourceTopic   stream to read from
     * @param hmap          output field name -> source field name
     * @param destTopicName topic to write to
     */
    public static void pushToTopic(KStream<String, String> sourceTopic, HashMap<String, String> hmap, String destTopicName) {
        log.info(destTopicName+ " inside function");
        System.out.println(destTopicName + " inside function");

        // No field is optional: same processing as the four-argument overload with an
        // empty list of exceptional keys.
        pushToTopic(sourceTopic, hmap, destTopicName, new String[0]);
    }

    /**
     * Maps every record of {@code sourceTopic} to the layout described by {@code hmap}
     * and writes it to {@code destTopicName}. A missing source field listed in
     * {@code exceptionalKeys} is tolerated (the output simply lacks that field); any
     * other missing field causes the record to be dropped.
     *
     * @param sourceTopic     stream to read from
     * @param hmap            output field name -> source field name
     * @param destTopicName   topic to write to
     * @param exceptionalKeys source field names that may be absent; {@code null} is
     *                        treated as "none"
     */
    public static void pushToTopic(KStream<String, String> sourceTopic, HashMap<String, String> hmap, String destTopicName, String[] exceptionalKeys) {
        sourceTopic.flatMapValues(new ValueMapper<String, Iterable<String>>() {
            @Override
            public Iterable<String> apply(String value) {
                return toOutputRecords(value, hmap, destTopicName, exceptionalKeys);
            }
        }).to(destTopicName);
    }

    /**
     * Converts one incoming record value into zero or one outgoing record values.
     *
     * @return a list holding the serialized output object, or an empty list when the
     *         value is not valid JSON or a required source field is missing
     */
    private static ArrayList<String> toOutputRecords(String value, HashMap<String, String> hmap, String destTopicName, String[] exceptionalKeys) {
        log.info("================================================================================================================================================================================");
        log.info("========> " + destTopicName + " Log:\n \n" + value);
        System.out.println("================================================================================================================================================================================");
        System.out.println("========> " + destTopicName + " Log:\n \n" + value);

        ArrayList<String> keywords = new ArrayList<String>();
        try {
            JSONObject send = new JSONObject();
            JSONObject received = processJSON(new JSONObject(value), destTopicName);

            send.put("current_date", getCurrentDate());
            if (!destTopicName.equals("o365_storage_by_date")) {
                send.put("insertion_time", getCurrentDateTime());
            }

            boolean validJson = true;
            for (String key : hmap.keySet()) {
                String sourceKey = hmap.get(key);
                if (received.has(sourceKey)) {
                    send.put(key, received.get(sourceKey));
                } else if (isExceptionalKey(exceptionalKeys, sourceKey)) {
                    // Optional field: the record is still sent, just without this field.
                    String message = "\n \n Missing Key in JSON: Sending log to destination topic = " + destTopicName + " with null value | " + sourceKey + " Key is missing.";
                    System.out.println(message);
                    log.warn(message);
                } else {
                    // Required field: the record will not be sent.
                    String message = "\n \n Missing Key in JSON: Cannot send log to destination topic = " + destTopicName + " | " + sourceKey + " Key is missing.";
                    System.out.println(message);
                    log.error(message);
                    validJson = false;
                }
            }
            if (validJson) {
                keywords.add(send.toString());
            }
        } catch (Exception e) {
            // Best effort: a record that cannot be processed is dropped, but the cause
            // is kept in the log.
            log.error("Unable to convert to json", e);
            System.err.println("Unable to convert to json");
        }
        return keywords;
    }

    /**
     * Tells whether {@code currKey} is one of {@code exceptionalKeys}.
     *
     * @param exceptionalKeys keys to search; {@code null} is treated as empty
     * @param currKey         key to look for
     * @return {@code true} if an element of {@code exceptionalKeys} equals {@code currKey}
     */
    private static boolean isExceptionalKey(String[] exceptionalKeys, String currKey) {
        if (exceptionalKeys == null) {
            return false;
        }
        for (String candidate : exceptionalKeys) {
            if (candidate != null && candidate.equals(currKey)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Normalizes an incoming record in place before it is mapped to a destination topic.
     *
     * <p>The fields {@code UserId}, {@code TargetUserOrGroupName}, {@code ObjectId}
     * and {@code EventData}, when present, are replaced by their lower-cased string
     * form. For {@code o365_last_line_malware} a {@code MaliciousScore} of
     * {@code "-1"} is set; for {@code o365_activity_log_by_date} and
     * {@code o365_gtb_dlp_breached_policy_by_date} the serialized record is stored
     * under {@code ActivityDetail}.
     *
     * @param jsonObj       record to normalize (modified)
     * @param destTopicName destination topic the record is being prepared for
     * @return the same {@code jsonObj} instance
     */
    public static JSONObject processJSON(JSONObject jsonObj, String destTopicName) {
        String[] lowerCasedFields = {"UserId", "TargetUserOrGroupName", "ObjectId", "EventData"};
        for (String field : lowerCasedFields) {
            if (jsonObj.has(field)) {
                // Locale.ROOT keeps the result independent of the JVM's default locale.
                String val = jsonObj.get(field).toString().toLowerCase(java.util.Locale.ROOT);
                // put() replaces the existing mapping for the field.
                jsonObj.put(field, val);
            }
        }
        if (destTopicName.equals("o365_last_line_malware")) {
            jsonObj.put("MaliciousScore", "-1");
        }
        if (destTopicName.equals("o365_activity_log_by_date") || destTopicName.equals("o365_gtb_dlp_breached_policy_by_date")) {
            jsonObj.put("ActivityDetail", jsonObj.toString());
        }
        return jsonObj;
    }

    /**
     * Returns today's date in UTC.
     *
     * @return the current date formatted as {@code yyyy-MM-dd}
     */
    public static String getCurrentDate() {
        // A fresh formatter per call; SimpleDateFormat instances must not be shared
        // between threads.
        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd");
        dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
        return dateFormat.format(new Date());
    }

    /**
     * Returns the current date and time in the JVM's default time zone.
     *
     * @return the current timestamp formatted as {@code yyyy-MM-dd HH:mm:ss}
     */
    private static String getCurrentDateTime() {
        // NOTE(review): unlike getCurrentDate(), no time zone is set here, so on a
        // non-UTC host "insertion_time" and "current_date" can fall on different days.
        // Left unchanged because consumers may rely on local time - confirm.
        DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        return dateFormat.format(new Date());
    }

}

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...