
MLP Speed-Up in PySpark fluctuates with more cores – possible cache memory issue?


[image: speed-up vs. number of cores on the PC cluster]

I have run experiments with the MLP (Multi-Layer Perceptron) algorithm on a PC cluster with Apache Spark, using configurations that range from a small dataset to large datasets. For the small dataset the graph does not fluctuate (perhaps because the data is small), but for the larger datasets it does. What could be the cause of this? It seems consistent that as the data size increases, the graph always fluctuates like that.
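
(For context, the number of cores per run is capped when the SparkSession is created; a minimal sketch for a standalone cluster, where the master URL and the core count are illustrative placeholder values, not my exact configuration:)

# Illustrative only: cap the total cores used by this application
# (standalone cluster; master URL and core count are placeholder values)
spark = SparkSession.builder \
    .appName("NeuralNetworkClassification - Training Only") \
    .master("spark://master:7077") \
    .config("spark.cores.max", "8") \
    .getOrCreate()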

I just want to understand why the graph behaves this way, because the expected outcome is that the speed-up keeps rising as the number of cores increases. I have also tried it on a mini-PC cluster with fewer cores, and the results are as follows:

[image: speed-up vs. number of cores on the mini-PC cluster]
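
(Here "speed-up" is the usual ratio of the single-core training time to the n-core training time; a tiny helper just to be explicit, with purely illustrative numbers:)

def speedup(time_1_core, time_n_cores):
    # How many times faster the n-core run is than the single-core baseline
    return time_1_core / time_n_cores

print(speedup(120.0, 35.0))  # illustrative values only, ~3.43x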

By the way, this is my code:

import time
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.classification import MultilayerPerceptronClassifier

# 1. Spark Initialization
spark = SparkSession.builder \
    .appName("NeuralNetworkClassification - Training Only") \
    .getOrCreate()

# 2. Load Dataset
def load_spark_dataset(spark, file_path):
    # Load dataset from CSV
    return spark.read.csv(file_path, header=True, inferSchema=True)

# Load dataset
dataset_path = "/spark/spark-3.5.3-bin-hadoop3/data/generated_dataset_spark/small_data.csv"
dataset = load_spark_dataset(spark, dataset_path)

# 3. Preprocessing Data
def preprocess_data(df):
    feature_cols = [col for col in df.columns if col.startswith("feature_")]
    assembler = VectorAssembler(inputCols=feature_cols, outputCol="features_vector")
    assembled_df = assembler.transform(df)

    scaler = StandardScaler(inputCol="features_vector", outputCol="features_scaled")
    scaler_model = scaler.fit(assembled_df)
    scaled_df = scaler_model.transform(assembled_df)
    return scaled_df.select("features_scaled", "label")

data_preprocessed = preprocess_data(dataset)

# 4. Neural Network Model
def train_nn(df, layers):
    # Neural Network Model
    nn = MultilayerPerceptronClassifier(layers=layers, featuresCol="features_scaled", labelCol="label", seed=42)

    # Measure training time
    start_time = time.time()

    # Train model
    model = nn.fit(df)

    # The Execution Time Calculation
    end_time = time.time()
    training_time = end_time - start_time
    print(f"Training Time: {training_time:.2f} seconds")
    return model

# Neural network: 20 input nodes, hidden layers of 128, 64 and 32 nodes, 2 output nodes
layers = [20, 128, 64, 32, 2]

# 5. Train Model
model = train_nn(data_preprocessed, layers)
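
# --- Not part of my original script: a minimal sketch to reduce timing noise ---
# Assumption: reuses data_preprocessed and layers from above. Persisting the
# preprocessed DataFrame and timing several fits means a single CSV re-scan,
# GC pause or shuffle hiccup does not dominate one point on the speed-up graph.
from pyspark import StorageLevel

data_cached = data_preprocessed.persist(StorageLevel.MEMORY_AND_DISK)
data_cached.count()  # force the cache to be populated before timing

timings = []
for _ in range(3):
    start = time.time()
    MultilayerPerceptronClassifier(
        layers=layers, featuresCol="features_scaled", labelCol="label", seed=42
    ).fit(data_cached)
    timings.append(time.time() - start)
print("Training times:", [round(t, 2) for t in timings])

data_cached.unpersist()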

# Stop SparkSession
spark.stop()