Counting Distinct IPs in WebLog Data: A MapReduce Case Study

This post walks through a MapReduce program that counts distinct IPs in weblog data. We will also demonstrate how a combiner reduces the data transfer overhead between the map and reduce stages.


The dataset: weblog_entries, a tab-delimited weblog file (a sample input file is linked under Downloads below).


The code is implemented in a generic fashion and can be used to count distinct values in any tab-delimited dataset.
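For illustration, a single weblog line might look like the following (these field values are hypothetical; the fields are tab-separated, and the IP address sits at zero-based column position 4):

a1b2c3d4e5f6    /index.html    2012-05-21    17:02:11    10.10.1.1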


The program consists of three classes:


DistinctMapper.java: the mapper.
DistinctReducer.java: the reducer.
DistinctCounterJob.java: the driver, where the job configuration (input and output types, job setup, and so on) is done.
 
 

Here is the source code of each class.

DistinctMapper.java



public static class DistinctMapper
        extends Mapper<LongWritable, Text, Text, IntWritable> {

    private static int col_pos;
    // Split each row on tabs (note the escaped tab character).
    private static final Pattern pattern = Pattern.compile("\t");
    private Text outKey = new Text();
    private static final IntWritable outValue = new IntWritable(1);

    @Override
    protected void setup(Context context)
            throws IOException, InterruptedException {
        // Read the column position to count from the job configuration.
        col_pos = context.getConfiguration()
                .getInt(DistinctCounterJob.COL_POS, 0);
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Extract the field at col_pos and emit it with a count of 1.
        String field = pattern.split(value.toString())[col_pos];
        outKey.set(field);
        context.write(outKey, outValue);
    }
}
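Given the hypothetical weblog line shown earlier and col_pos set to 4, a single map() call splits the line on tabs, picks out 10.10.1.1, and emits the pair (10.10.1.1, 1); every other occurrence of that IP anywhere in the input produces an identical pair.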

DistinctReducer.java

public static class DistinctReducer
        extends Reducer<Text, IntWritable, Text, IntWritable> {

    private IntWritable count = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values,
            Context context) throws IOException, InterruptedException {
        // Sum the occurrence counts for this key (a distinct field value).
        int total = 0;
        for (IntWritable value : values) {
            total += value.get();
        }
        count.set(total);
        context.write(key, count);
    }
}
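Before moving on to the driver, here is a minimal sketch of a unit test for the mapper. It assumes the MRUnit library (org.apache.hadoop.mrunit) is available as a separate test dependency, and it reuses the hypothetical weblog line from above:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mrunit.mapreduce.MapDriver;

public class DistinctMapperTest {
    public static void main(String[] args) throws Exception {
        MapDriver<LongWritable, Text, Text, IntWritable> driver =
                MapDriver.newMapDriver(new DistinctCounterJob.DistinctMapper());
        // Point the mapper at the IP column (zero-based position 4).
        driver.getConfiguration().setInt(DistinctCounterJob.COL_POS, 4);
        driver.withInput(new LongWritable(0),
                new Text("a1b2c3d4e5f6\t/index.html\t2012-05-21\t17:02:11\t10.10.1.1"))
              .withOutput(new Text("10.10.1.1"), new IntWritable(1))
              .runTest();
    }
}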

DistinctCounterJob.java

package hadoopgyaan.mapreduce.distinctipadd;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;
import java.util.regex.Pattern;

public class DistinctCounterJob implements Tool {

    private Configuration conf;
    public static final String NAME = "distinct_counter";
    public static final String COL_POS = "col_pos";

    public static void main(String[] args) throws Exception {
        // Propagate the job's exit status to the shell.
        System.exit(ToolRunner.run(new Configuration(),
                new DistinctCounterJob(), args));
    }
The run() method is where we set the input and output formats, the Mapper, Reducer, and combiner classes, and the key/value class configuration:
    public int run(String[] args) throws Exception {
        if (args.length != 3) {
            System.err.println(
                "Usage: distinct_counter <input> <output> <element_position>");
            System.exit(1);
        }
        conf.setInt(COL_POS, Integer.parseInt(args[2]));
        Job job = new Job(conf, "Count distinct elements at position");
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setMapperClass(DistinctMapper.class);
        job.setReducerClass(DistinctReducer.class);
        // The reducer doubles as the combiner; summing is associative.
        job.setCombinerClass(DistinctReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setJarByClass(DistinctCounterJob.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // Return 0 on success, 1 on failure (standard exit-code convention).
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public void setConf(Configuration conf) {
        this.conf = conf;
    }

    public Configuration getConf() {
        return conf;
    }
}
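Once the three classes are packaged into a JAR, the job can be submitted from the command line. A hypothetical invocation follows (the JAR name and HDFS paths are placeholders; the trailing 4 is the zero-based position of the IP column):

hadoop jar distinct-counter.jar hadoopgyaan.mapreduce.distinctipadd.DistinctCounterJob /input/weblog_entries /output/distinct_ips 4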

First, we set up DistinctCounterJob to implement the Tool interface for remote submission. The static constant NAME is of potential use with the Hadoop Driver class, which supports launching different jobs from the same JAR file. The static constant COL_POS is initialized from the third required command-line argument. This value is set in the job configuration and should match the position of the column whose distinct entries you wish to count; supplying 4 matches the IP column of the weblog data. Since we are reading and writing text, we can use the supplied TextInputFormat and TextOutputFormat classes. We set the Mapper and Reducer classes to our DistinctMapper and DistinctReducer implementations respectively. We also supply DistinctReducer as the combiner class; this decision is explained in more detail below.

It’s also very important to call setJarByClass() so that the TaskTrackers can properly unpack and find the Mapper and Reducer classes. The job uses the static helper methods on FileInputFormat and FileOutputFormat to set the input and output directories respectively. Now we’re set up and ready to submit the job.

The Mapper class sets up a few member variables:

col_pos: initialized from the job configuration; it lets users change which column to parse and apply the distinct count to.
pattern: defines the column split point for each row, based on tabs.
outKey: a class member that holds the output key, which avoids creating a new instance for each output that is written.
outValue: an integer representing one occurrence of the given key, similar to the WordCount example.

The map() function splits each incoming line and extracts the string located at col_pos. We reset the internal value of outKey to the string found at that position; for our example, this is the row's IP value. We then emit the value of the newly reset outKey variable along with outValue to mark one occurrence of that IP address. Without the assistance of the combiner, this would present the reducer with an iterable collection of 1s to be counted. The following is an example of a reducer {key, value:[]} pair without a combiner: {10.10.1.1, [1,1,1,1,1,1]} = six occurrences of the IP "10.10.1.1".
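With the combiner in place, each map task pre-aggregates its own output before the shuffle. For instance (a hypothetical split of the same six occurrences), if two map tasks each emit three 1s for 10.10.1.1, each combiner run collapses them to (10.10.1.1, 3), and the reducer then receives {10.10.1.1, [3,3]}; the final total of 6 is unchanged, but far fewer records travel over the network. Reusing DistinctReducer as the combiner is safe here only because summation is associative and commutative.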


Downloads:


Sample Input file (weblog_entries)


I hope this tutorial helps you. If you have any questions or problems, please let me know.
Happy Hadooping with Patrick.

