Архитектура развертывания для веб-приложения аналитики, использующего фреймворки django и spark - PullRequest
0 голосов
/ 25 июня 2019

Я занимаюсь разработкой аналитического веб-приложения, которое будет предоставлять функции обучения и тестирования моделей через пользовательский интерфейс. Для этого я использовал Django с Scikit Learn.

Джанго с искрой на локальной машине (windows)

Теперь я хочу сделать это в большом масштабе данных, используя искру. Использование django в качестве основы для обработки запросов и спарк для обработки и моделирования

Django + pyspark на локальном компьютере (windows) и спарк на удаленном кластере

Я настроил проект django и настроил спарк на кластере из двух машин linux вместе с hdfs.

Я предполагаю, что выгрузка / загрузка / потоковая передача данных в этот hdfs уже реализована.

Я пишу каждую модель как представление в проекте django, и реализация представления имеет код, написанный с использованием pyspark. Я использовал pyspark для создания соединения с установкой spark на кластере linux.

import pandas as pd
import numpy as np
import os

from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler, IndexToString
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

def sample_model_code(trainData, trainDataFileType, trainDataDelimiter,
                      testData, testDataFileType, testDataDelimiter,
                      targetIndexTrainData, targetIndexTestData,
                      modelName):

    # trainData = "D:/training-data.csv"
    # trainDataFileType = "csv"
    # trainDataDelimiter = ","
    # testData = "D:/test-data.csv"
    # testDataFileType = "csv"
    # testDataDelimiter = ","
    # targetIndexTrainData = 44
    # targetIndexTestData = 44
    # modelName = "test_model"

    conf = SparkConf().setMaster("local").setAppName("creditDecisonApp2")
    sc = SparkContext(conf = conf)  
    spark = SparkSession(sc)

    # spark.conf.set("spark.sql.shuffle.partitions", "2")

    training_data_set = spark.read.option("inferSchema", "true").option("header", "true").csv(trainData)
    test_data_set = spark.read.option("inferSchema", "true").option("header", "true").csv(testData)

    train_data_column_names = training_data_set.columns
    train_data_target_variable = train_data_column_names[targetIndexTrainData] 

    test_data_column_names = test_data_set.columns
    test_data_target_variable = test_data_column_names[targetIndexTestData]

    train_data_numeric_cols = []
    train_data_categorical_cols = []
    test_data_numeric_cols = []
    test_data_categorical_cols = []

    for element in training_data_set.dtypes:

        if 'int' in element[1] or 'double' in element[1]:
            train_data_numeric_cols.append(element[0])       
        else:
            train_data_categorical_cols.append(element[0])

    for element in test_data_set.dtypes:

        if 'int' in element[1] or 'double' in element[1]:
            test_data_numeric_cols.append(element[0])       
        else:
            test_data_categorical_cols.append(element[0])

    stages_train = []
    stages_test = []

    for categoricalColumn in train_data_categorical_cols:
        if categoricalColumn != train_data_target_variable:
            stringIndexer = StringIndexer(inputCol = categoricalColumn, outputCol = categoricalColumn + 'Index')
            stages_train += [stringIndexer]

    label_stringIdx_train = StringIndexer(inputCol = train_data_target_variable, outputCol = 'label')
    stages_train += [label_stringIdx_train]

    assemblerInputsTrain = [c + "Index" for c in train_data_categorical_cols] + train_data_numeric_cols
    assemblerTrain = VectorAssembler(inputCols=assemblerInputsTrain, outputCol="features")
    stages_train += [assemblerTrain]

    for categoricalColumn in test_data_categorical_cols:
        if categoricalColumn != test_data_target_variable:
            stringIndexer = StringIndexer(inputCol = categoricalColumn, outputCol = categoricalColumn + 'Index')
            stages_test += [stringIndexer]

    label_stringIdx_test = StringIndexer(inputCol = test_data_target_variable, outputCol = 'label')
    stages_test += [label_stringIdx_test]

    assemblerInputsTest = [c + "Index" for c in test_data_categorical_cols] + test_data_numeric_cols
    assemblerTest = VectorAssembler(inputCols=assemblerInputsTest, outputCol="features")
    stages_test += [assemblerTest]

    pipeline_train = Pipeline(stages=stages_train)
    pipeline_test = Pipeline(stages=stages_test)

    pipeline_train_model = pipeline_train.fit(training_data_set)
    pipeline_test_model = pipeline_test.fit(test_data_set)

    train_df = pipeline_train_model.transform(training_data_set)
    test_df = pipeline_test_model.transform(test_data_set)

    dt = DecisionTreeClassifier(featuresCol = 'features', labelCol = 'label', maxDepth = 5)
    dtModel = dt.fit(train_df)
    predictions = dtModel.transform(test_df)

    #TODO: save the clf as a pickle object

    labelIndexer = StringIndexer().setInputCol(train_data_target_variable).setOutputCol("label").fit(training_data_set)
    category_preds_col_name = "Predicted_" + test_data_target_variable
    categoryConverter = IndexToString().setInputCol("prediction").setOutputCol(category_preds_col_name).setLabels(labelIndexer.labels)
    converted = categoryConverter.transform(predictions)

    result_df = converted.select(test_data_column_names + [category_preds_col_name])

    location_temp = workingDirectory
    result_file_name = location_temp +"/"+"credit_decision_predicted_data.csv"

    result_df.coalesce(1).write.format('com.databricks.spark.csv').save(result_file_name,header = 'true')


    evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")

    eval_metrics_dict = dict()
    eval_metrics_dict["accuracy"] = evaluator.evaluate(converted)


    result_dict = dict()
    result_dict["resultFilePath"] = os.path.normpath(result_file_name).replace(os.sep, "/")
    result_dict["evaluationMetricsDetails"] = eval_metrics_dict

    return (result_dict)   

Приложение Django работает на локальной машине с Windows Spark настроен на локальной машине Windows и выше код работал при запуске с использованием Django

У меня вопрос, будет ли это работать, если есть Настройка Spark на удаленном Linux-кластере, django на локальной машине с Windows и передача пути к файлу hdfs вместо пути к файлу локальной файловой системы данных

Или есть какой-либо подход к настройке приложений для этого типа архитектуры

...