
java - Map Reduce Program Error for Top-K Structure - Stack Overflow


I am getting an error in my Maven-based MapReduce program: nothing reaches my reducer, which has only one instance for the top-k structure. The print statement in the mapper works fine, but nothing is ever printed from the reducer.

Driver

package it.my.bigdata.hadoop.lab;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.mapreduce.Counter;

/**
 * MapReduce program
 */
public class DriverBigData extends Configured implements Tool {

  @Override
  public int run(String[] args) throws Exception {


    int exitCode = 0;  

     
    //Change the following part of the code 
    
    Path inputPath;
    Path outputDir;
    int numberOfReducers;

    
    // Parse the parameters
    numberOfReducers = 1;
    inputPath = new Path(args[0]);
    outputDir = new Path(args[1]);
    
    
    Configuration conf = this.getConf();


    // Define a new job
    Job job = Job.getInstance(conf); 
    FileSystem fs = FileSystem.get(conf);
    // Delete the output folder if it exists
    if (fs.exists(outputDir)) {
      fs.delete(outputDir, true);  // true to delete recursively
      System.out.println("Deleted existing output directory: " + outputDir);
    }

    // Assign a name to the job
    job.setJobName("Lab - Skeleton");
    
    // Set path of the input file/folder (if it is a folder, the job reads all the files in the specified folder) for this job
    FileInputFormat.addInputPath(job, inputPath);
    
    // Set path of the output folder for this job
    FileOutputFormat.setOutputPath(job, outputDir);
    
    // Specify the class of the Driver for this job
    job.setJarByClass(DriverBigData.class);
    
    // Set job input format
    job.setInputFormatClass(TextInputFormat.class);

    // Set job output format
    job.setOutputFormatClass(TextOutputFormat.class);
       
    // Set map class
    job.setMapperClass(MapperBigData.class);
    
    // Set map output key and value classes
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(Text.class);
    
    // Set reduce class
    job.setReducerClass(ReducerBigData.class);
        
    // Set reduce output key and value classes
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    // Set number of reducers
    job.setNumReduceTasks(numberOfReducers);
   
    
    // Execute the job and wait for completion
    exitCode = job.waitForCompletion(true) ? 0 : 1;

    return exitCode;
    
  }
  

  /** Main of the driver
   */
  
  public static void main(String args[]) throws Exception {
    // Exploit the ToolRunner class to "configure" and run the Hadoop application
    int res = ToolRunner.run(new Configuration(), new DriverBigData(), args);

    System.exit(res);
  }
}

Mapper

package it.my.bigdata.hadoop.lab;

import java.io.IOException;
import java.util.*;

import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;

import com.google.common.collect.Comparators;

/**
 * Lab  - Mapper
 */

/* Set the proper data types for the (key,value) pairs */
class MapperBigData extends Mapper<
                    LongWritable, // Input key type
                    Text,         // Input value type
                    Text,         // Output key type
                    Text> {// Output value type
    
    private int k = 5;
    private PriorityQueue<Pair> localTopk;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        localTopk = new PriorityQueue<>(k, new Comparator<Pair>() {
            @Override
            public int compare(Pair p1, Pair p2) {
                // Compare based on the count value, this will sort the Pair in ascending order
                return Integer.compare(p1.count, p2.count); // For min-heap behavior
            }
        });
    }

    @Override
    protected void map(
            LongWritable key,   // Input key type
            Text value,         // Input value type
            Context context) throws IOException, InterruptedException {

            String[] parts = value.toString().split("\t");
            String word = parts[0].trim();
            int count = Integer.parseInt(parts[1].trim());
            // System.out.println(word + ": " + count);


            localTopk.offer(new Pair(word, count));


            if (localTopk.size() > k) {
                localTopk.poll();
            }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException  {
        for (Pair pair : localTopk){
            context.write(new Text("1"), new Text(pair.word + "\t" + pair.count));
            System.out.println(pair.word + "->  " + pair.count);
        }
    }


    // Helper class to store word and its count
    public static class Pair {
        String word;
        int count;

        Pair(String word, int count) {
            this.word = word;
            this.count = count;
        }
    }
}
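The mapper's heap bookkeeping can be exercised outside Hadoop with a few lines of plain Java. This is an illustrative sketch only (the class and method names `LocalTopKDemo` / `topK` and the sample counts are mine, not from the question); it shows that a size-bounded min-heap does retain the k largest entries:

```java
import java.util.*;

public class LocalTopKDemo {

    // Keep only the k highest-count entries using a size-bounded min-heap,
    // the same bookkeeping the mapper does in setup()/map().
    public static List<String> topK(Map<String, Integer> counts, int k) {
        PriorityQueue<Map.Entry<String, Integer>> heap = new PriorityQueue<>(
                k, (a, b) -> Integer.compare(a.getValue(), b.getValue()));
        for (Map.Entry<String, Integer> e : counts.entrySet()) {
            heap.offer(e);
            if (heap.size() > k) {
                heap.poll(); // evict the current minimum, keeping the k largest
            }
        }
        List<String> result = new ArrayList<>();
        while (!heap.isEmpty()) {
            result.add(heap.poll().getKey()); // drains in ascending count order
        }
        Collections.reverse(result); // highest count first
        return result;
    }

    public static void main(String[] args) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        counts.put("apple", 3);
        counts.put("banana", 7);
        counts.put("cherry", 5);
        counts.put("date", 1);
        System.out.println(topK(counts, 2)); // prints [banana, cherry]
    }
}
```

If this prints the expected top entries, the heap logic itself is sound and the problem lies elsewhere in the job wiring.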

Reducer

package it.my.bigdata.hadoop.lab;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;

import java.io.IOException;
import java.util.*;
/**
 * Lab - Reducer
 */

/* Set the proper data types for the (key,value) pairs */
class ReducerBigData extends Reducer<
                Text,           // Input key type
                Text,           // Input value type
                Text,           // Output key type
                IntWritable> {  // Output value type

    private int K = 10; // Define top K
    private PriorityQueue<Pair> globalTopK; // Min-heap for global top K

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // Initialize the priority queue (min-heap) for top K with a custom comparator
        globalTopK = new PriorityQueue<>(K, new Comparator<Pair>() {
            @Override
            public int compare(Pair p1, Pair p2) {
                // Min-heap, compare based on the count field
                return Integer.compare(p1.count, p2.count);
            }
        });
    }

    @Override
    public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        // Merge all values from all mappers (local top K) and compute the global top K
        System.out.println("Receiving in reducer. ");
        for (Text value : values) {
            String[] parts = value.toString().split("\t");
            String word = parts[0].trim();
            int count = Integer.parseInt(parts[1].trim());
            System.out.println(word + ": " + count);
            // Add the (word, count) pair to the global top K
            globalTopK.offer(new Pair(word, count));

            // If the heap exceeds K size, remove the smallest element
            if (globalTopK.size() > K) {
                globalTopK.poll();
            }
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        // Emit the global top K results
        List<Pair> result = new ArrayList<>();
        while (!globalTopK.isEmpty()) {
            result.add(globalTopK.poll());
        }

        // Sort in descending order (because the min-heap gives the smallest element first)
        Collections.reverse(result);

        // Write the top K results
        for (Pair pair : result) {
            context.write(new Text(pair.word), new IntWritable(pair.count));
        }
    }

    // Helper class to store word and its count
    public static class Pair {
        String word;
        int count;

        Pair(String word, int count) {
            this.word = word;
            this.count = count;
        }
    }
}
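The reducer's merge step can also be checked in isolation. Below is a hedged sketch (the class name `GlobalTopKMerge`, helper `countOf`, and the sample words/counts are illustrative, not from the question) that feeds two mappers' local top-3 lists, in the same `word\tcount` value format the mapper emits, through the reducer's heap logic:

```java
import java.util.*;

public class GlobalTopKMerge {

    // Merge per-mapper local top-K lists into a global top-K,
    // mirroring the reducer's reduce()/cleanup() logic.
    public static List<String> merge(List<String> values, int k) {
        PriorityQueue<String> heap = new PriorityQueue<>(
                k, (a, b) -> Integer.compare(countOf(a), countOf(b))); // min-heap by count
        for (String v : values) {
            heap.offer(v);
            if (heap.size() > k) {
                heap.poll(); // drop the current smallest count
            }
        }
        List<String> result = new ArrayList<>();
        while (!heap.isEmpty()) {
            result.add(heap.poll()); // ascending by count
        }
        Collections.reverse(result); // descending by count
        return result;
    }

    // Parse the count out of a "word\tcount" value string
    private static int countOf(String pair) {
        return Integer.parseInt(pair.split("\t")[1].trim());
    }

    public static void main(String[] args) {
        List<String> fromMappers = Arrays.asList(
                "hadoop\t12", "spark\t8", "hive\t5",   // mapper 1's local top-3
                "flink\t10", "kafka\t7", "storm\t2");  // mapper 2's local top-3
        System.out.println(merge(fromMappers, 3));
    }
}
```

Since this merge works on plain lists, a run like the above producing the three highest counts suggests the reducer code is not the culprit, and the missing output is more likely a shuffle or job-configuration issue.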

I have tried everything I could think of, including changing the key to NullWritable or to the constant 1, but I still see nothing in the reducer.
