Добавление столбца занимает много времени в PySpark Dataframes - PullRequest
0 голосов
/ 06 июня 2019

В настоящее время я пытаюсь интегрировать PySpark и Cassandra, и у меня возникают проблемы с оптимизацией кода для его более быстрого выполнения.

from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext, SparkSession
from pyspark.sql.functions import sum as _sum

def connect_cassandra():
    spark = SparkSession.builder \
      .appName('SparkCassandraApp') \
      .config('spark.cassandra.connection.host', 'localhost') \
      .config('spark.cassandra.connection.port', '9042') \
      .config('spark.cassandra.output.consistency.level','ONE') \
      .master('local[*]') \
      .getOrCreate()

    sqlContext = SQLContext(spark)
    return sqlContext

#--------THIS FUNCTION IS MY CONCERN ACTUALLY------------
def check_ip(ip, df):
    rows= df.filter("src_ip = '"+ip+"' or dst_ip = '"+ip+"'") \
            .agg(_sum('total').alias('data')) \
            .collect()

    print(rows[0][0])
#-----------------------------------------------------------

def load_df(sqlContext):

    df = sqlContext \
      .read \
      .format('org.apache.spark.sql.cassandra') \
      .options(table='acrs_app_data_usage', keyspace='acrs') \
      .load()

    return df

if __name__ == '__main__':
    lists = ['10.8.25.6', '10.8.24.10', '10.8.24.11', '10.8.20.1', '10.8.25.15', '10.8.25.10']
    sqlContext = connect_cassandra()
    df = load_df(sqlContext)
    for ip in lists:
        check_ip(ip, df)

Функция check_ip() здесь принимает ip и предварительно загруженный фрейм данных, фрейм данных имеет 3 столбца (src_ip, dst_ip and total) и около 250 тыс. Строк в качестве аргумента, а затем перебирает их по общему столбцу, добавляя их и возвращая суммированные данные, сгруппированные по предоставленному IP.

Но когда я выполняю сценарий, для возврата суммированной суммы требуется не менее секунды на каждый IP-адрес. И у меня более 32 тысяч IP-адресов, для которых должно произойти то же самое. И количество времени много.

Любая помощь будет оценена. Заранее спасибо.

1 Ответ

1 голос
/ 06 июня 2019

Краткий ответ: не используйте петли.

Возможное решение:

  • Конвертировать lists в формат данных.
  • Внутреннее соединение lists_df дважды с вашим фреймом данных, сначала на ip == src_ip, а второе на ip == dst_ip
  • Объединить оба с unionAll
  • Наконец, используйте groupBy("ip").agg(_sum("total"))

Используются соединения. Так что, возможно, есть даже лучшее решение.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...