Как использовать Spark Streaming, чтобы прочитать поток и найти IP по временному окну? - PullRequest
7 голосов
/ 20 июня 2019

Я новичок в Apache Spark и хотел бы написать некоторый код на Python, используя PySpark для чтения потока и поиска IP-адресов.

У меня есть класс Java для генерации некоторых поддельных IP-адресов для последующей обработки. Этот класс будет указан здесь:

import java.io.DataOutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Random;

public class SocketNetworkTrafficSimulator {
    public static void main(String[] args) throws Exception {
        Random rn = new Random();
        ServerSocket welcomeSocket = new ServerSocket(9999);
        int[] possiblePortTypes = new int[]{21, 22, 80, 8080, 463};
        int numberOfRandomIps=100;
        String[] randomIps = new String[numberOfRandomIps];
        for (int i=0;i<numberOfRandomIps;i++)
            randomIps[i] = (rn.nextInt(250)+1) +"." +
                                (rn.nextInt(250)+1) +"." +
                                (rn.nextInt(250)+1) +"." +
                                (rn.nextInt(250)+1);
        System.err.println("Server started");
        while (true) {
            try {
                Socket connectionSocket = welcomeSocket.accept();
                System.err.println("Server accepted connection");
                DataOutputStream outToClient = new DataOutputStream(connectionSocket.getOutputStream());
                while (true) {
                    String str = "" + possiblePortTypes[rn.nextInt(possiblePortTypes.length)] + ","
                            + randomIps[rn.nextInt(numberOfRandomIps)] + ","
                            + randomIps[rn.nextInt(numberOfRandomIps)] + "\n";
                    outToClient.writeBytes(str);
                    Thread.sleep(10);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

    }
}

На данный момент я реализовал следующую функцию только для подсчета слов, которую я запускаю с помощью следующей команды в Mac OsX spark-submit spark_streaming.py <host> <port> <folder_name> <file_name>. Мне удалось установить связь между ними и прослушивать сгенерированные IP-адреса. Теперь моя главная проблема - как отслеживать записи, которые я слушаю.

from __future__ import print_function

import os
import sys

from pyspark import SparkContext
from pyspark.streaming import StreamingContext


# Get or register a Broadcast variable
def getWordBlacklist(sparkContext):
    if ('wordBlacklist' not in globals()):
        globals()['wordBlacklist'] = sparkContext.broadcast(["a", "b", "c"])
    return globals()['wordBlacklist']


# Get or register an Accumulator
def getDroppedWordsCounter(sparkContext):
    if ('droppedWordsCounter' not in globals()):
        globals()['droppedWordsCounter'] = sparkContext.accumulator(0)
    return globals()['droppedWordsCounter']


def createContext(host, port, outputPath):
    # If you do not see this printed, that means the StreamingContext has been loaded
    # from the new checkpoint
    print("Creating new context")
    if os.path.exists(outputPath):
        os.remove(outputPath)
    sc = SparkContext(appName="PythonStreamingRecoverableNetworkWordCount")
    ssc = StreamingContext(sc, 1)

    # Create a socket stream on target ip:port and count the
    # words in input stream of \n delimited text (eg. generated by 'nc')
    lines = ssc.socketTextStream(host, port)
    words = lines.flatMap(lambda line: line.split(" "))
    wordCounts = words.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y)

    def echo(time, rdd):
        # Get or register the blacklist Broadcast
        blacklist = getWordBlacklist(rdd.context)
        # Get or register the droppedWordsCounter Accumulator
        droppedWordsCounter = getDroppedWordsCounter(rdd.context)

        # Use blacklist to drop words and use droppedWordsCounter to count them
        def filterFunc(wordCount):
            if wordCount[0] in blacklist.value:
                droppedWordsCounter.add(wordCount[1])
                return False
            else:
                return True

        counts = "Counts at time %s %s" % (time, rdd.filter(filterFunc).collect())
        print(counts)
        print("Dropped %d word(s) totally" % droppedWordsCounter.value)
        print("Appending to " + os.path.abspath(outputPath))
        # with open(outputPath, 'a') as f:
        #     f.write(counts + "\n")

    wordCounts.foreachRDD(echo)
    return ssc


if __name__ == "__main__":
    if len(sys.argv) != 5:
        print("Usage: recoverable_network_wordcount.py <hostname> <port> "
              "<checkpoint-directory> <output-file>", file=sys.stderr)
        sys.exit(-1)
    host, port, checkpoint, output = sys.argv[1:]
    ssc = StreamingContext.getOrCreate(checkpoint,
                                       lambda: createContext(host, int(port), output))
    ssc.start()
    ssc.awaitTermination()

В конце я хотел бы прочитать поток и найти IP-адреса для порта, которые отправляют или получают больше пакетов J за последние K секунд. J и K - это некоторые параметры, которые я определяю в мой код (например, J = 10, K = 60 и т. д.)

Ответы [ 2 ]

3 голосов
/ 26 июня 2019

Я решил мою проблему, используя этот метод:

def getFrequentIps(stream, time_window, min_packets):
    frequent_ips = (stream.flatMap(lambda line: format_stream(line))            
                    # Count the occurrences of a specific pair 
                    .countByValueAndWindow(time_window, time_window, 4)
                    # Filter above the threshold imposed by min_packets
                    .filter(lambda count: count[1] >= int(min_packets))
                    .transform(lambda record: record.sortBy(lambda x: x[1], ascending=False)))

    number_items = 20
    print("Every %s seconds the top-%s channles with more than %s packages will be showed: " %
          (time_window, number_items, min_packets))
    frequent_ips.pprint(number_items)

1 голос
/ 30 июня 2019

Как уже упоминалось в ответе, который вы предоставили, PySpark имеет встроенную функцию, которая делает именно то, что вы хотите, то есть подсчитывает значения в течение временного окна.

countByValueAndWindow(windowLength, slideInterval, [numTasks]) 

Как и в reduByKeyAndWindow, количество задач сокращения настраивается с помощью необязательного аргумента. Здесь вы можете найти больше примеров: PySpark Documentation

...