В настоящее время я пытаюсь интегрировать PySpark и Cassandra, и у меня возникают проблемы с оптимизацией кода для его более быстрого выполнения.
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext, SparkSession
from pyspark.sql.functions import sum as _sum
def connect_cassandra():
spark = SparkSession.builder \
.appName('SparkCassandraApp') \
.config('spark.cassandra.connection.host', 'localhost') \
.config('spark.cassandra.connection.port', '9042') \
.config('spark.cassandra.output.consistency.level','ONE') \
.master('local[*]') \
.getOrCreate()
sqlContext = SQLContext(spark)
return sqlContext
#--------THIS FUNCTION IS MY CONCERN ACTUALLY------------
def check_ip(ip, df):
rows= df.filter("src_ip = '"+ip+"' or dst_ip = '"+ip+"'") \
.agg(_sum('total').alias('data')) \
.collect()
print(rows[0][0])
#-----------------------------------------------------------
def load_df(sqlContext):
df = sqlContext \
.read \
.format('org.apache.spark.sql.cassandra') \
.options(table='acrs_app_data_usage', keyspace='acrs') \
.load()
return df
if __name__ == '__main__':
lists = ['10.8.25.6', '10.8.24.10', '10.8.24.11', '10.8.20.1', '10.8.25.15', '10.8.25.10']
sqlContext = connect_cassandra()
df = load_df(sqlContext)
for ip in lists:
check_ip(ip, df)
Функция check_ip()
здесь принимает ip и предварительно загруженный фрейм данных, фрейм данных имеет 3 столбца (src_ip, dst_ip and total
) и около 250 тыс. Строк в качестве аргумента, а затем перебирает их по общему столбцу, добавляя их и возвращая суммированные данные, сгруппированные по предоставленному IP.
Но когда я выполняю сценарий, для возврата суммированной суммы требуется не менее секунды на каждый IP-адрес. И у меня более 32 тысяч IP-адресов, для которых должно произойти то же самое. И количество времени много.
Любая помощь будет оценена. Заранее спасибо.