Something is going wrong in my Maven-based MapReduce program: nothing ever arrives at my reducer, which runs as a single instance holding the top-k structure. The print statement in the mapper works fine, but nothing is printed from the reducer.
Driver
package it.my.bigdata.hadoop.lab;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.mapreduce.Counter;
/**
* MapReduce program
*/
public class DriverBigData extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        int exitCode = 0;

        // Change the following part of the code
        // Parse the parameters
        int numberOfReducers = 1;
        Path inputPath = new Path(args[0]);
        Path outputDir = new Path(args[1]);

        Configuration conf = this.getConf();

        // Define a new job
        Job job = Job.getInstance(conf);

        // Delete the output folder if it exists
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(outputDir)) {
            fs.delete(outputDir, true); // true to delete recursively
            System.out.println("Deleted existing output directory: " + outputDir);
        }

        // Assign a name to the job
        job.setJobName("Lab - Skeleton");

        // Set the path of the input file/folder (if it is a folder, the job reads all the files in the specified folder)
        FileInputFormat.addInputPath(job, inputPath);

        // Set the path of the output folder for this job
        FileOutputFormat.setOutputPath(job, outputDir);

        // Specify the class of the Driver for this job
        job.setJarByClass(DriverBigData.class);

        // Set job input and output formats
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        // Set map class and map output key/value classes
        job.setMapperClass(MapperBigData.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        // Set reduce class and reduce output key/value classes
        job.setReducerClass(ReducerBigData.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // Set the number of reducers
        job.setNumReduceTasks(numberOfReducers);

        // Execute the job and wait for completion
        exitCode = job.waitForCompletion(true) ? 0 : 1;
        return exitCode;
    }

    /** Main of the driver */
    public static void main(String args[]) throws Exception {
        // Exploit the ToolRunner class to "configure" and run the Hadoop application
        int res = ToolRunner.run(new Configuration(), new DriverBigData(), args);
        System.exit(res);
    }
}
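As a sanity check on the driver side, the resolved mapper and reducer classes can be printed just before submission (getMapperClass and getReducerClass are part of the Job API; this snippet is only a debugging aid, not part of the lab skeleton):

// Debugging aid: confirm which classes the job actually resolved
System.out.println("mapper  = " + job.getMapperClass().getName());
System.out.println("reducer = " + job.getReducerClass().getName());

This is just to rule out a misconfigured driver before looking at the shuffle.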
Mapper
package it.my.bigdata.hadoop.lab;
import java.io.IOException;
import java.util.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
/**
* Lab - Mapper
*/
/* Set the proper data types for the (key,value) pairs */
class MapperBigData extends Mapper<
        LongWritable, // Input key type
        Text,         // Input value type
        Text,         // Output key type
        Text> {       // Output value type

    private int k = 5;
    private PriorityQueue<Pair> localTopk;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // Min-heap ordered by count, so the smallest of the local top-k is always at the head
        localTopk = new PriorityQueue<>(k, new Comparator<Pair>() {
            @Override
            public int compare(Pair p1, Pair p2) {
                return Integer.compare(p1.count, p2.count); // ascending order -> min-heap behavior
            }
        });
    }

    @Override
    protected void map(
            LongWritable key, // Input key type
            Text value,       // Input value type
            Context context) throws IOException, InterruptedException {
        String[] parts = value.toString().split("\t");
        String word = parts[0].trim();
        int count = Integer.parseInt(parts[1].trim());

        // Keep only the k pairs with the highest counts seen by this mapper
        localTopk.offer(new Pair(word, count));
        if (localTopk.size() > k) {
            localTopk.poll();
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        // Emit the local top-k under a single constant key, so one reducer receives them all
        for (Pair pair : localTopk) {
            context.write(new Text("1"), new Text(pair.word + "\t" + pair.count));
            System.out.println(pair.word + " -> " + pair.count);
        }
    }

    // Helper class to store a word and its count
    public static class Pair {
        String word;
        int count;

        Pair(String word, int count) {
            this.word = word;
            this.count = count;
        }
    }
}
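To rule out the heap logic itself, the same offer/poll pattern can be checked in isolation (a standalone sketch in plain Java with made-up counts, not part of the job): offering each element and polling whenever the size exceeds k should leave exactly the k largest values in the queue.

// Standalone check of the top-k min-heap pattern (hypothetical data)
PriorityQueue<Integer> topK = new PriorityQueue<>(); // natural ordering = min-heap
int k = 3;
for (int count : new int[]{5, 1, 9, 7, 3}) {
    topK.offer(count);
    if (topK.size() > k) {
        topK.poll(); // evict the current minimum, keeping the k largest
    }
}
System.out.println(topK); // contains 5, 7, 9 (printed in internal heap order)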
Reducer
package it.my.bigdata.hadoop.lab;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import java.io.IOException;
import java.util.*;
/**
* Lab - Reducer
*/
/* Set the proper data types for the (key,value) pairs */
class ReducerBigData extends Reducer<
        Text,          // Input key type
        Text,          // Input value type
        Text,          // Output key type
        IntWritable> { // Output value type

    private int K = 10; // Define top K
    private PriorityQueue<Pair> globalTopK; // Min-heap for the global top K

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // Initialize the min-heap for the top K with a custom comparator on the count field
        globalTopK = new PriorityQueue<>(K, new Comparator<Pair>() {
            @Override
            public int compare(Pair p1, Pair p2) {
                return Integer.compare(p1.count, p2.count); // ascending order -> min-heap behavior
            }
        });
    }

    @Override
    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // Merge the local top-K lists from all mappers and compute the global top K
        System.out.println("Receiving in reducer.");
        for (Text value : values) {
            String[] parts = value.toString().split("\t");
            String word = parts[0].trim();
            int count = Integer.parseInt(parts[1].trim());
            System.out.println(word + ": " + count);

            // Add the (word, count) pair to the global top K
            globalTopK.offer(new Pair(word, count));
            // If the heap exceeds K elements, remove the smallest one
            if (globalTopK.size() > K) {
                globalTopK.poll();
            }
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        // Emit the global top K results
        List<Pair> result = new ArrayList<>();
        while (!globalTopK.isEmpty()) {
            result.add(globalTopK.poll());
        }
        // Reverse into descending order (the min-heap yields the smallest element first)
        Collections.reverse(result);
        for (Pair pair : result) {
            context.write(new Text(pair.word), new IntWritable(pair.count));
        }
    }

    // Helper class to store a word and its count
    public static class Pair {
        String word;
        int count;

        Pair(String word, int count) {
            this.word = word;
            this.count = count;
        }
    }
}
I have tried everything I could think of, including changing the mapper's output key to NullWritable or to the constant 1, but I still see nothing in the reducer.
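One way to narrow this down (a sketch that assumes Hadoop's built-in org.apache.hadoop.mapreduce.TaskCounter enum, placed in the driver after waitForCompletion) would be to compare the shuffle counters: if MAP_OUTPUT_RECORDS is nonzero while REDUCE_INPUT_RECORDS is zero, the records are emitted by the mappers but lost before the reduce phase.

// Diagnostic sketch: check whether map output ever reaches the reduce phase
long mapOut = job.getCounters().findCounter(TaskCounter.MAP_OUTPUT_RECORDS).getValue();
long redIn = job.getCounters().findCounter(TaskCounter.REDUCE_INPUT_RECORDS).getValue();
System.out.println("map output records = " + mapOut + ", reduce input records = " + redIn);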