November 15, 2011

Hadoop and CDR Processing

I wanted to try out Hadoop. Since I work in the telco domain, CDR (Call Detail Record) processing was a perfect fit: it matches the definition of "Big Data" (Volume, Velocity and Variety) well. Following are the steps I followed for a small prototype.

- 2 laptops (nodes) with SSH enabled and password-less logins
- Installed Hadoop release 0.20.2
- Follow the single-node setup and make sure Hadoop works on each laptop. Run the WordCount / Grep examples to verify

Initial Steps (Multi Node)

- Now for the actual fun: the multi-node setup
- Choose one of the nodes to be the primary Name Node (for HDFS) and Job Tracker node (for MR jobs)
- Set the primary node's IP as the host in the property "fs.default.name" in conf/core-site.xml (on both nodes)
- Set the primary node's IP as the host in the property "mapred.job.tracker" in conf/mapred-site.xml (on both nodes)
- Enter the primary node's IP in conf/masters (only on the primary node). Strictly speaking, conf/masters tells the start scripts where to run the Secondary Name Node
- Enter both nodes' IPs in conf/slaves (only on the primary node). Both nodes will act as data (HDFS) and task (MR) nodes
- Set the property "dfs.replication" to 2 in conf/hdfs-site.xml (on both nodes). The default is 3, more than our 2 data nodes; with 2, each HDFS block gets a replica on each node
- Now start the Hadoop cluster using bin/start-all.sh on the primary node. The output is shown below:

$ bin/start-all.sh
starting namenode, logging to /rnd/hadoop-0.20.2/bin/../logs/hadoop-hpadmin-namenode-primary.out
192.168.1.100: starting datanode, logging to /rnd/hadoop-0.20.2/bin/../logs/hadoop-hpadmin-datanode-primary.out
192.168.1.102: starting datanode, logging to /rnd/hadoop-0.20.2/bin/../logs/hadoop-hpadmin-datanode-slave1.out
192.168.1.100: starting secondarynamenode, logging to /rnd/hadoop-0.20.2/bin/../logs/hadoop-hpadmin-secondarynamenode-primary.out
starting jobtracker, logging to /rnd/hadoop-0.20.2/bin/../logs/hadoop-hpadmin-jobtracker-primary.out
192.168.1.100: starting tasktracker, logging to /rnd/hadoop-0.20.2/bin/../logs/hadoop-hpadmin-tasktracker-primary.out
192.168.1.102: starting tasktracker, logging to /rnd/hadoop-0.20.2/bin/../logs/hadoop-hpadmin-tasktracker-slave1.out

- Verify the setup using the WordCount and Grep examples
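The configuration entries from the steps above end up as small XML site files. The following plain-Java sketch just prints them, to show the shape of each entry. MASTER_IP is the primary node from the start-all.sh output (its logs are named *-primary); the ports 9000 and 9001 are assumed values, not taken from this post.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch: prints the three site files for this two-node cluster.
// ASSUMPTION: ports 9000 (HDFS) and 9001 (Job Tracker) are illustrative only.
final class SiteConfigSketch {
    static final String MASTER_IP = "192.168.1.100";

    // Renders name/value pairs in the <configuration>/<property> layout
    // used by the conf/*-site.xml files.
    static String siteXml(Map<String, String> props) {
        StringBuilder sb = new StringBuilder("<?xml version=\"1.0\"?>\n<configuration>\n");
        for (Map.Entry<String, String> e : props.entrySet()) {
            sb.append("  <property>\n")
              .append("    <name>").append(e.getKey()).append("</name>\n")
              .append("    <value>").append(e.getValue()).append("</value>\n")
              .append("  </property>\n");
        }
        return sb.append("</configuration>\n").toString();
    }

    // Single-property map that keeps insertion order.
    static Map<String, String> prop(String name, String value) {
        Map<String, String> m = new LinkedHashMap<>();
        m.put(name, value);
        return m;
    }

    public static void main(String[] args) {
        // conf/core-site.xml (both nodes): where HDFS clients find the Name Node
        System.out.println(siteXml(prop("fs.default.name", "hdfs://" + MASTER_IP + ":9000")));
        // conf/mapred-site.xml (both nodes): Job Tracker host:port
        System.out.println(siteXml(prop("mapred.job.tracker", MASTER_IP + ":9001")));
        // conf/hdfs-site.xml (both nodes): two replicas of each block
        System.out.println(siteXml(prop("dfs.replication", "2")));
    }
}
```

Only the host part differs from a localhost single-node setup, which is why these three files can be identical on both nodes.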

CDR Processing

Now to the CDR processing itself. A typical CDR contains many fields for a call/transaction, but for this exercise I went with a simple format:

SourceTN|StartTimeOfTheCall|EndTimeOfTheCall|DestinationTN (times are in epoch seconds)


Hadoop MapReduce (MR) is used to calculate the total call duration for each SourceTN.

Sample CDR file
666555555|30|45|8885555555
666555566|40|75|8885555555
666555555|10|45|8885555555
666555555|30|45|8885555555
666555577|30|95|8885555555
666555555|20|45|8885555555
666555555|30|45|8885555555
666555566|00|45|8885555555
666555577|30|45|8885555555
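Before running anything on the cluster, it helps to know what the answer should be. This plain-Java sketch (no Hadoop involved) performs the same computation on the sample above: each record maps to (SourceTN, end - start), and the durations are then summed per SourceTN. The class and method names are illustrative only.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Sketch of the job's logic without Hadoop: "map" each record to
// (SourceTN, duration), then "reduce" by summing durations per SourceTN.
final class CdrTotalsSketch {
    static final List<String> SAMPLE = List.of(
        "666555555|30|45|8885555555", "666555566|40|75|8885555555",
        "666555555|10|45|8885555555", "666555555|30|45|8885555555",
        "666555577|30|95|8885555555", "666555555|20|45|8885555555",
        "666555555|30|45|8885555555", "666555566|00|45|8885555555",
        "666555577|30|45|8885555555");

    static Map<String, Integer> totals(List<String> lines) {
        Map<String, Integer> sums = new TreeMap<>(); // sorted by SourceTN, like the job output
        for (String line : lines) {
            String[] f = line.split("\\|");
            if (f.length < 3) continue; // skip malformed records
            int seconds = Integer.parseInt(f[2].trim()) - Integer.parseInt(f[1].trim());
            sums.merge(f[0], seconds, Integer::sum); // add to this SourceTN's running total
        }
        return sums;
    }

    public static void main(String[] args) {
        totals(SAMPLE).forEach((tn, secs) -> System.out.println(tn + "\t" + secs));
    }
}
```

For this sample it gives 105 seconds for 666555555 and 80 for both 666555566 and 666555577. With ten copies of the file as input, the job's totals should be ten times these.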


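- Took the WordCount example code as the base and modified it for the Map and Reduce parts (CDR.java). The Map part is below:
   // Imports at the top of CDR.java
   import java.io.IOException;
   import java.util.StringTokenizer;
   import org.apache.hadoop.io.IntWritable;
   import org.apache.hadoop.io.Text;
   import org.apache.hadoop.mapreduce.Mapper;

   // Nested inside class CDR; the mapper's class name here is assumed
   public static class CDRMapper extends Mapper<Object, Text, Text, IntWritable> {
     private IntWritable diff = new IntWritable();
     private Text tn = new Text();

     // Input line: SourceTN|startTime|endTime|DestinationTN (epoch seconds)
     // Output: (SourceTN, call duration in seconds)
     public void map(Object key, Text value, Context context
                     ) throws IOException, InterruptedException {
       StringTokenizer itr = new StringTokenizer(value.toString(), "|");
       System.out.println(key + "-" + value);      // debug; goes to the task's stdout log
       if (itr.countTokens() < 3) return;          // skip malformed records
       String tnTmp = itr.nextToken();
       String start = itr.nextToken();
       String end = itr.nextToken();
       int diffTmp = Integer.parseInt(end.trim()) - Integer.parseInt(start.trim());
       System.out.println(tnTmp + "-" + diffTmp);  // debug
       tn.set(tnTmp);
       diff.set(diffTmp);
       context.write(tn, diff);
     }
   }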
  


- Modify ExampleDriver.java to register the CDR job under the name "cdr" (later I will try running it directly)
       pgd.addClass("cdr", CDR.class,
                    "A map/reduce program that calculates call durations from CDR files.");

- Build the examples using "ant examples" from the Hadoop root folder

- Ensure the same hadoop-core jar is present on both nodes; otherwise "version mismatch" errors occur when the tasks are submitted to the task nodes

- Create several CDR files with records in the above format (I just copied the same file ten times under different names)


- Create an input dir on HDFS using bin/hadoop dfs -mkdir input

- Copy the above CDR files to the HDFS input dir using bin/hadoop dfs -copyFromLocal input/

- Do not create the output dir on HDFS in advance; the job creates it

- Ensure the cluster is running; if not, start it using bin/start-all.sh

- Submit the CDR processing job using bin/hadoop jar build/hadoop-0.20.3-dev-examples.jar cdr input output
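The "copy the same file ten times" step can also be scripted before the -copyFromLocal upload. A minimal plain-Java sketch using java.nio.file; the output file names (cdr-0.txt, cdr-1.txt, ...) and the command-line arguments are hypothetical, and any distinct names would do.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

// Sketch: writes `copies` identical CDR files under different names into a
// local directory. File names cdr-<i>.txt are hypothetical.
final class CdrFileCopier {
    static List<Path> writeCopies(Path dir, List<String> records, int copies) {
        try {
            Files.createDirectories(dir);
            List<Path> written = new ArrayList<>();
            for (int i = 0; i < copies; i++) {
                Path p = dir.resolve("cdr-" + i + ".txt");
                Files.write(p, records); // UTF-8, one record per line
                written.add(p);
            }
            return written;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Usage: java CdrFileCopier <sample-cdr-file> <local-output-dir> <copies>
    public static void main(String[] args) throws IOException {
        List<String> records = Files.readAllLines(Paths.get(args[0]));
        for (Path p : writeCopies(Paths.get(args[1]), records, Integer.parseInt(args[2]))) {
            System.out.println("wrote " + p);
        }
    }
}
```

Since each file becomes one map task here (the log reports 10 launched map tasks for 10 input paths), the number of copies directly controls how many map tasks the job runs.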

The cluster then processes the files, spreading the map tasks (one per input file, 10 in all) across the two task nodes. The log output looks like this:

$ bin/hadoop jar build/hadoop-0.20.3-dev-examples.jar cdr input output
11/11/16 00:15:52 INFO input.FileInputFormat: Total input paths to process : 10
11/11/16 00:15:53 INFO mapred.JobClient: Running job: job_201111160001_0005
11/11/16 00:15:54 INFO mapred.JobClient:  map 0% reduce 0%
11/11/16 00:16:01 INFO mapred.JobClient:  map 20% reduce 0%
11/11/16 00:16:04 INFO mapred.JobClient:  map 40% reduce 0%
11/11/16 00:16:07 INFO mapred.JobClient:  map 80% reduce 0%
11/11/16 00:16:10 INFO mapred.JobClient:  map 100% reduce 0%
11/11/16 00:16:13 INFO mapred.JobClient:  map 100% reduce 26%
11/11/16 00:16:19 INFO mapred.JobClient:  map 100% reduce 100%
11/11/16 00:16:22 INFO mapred.JobClient: Job complete: job_201111160001_0005
11/11/16 00:16:22 INFO mapred.JobClient: Counters: 17
11/11/16 00:16:22 INFO mapred.JobClient:   Job Counters
11/11/16 00:16:22 INFO mapred.JobClient:     Launched reduce tasks=1
11/11/16 00:16:22 INFO mapred.JobClient:     Launched map tasks=10
11/11/16 00:16:22 INFO mapred.JobClient:     Data-local map tasks=10
11/11/16 00:16:22 INFO mapred.JobClient:   FileSystemCounters
11/11/16 00:16:22 INFO mapred.JobClient:     FILE_BYTES_READ=648
11/11/16 00:16:22 INFO mapred.JobClient:     HDFS_BYTES_READ=2440
11/11/16 00:16:22 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=1508
11/11/16 00:16:22 INFO mapred.JobClient:     HDFS_BYTES_WRITTEN=43
11/11/16 00:16:22 INFO mapred.JobClient:   Map-Reduce Framework
11/11/16 00:16:22 INFO mapred.JobClient:     Reduce input groups=3
11/11/16 00:16:22 INFO mapred.JobClient:     Combine output records=30
11/11/16 00:16:22 INFO mapred.JobClient:     Map input records=90
11/11/16 00:16:22 INFO mapred.JobClient:     Reduce shuffle bytes=486
11/11/16 00:16:22 INFO mapred.JobClient:     Reduce output records=3
11/11/16 00:16:22 INFO mapred.JobClient:     Spilled Records=60
11/11/16 00:16:22 INFO mapred.JobClient:     Map output bytes=1260
11/11/16 00:16:22 INFO mapred.JobClient:     Combine input records=90
11/11/16 00:16:22 INFO mapred.JobClient:     Map output records=90
11/11/16 00:16:22 INFO mapred.JobClient:     Reduce input records=30
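Several of these counters can be checked by hand. The plain-Java sketch below (no Hadoop dependency) replays the job on ten copies of the sample file. It assumes the Reduce part is the WordCount-style summing reducer and that the same class also runs as the combiner; the Combine counters suggest this, but the post does not show that code. Under those assumptions it reproduces Map input records=90, Map output bytes=1260, Combine output records=30, Reduce input groups=3 and HDFS_BYTES_WRITTEN=43.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java replay of the CDR job over N identical input files.
// ASSUMPTION: the reducer sums durations per SourceTN and also runs as the combiner.
final class CounterReplaySketch {
    static final List<String> SAMPLE = List.of(
        "666555555|30|45|8885555555", "666555566|40|75|8885555555",
        "666555555|10|45|8885555555", "666555555|30|45|8885555555",
        "666555577|30|95|8885555555", "666555555|20|45|8885555555",
        "666555555|30|45|8885555555", "666555566|00|45|8885555555",
        "666555577|30|45|8885555555");

    static final class Counters {
        int mapInputRecords, mapOutputBytes, combineOutputRecords,
            reduceInputGroups, hdfsBytesWritten;
        final Map<String, Integer> result = new TreeMap<>();
    }

    static Counters replay(int files) {
        Counters c = new Counters();
        List<Map<String, Integer>> mapOutputs = new ArrayList<>();
        for (int f = 0; f < files; f++) {                 // one map task per file
            Map<String, Integer> combined = new TreeMap<>();
            for (String line : SAMPLE) {
                c.mapInputRecords++;
                String[] t = line.split("\\|");
                int secs = Integer.parseInt(t[2]) - Integer.parseInt(t[1]);
                // Serialized size: Text key = 1-byte vint length (enough for
                // lengths up to 127) + key bytes; IntWritable value = 4 bytes.
                c.mapOutputBytes += 1 + t[0].length() + 4;
                combined.merge(t[0], secs, Integer::sum); // combiner: partial sums per map task
            }
            c.combineOutputRecords += combined.size();
            mapOutputs.add(combined);
        }
        for (Map<String, Integer> m : mapOutputs) {       // reducer: sum the partial sums
            m.forEach((tn, secs) -> c.result.merge(tn, secs, Integer::sum));
        }
        c.reduceInputGroups = c.result.size();
        for (Map.Entry<String, Integer> e : c.result.entrySet()) {
            // Text output line: key TAB value newline (all ASCII, so chars == bytes)
            c.hdfsBytesWritten += (e.getKey() + "\t" + e.getValue() + "\n").length();
        }
        return c;
    }

    public static void main(String[] args) {
        Counters c = replay(10);
        System.out.println("Map input records=" + c.mapInputRecords);
        System.out.println("Map output bytes=" + c.mapOutputBytes);
        System.out.println("Combine output records=" + c.combineOutputRecords);
        System.out.println("Reduce input groups=" + c.reduceInputGroups);
        System.out.println("HDFS_BYTES_WRITTEN=" + c.hdfsBytesWritten);
        System.out.println(c.result);
    }
}
```

The final result should therefore be 1050 seconds for 666555555 and 800 for each of the other two numbers. The combiner is what shrinks 90 map output records to 30 reduce input records: each of the 10 map tasks sends only one partial sum per SourceTN.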


Delete the output directory before running the job again, because the job fails if its output directory already exists: bin/hadoop dfs -rmr output

Screenshots

Following are screenshots of the web consoles showing the job data.

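On the master node: http://localhost:50030 (MR admin)

[Screenshot: Job record]

[Screenshot: Job detail]

[Screenshot: Map tasks (10 of them)]

[Screenshot: Reduce task]

On the Name Node: http://localhost:50070

[Screenshot: Name Node, Browse HDFS]

[Screenshot: input/output dirs]

[Screenshot: input dir]

[Screenshot: output dir]

[Screenshot: Final result (reduce output)]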