
java - Exception in thread "main" org.apache.spark.sql.AnalysisException: Column 'timestamp' does not exist


Spark doesn't find a column even though it just printed it in the schema:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.jfree.chart.ChartPanel;
import org.jfree.chart.JFreeChart;
import org.jfree.chart.axis.AxisLocation;
import org.jfree.chart.axis.LogarithmicAxis;
import org.jfree.chart.axis.NumberAxis;
import org.jfree.chart.axis.NumberTickUnit;
import org.jfree.chart.labels.StandardXYToolTipGenerator;
import org.jfree.chart.plot.XYPlot;
import org.jfree.chart.renderer.PaintScale;
import org.jfree.chart.renderer.xy.XYBlockRenderer;
import org.jfree.chart.title.PaintScaleLegend;
import org.jfree.chart.ui.RectangleEdge;
import org.jfree.chart.ui.RectangleInsets;
import org.jfree.data.xy.DefaultXYZDataset;
import org.jfree.data.xy.XYZDataset;

import javax.imageio.ImageIO;
import javax.swing.*;
import java.awt.*;
import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.*;

import static org.apache.spark.sql.functions.*;

public class GraphDrawer3 {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("Graph Drawer")
                .master("local[*]")
                .getOrCreate();

        String logFilePath = "D:\\filtered_logs_with_useragent";
        Dataset<Row> logs = spark.read()
                .option("delimiter", "\t")
                .option("header", "true")
                .csv(logFilePath);

// Checking column names
        logs.printSchema();
        logs.show(5, false);

        logs = logs
                //.withColumn("timestamp_long", col("timestamp").cast("long"))
                .withColumn("event_time", from_unixtime(col("timestamp"))) // <-- fails here
                .withColumn("time_window", unix_timestamp(window(col("event_time"), "1 minutes").getField("start")))
                .withColumn("duration", col("duration").cast("double"))
                .withColumn("out_bytes", col("out_bytes").cast("double"))
                .na().fill(0, new String[]{"duration"})
                .withColumn("duration_bucket", floor(col("out_bytes").divide(1000)).multiply(1000));

        Dataset<Row> graphData = logs
                .groupBy("time_window", "duration_bucket")
                .agg(count("*").alias("request_count"))
                .orderBy("time_window", "duration_bucket");

        long count = graphData.count();
        if (count == 0) {
            System.out.println("❌ No data!");
            return;
        }

        graphData.show(50, false);

        DefaultXYZDataset dataset = new DefaultXYZDataset();
        double[] xValues, yValues, zValues;
        double maxRequests;

        Object[] dataArrays = prepareData(graphData);
        xValues = (double[]) dataArrays[0];
        yValues = (double[]) dataArrays[1];
        zValues = (double[]) dataArrays[2];
        maxRequests = (double) dataArrays[3];

        dataset.addSeries("Heatmap Data", new double[][]{xValues, yValues, zValues});

        JFreeChart chart = createChart(dataset, xValues, yValues, maxRequests);
        displayChart(chart);
        saveChartAsPNG(chart, "D:\\output\\graph_03_03_2025_dalvik.png");

        spark.stop();
    }

    private static Object[] prepareData(Dataset<Row> graphData) {
        List<Row> rows = graphData.collectAsList();

        Set<Double> uniqueX = new TreeSet<>();
        Set<Double> uniqueY = new TreeSet<>();

        for (Row row : rows) {
            uniqueX.add(((Long) row.getAs("time_window")).doubleValue());
            uniqueY.add(((Long) row.getAs("duration_bucket")).doubleValue());
        }

        int xSize = uniqueX.size();
        int ySize = uniqueY.size();
        int totalSize = xSize * ySize;

        double[] xvalues = new double[totalSize];
        double[] yvalues = new double[totalSize];
        double[] zvalues = new double[totalSize];

        int index = 0;
        for (double x : uniqueX) {
            for (double y : uniqueY) {
                xvalues[index] = x;
                yvalues[index] = y;
                zvalues[index] = 0;
                index++;
            }
        }

        for (Row row : rows) {
            double x = ((Long) row.getAs("time_window")).doubleValue();
            double y = ((Long) row.getAs("duration_bucket")).doubleValue();
            double z = ((Long) row.getAs("request_count")).doubleValue();

            int idx = (new ArrayList<>(uniqueX)).indexOf(x) * ySize + (new ArrayList<>(uniqueY)).indexOf(y);
            zvalues[idx] = z;
        }

        double maxRequests = Arrays.stream(zvalues).max().orElse(1);
        return new Object[]{xvalues, yvalues, zvalues, maxRequests};
    }

    private static JFreeChart createChart(XYZDataset dataset, double[] xValues, double[] yValues, double maxRequests) {
        NumberAxis xAxis = new NumberAxis("Timestamp (30-min Windows)");
        xAxis.setAutoRangeIncludesZero(false);
        //xAxis.setDateFormatOverride(new SimpleDateFormat("yyyy-MM-dd HH:mm"));

        NumberAxis yAxis = new NumberAxis("Duration (sec)");
        yAxis.setAutoRangeIncludesZero(false);
        yAxis.setTickUnit(new NumberTickUnit(10));

        XYPlot plot = new XYPlot(dataset, xAxis, yAxis, null);
        XYBlockRenderer renderer = new XYBlockRenderer();

        double xMin = Arrays.stream(xValues).min().orElse(0);
        double xMax = Arrays.stream(xValues).max().orElse(1);
        double yMin = Arrays.stream(yValues).min().orElse(0);
        double yMax = Arrays.stream(yValues).max().orElse(1);

        long uniqueXCount = Arrays.stream(xValues).distinct().count();
        long uniqueYCount = Arrays.stream(yValues).distinct().count();

        double blockWidth = (xMax - xMin) / uniqueXCount;
        double blockHeight = (yMax - yMin) / uniqueYCount;

        renderer.setBlockWidth(blockWidth);
        renderer.setBlockHeight(blockHeight);
        renderer.setDefaultToolTipGenerator(new StandardXYToolTipGenerator());
        renderer.setPaintScale(new SpectrumPaintScale(1, maxRequests)); 
        plot.setRenderer(renderer);
        JFreeChart chart = new JFreeChart("Heatmap: Requests by Time and Duration",
                JFreeChart.DEFAULT_TITLE_FONT, plot, false);

        LogarithmicAxis zAxis = new LogarithmicAxis("Request Count");
        zAxis.setAutoRangeIncludesZero(false);
        zAxis.setAllowNegativesFlag(false); 
        zAxis.setLowerBound(1); 
        zAxis.setUpperBound(maxRequests);

        PaintScaleLegend legend = new PaintScaleLegend(new SpectrumPaintScale(1, maxRequests), zAxis);
        legend.setSubdivisionCount(128);
        legend.setAxisLocation(AxisLocation.TOP_OR_RIGHT);
        legend.setPadding(new RectangleInsets(25, 10, 50, 10));
        legend.setStripWidth(20);
        legend.setPosition(RectangleEdge.RIGHT);
        legend.setBackgroundPaint(Color.WHITE);
        chart.addSubtitle(legend);

        return chart;
    }


    private static void displayChart(JFreeChart chart) {
        JFrame frame = new JFrame("Heatmap Visualization");
        frame.setDefaultCloseOperation(JFrame.EXIT_ON_CLOSE);
        ChartPanel chartPanel = new ChartPanel(chart);
        chartPanel.setPreferredSize(new Dimension(1400, 800));
        chartPanel.setMouseZoomable(true, false);
        frame.add(chartPanel);
        frame.pack();
        frame.setLocationRelativeTo(null);
        frame.setVisible(true);
    }

    private static void saveChartAsPNG(JFreeChart chart, String filePath) {
        try {
            File file = new File(filePath);
            ImageIO.write(chart.createBufferedImage(1200, 600), "png", file);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    private static class SpectrumPaintScale implements PaintScale {
        private static final float H1 = 0.7f;  
        private static final float H2 = 0.0f;  
        private final double lowerBound;
        private final double upperBound;
        private final double logLower;
        private final double logUpper;

        public SpectrumPaintScale(double lowerBound, double upperBound) {
            this.lowerBound = lowerBound;
            this.upperBound = upperBound;

            this.logLower = Math.log10(Math.max(lowerBound, 1));
            this.logUpper = Math.log10(Math.max(upperBound, 1));
        }

        @Override
        public double getLowerBound() {
            return lowerBound;
        }

        @Override
        public double getUpperBound() {
            return upperBound;
        }

        @Override
        public Paint getPaint(double value) {
            if (value <= lowerBound) {
                return Color.getHSBColor(H1, 1f, 1f); 
            }
            if (value >= upperBound) {
                return Color.getHSBColor(H2, 1f, 1f); 
            }

            double logValue = Math.log10(Math.max(value, 1));
            float scaledValue = (float) ((logValue - logLower) / (logUpper - logLower));

            float scaledH = H1 + scaledValue * (H2 - H1);
            return Color.getHSBColor(scaledH, 1f, 1f);
        }
    }
}

and that was the output to the logs before the fail:

25/03/21 14:14:46 INFO Executor: Finished task 0.0 in stage 2.0 (TID 79). 2007 bytes result sent to driver
25/03/21 14:14:46 INFO TaskSetManager: Finished task 0.0 in stage 2.0 (TID 79) in 32 ms on ILYA.mshome (executor driver) (1/1)
25/03/21 14:14:46 INFO TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks have all completed, from pool 
25/03/21 14:14:46 INFO DAGScheduler: ResultStage 2 (show at GraphDrawer3.java:52) finished in 0.042 s
25/03/21 14:14:46 INFO DAGScheduler: Job 2 is finished. Cancelling potential speculative or zombie tasks for this job
25/03/21 14:14:46 INFO TaskSchedulerImpl: Killing all running tasks in stage 2: Stage finished
25/03/21 14:14:46 INFO DAGScheduler: Job 2 finished: show at GraphDrawer3.java:52, took 0.045052 s
25/03/21 14:14:46 INFO CodeGenerator: Code generated in 9.4934 ms
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|timestamp,remote_addr,remote_user,time_local,request,status,body_bytes_sent,out_bytes,referrer,useragent,http_x_forwarded_for,host,torso_id,duration,upstream_response_time,upstream_status,country,service,cache_status,logtype,custom_field                                                    |
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|1741031986.377,185.41.120.58,-,2025-03-03 19:59:46+0000,GET /aa/7907728753176af1c64284d1c873a838a4a93b071.jpg?w=300 HTTP/1.1,200,25713,26186,-,Dalvik/2.1.0 (Linux; U; Android 14; 23090RA98G Build/UP1A.231005.007),-,cache-limeshop.cdnvideo.ru,468,0,-,-,RU,static,HIT,n_c,          |
|1741031986.404,185.41.120.58,-,2025-03-03 19:59:46+0000,GET /aa/79077289012d0f384d3e14982adfcd7286073cfa9.jpg?w=300 HTTP/1.1,200,30450,30923,-,Dalvik/2.1.0 (Linux; U; Android 14; 23090RA98G Build/UP1A.231005.007),-,cache-limeshop.cdnvideo.ru,468,0,-,-,RU,static,HIT,n_c,          |
|1741031986.418,185.41.120.58,-,2025-03-03 19:59:46+0000,GET /aa/7907728753176af1c64284d1c873a838a4a93b071.jpg?w=534 HTTP/1.1,200,31499,31973,-,Dalvik/2.1.0 (Linux; U; Android 14; 23090RA98G Build/UP1A.231005.007),-,cache-limeshop.cdnvideo.ru,468,0.004,0.004,200,RU,static,HIT,n_c,|
|1741031986.663,185.41.120.58,-,2025-03-03 19:59:46+0000,GET /aa/792693224cbf7cacd86b0408285b116b674fb674d.jpg?w=300 HTTP/1.1,200,45336,45809,-,Dalvik/2.1.0 (Linux; U; Android 14; 23090RA98G Build/UP1A.231005.007),-,cache-limeshop.cdnvideo.ru,468,0,-,-,RU,static,HIT,n_c,          |
|1741031986.787,185.41.120.58,-,2025-03-03 19:59:46+0000,GET /aa/79163443747900bf957fd420fa10b1b447010b4a5.jpg?w=598 HTTP/1.1,200,59851,60325,-,Dalvik/2.1.0 (Linux; U; Android 14; 23090RA98G Build/UP1A.231005.007),-,cache-limeshop.cdnvideo.ru,468,0.004,0.004,200,RU,static,HIT,n_c,|
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
only showing top 5 rows

Exception in thread "main" org.apache.spark.sql.AnalysisException: Column 'timestamp' does not exist. Did you mean one of the following? [timestamp,remote_addr,remote_user,time_local,request,status,body_bytes_sent,out_bytes,referrer,useragent,http_x_forwarded_for,host,torso_id,duration,upstream_response_time,upstream_status,country,service,cache_status,logtype,custom_field];
'Project [timestamp,remote_addr,remote_user,time_local,request,status,body_bytes_sent,out_bytes,referrer,useragent,http_x_forwarded_for,host,torso_id,duration,upstream_response_time,upstream_status,country,service,cache_status,logtype,custom_field#17, from_unixtime('timestamp, yyyy-MM-dd HH:mm:ss, Some(Europe/Moscow)) AS event_time#26]
+- Relation [timestamp,remote_addr,remote_user,time_local,request,status,body_bytes_sent,out_bytes,referrer,useragent,http_x_forwarded_for,host,torso_id,duration,upstream_response_time,upstream_status,country,service,cache_status,logtype,custom_field#17] csv

    at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:54)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis$7(CheckAnalysis.scala:200)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis$7$adapted(CheckAnalysis.scala:193)
    at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:367)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1(TreeNode.scala:366)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1$adapted(TreeNode.scala:366)
    at scala.collection.Iterator.foreach(Iterator.scala:943)
    at scala.collection.Iterator.foreach$(Iterator.scala:943)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
    at scala.collection.IterableLike.foreach(IterableLike.scala:74)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
    at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:366)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1(TreeNode.scala:366)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1$adapted(TreeNode.scala:366)
    at scala.collection.Iterator.foreach(Iterator.scala:943)
    at scala.collection.Iterator.foreach$(Iterator.scala:943)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
    at scala.collection.IterableLike.foreach(IterableLike.scala:74)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
    at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:366)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis$6(CheckAnalysis.scala:193)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis$6$adapted(CheckAnalysis.scala:193)
    at scala.collection.immutable.Stream.foreach(Stream.scala:533)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis$1(CheckAnalysis.scala:193)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis$1$adapted(CheckAnalysis.scala:102)
    at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:367)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis(CheckAnalysis.scala:102)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis$(CheckAnalysis.scala:97)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:187)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:210)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:330)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:207)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$analyzed$1(QueryExecution.scala:76)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:185)
    at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:510)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:185)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
    at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:184)
    at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:76)
    at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:74)
    at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:66)
    at org.apache.spark.sql.Dataset$.$anonfun$ofRows$1(Dataset.scala:91)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
    at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:89)
    at org.apache.spark.sql.Dataset.withPlan(Dataset.scala:3887)
    at org.apache.spark.sql.Dataset.select(Dataset.scala:1519)
    at org.apache.spark.sql.Dataset.withColumns(Dataset.scala:2542)
    at org.apache.spark.sql.Dataset.withColumn(Dataset.scala:2480)
    at cdnloganalysis.GraphDrawer3.main(GraphDrawer3.java:57)

asked Mar 21 at 18:00 by Eljah
  • All the awt, swing and jfree code is not needed to reproduce the issue. I recommend replacing it with a minimal reproducible example. – aled Commented Mar 21 at 19:39
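Per the comment above, none of the charting code is needed to hit this error. A minimal sketch of such a repro (hypothetical logs.csv path; it assumes a comma-separated file read with a tab delimiter, which is the situation shown in the question's output) fails with the same AnalysisException:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.from_unixtime;

public class Repro {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("Repro")
                .master("local[*]")
                .getOrCreate();

        // logs.csv (hypothetical) is comma-separated:
        // timestamp,duration
        // 1741031986.377,0.004
        Dataset<Row> logs = spark.read()
                .option("delimiter", "\t") // wrong delimiter for a comma-separated file
                .option("header", "true")
                .csv("logs.csv");

        // The whole header line becomes a single column name:
        logs.printSchema(); // root |-- timestamp,duration: string

        // Throws AnalysisException: Column 'timestamp' does not exist
        logs.withColumn("event_time", from_unixtime(col("timestamp"))).show();
    }
}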

1 Answer

The wrong piece here was forcing the delimiter:

.option("delimiter", "\t")

The file is comma-separated, so with a tab delimiter Spark read each whole line as a single field. That is why printSchema() and the error message show one giant column named after the entire comma-joined header line, rather than a separate timestamp column.
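A minimal sketch of the fix, assuming the file really is plain comma-separated (comma is already Spark's default CSV delimiter) and that fractional epoch strings like 1741031986.377 should be truncated to whole seconds before from_unixtime:

Dataset<Row> logs = spark.read()
        .option("header", "true") // drop the delimiter override; "," is the default
        .csv(logFilePath);

logs = logs
        // "1741031986.377" -> 1741031986.377 -> 1741031986, then to a timestamp string
        .withColumn("event_time",
                from_unixtime(col("timestamp").cast("double").cast("long")));

With the columns split correctly, col("timestamp") resolves and the rest of the pipeline (windowing, bucketing, aggregation) runs unchanged.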
