This post continues the earlier posts on CDR processing:
http://ssklogs.blogspot.in/2011/11/hadoop-and-cdr-processing.html
http://ssklogs.blogspot.in/2012/01/hadoophbase-based-cdr-processing.html
So far we have looked at processing CDRs with Hadoop/HBase. What about receiving the CDRs from the various network elements (NEs)? There is no standard CDR format or delivery mechanism; each vendor follows its own.
Collecting these CDRs calls for the same big-data approach. That's where Flume comes into the picture.
Using Flume, we can set up one or more nodes to collect CDRs from one or more NEs, perform any transformations (changing the format, adding or removing fields, and so on), and store the result in HDFS for further processing.
Flume is built around the Source-Decorator-Sink concept. Here the source is the directory on the designated agent node's file system into which the NE FTPs its files. The decorator performs any transformations, and the sink writes to HDFS. Flume comes with a good catalog of sources, sinks, and decorators, so we only need to write a decorator plugin for the custom transformation.
Let's take the CDR format used in the previous posts and assume we need to prepend the country code to the TNs before sending the records on for Hadoop processing.
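To make the transformation concrete before wiring it into Flume, here is a minimal standalone sketch in plain Java. It assumes the CDR is pipe-delimited and that the called TN is the fourth field, which is what the decorator later in this post implies. The class name, method name, and sample record are made up for illustration and are not part of Flume.

```java
import java.util.regex.Pattern;

// Illustrative helper, not part of Flume: prefixes the called TN of one CDR line.
final class CdrCountryCode {
    // Assumption: fields are pipe-delimited and the called TN is the 4th field (index 3).
    static final int TO_TN_INDEX = 3;
    static final String COUNTRY_CODE = "+91";

    static String prependCountryCode(String record) {
        // A limit of -1 keeps trailing empty fields, so the rest of the record is preserved.
        String[] fields = record.split(Pattern.quote("|"), -1);
        if (fields.length <= TO_TN_INDEX) {
            return record; // record too short to contain a called TN: pass it through untouched
        }
        fields[TO_TN_INDEX] = COUNTRY_CODE + fields[TO_TN_INDEX];
        return String.join("|", fields);
    }

    public static void main(String[] args) {
        // Made-up sample record, only to show the shape of the change.
        System.out.println(prependCountryCode("9840012345|20120115101500|60|9940054321"));
    }
}
```

Keeping the logic in a small pure function like this makes it easy to unit test on its own; the Flume decorator then only has to call it on the event body.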
Here are the steps to use Flume for CDR processing.
- Install the Flume 0.9.4 binaries
- Download the Flume 0.9.4 source (to develop custom decorator plugins)
- Copy the Hadoop core jar from the Hadoop installation to the Flume lib directory (and remove the one shipped with Flume)
- For this exercise, the Flume master and node run on the same machine as the NameNode, but they can be distributed across different machines if needed
- Write the custom decorator plugin, starting from the helloworld sample provided with the source. In the append() method, get the event body and apply the required transformations:
    // Needs java.io.IOException, java.util.StringTokenizer and
    // com.cloudera.flume.core.Event / EventImpl imported in the plugin class.
    @Override
    public void append(Event e) throws IOException, InterruptedException {
        String body = new String(e.getBody());
        System.out.println("cdrDeco -> " + body);
        // returnDelims = true: each "|" comes back as its own token, so copying
        // tokens as-is keeps the record pipe-delimited.
        StringTokenizer st = new StringTokenizer(body, "|", true);
        StringBuilder newRec = new StringBuilder();
        // Copy the first three fields and their delimiters unchanged (six tokens).
        for (int i = 0; i < 6; i++) {
            newRec.append(st.nextToken());
        }
        // Prepend the country code to the fourth field, the to-TN.
        newRec.append("+91").append(st.nextToken());
        // Copy anything after the to-TN unchanged so no fields are lost.
        while (st.hasMoreTokens()) {
            newRec.append(st.nextToken());
        }
        EventImpl e2 = new EventImpl(newRec.toString().getBytes(),
                e.getTimestamp(), e.getPriority(), e.getNanos(), e.getHost(),
                e.getAttrs());
        super.append(e2);
    }
- Build the plugin jar using 'mvn package'
- Add the jar to FLUME_CLASSPATH (in the bin/flume file)
- Add the plugin class to conf/flume-site.xml:
    <property>
      <name>flume.plugin.classes</name>
      <value>cdrDeco.CDRDeco</value>
      <description>Comma separated list of plugin classes</description>
    </property>
- Start the master (bin/flume master)
- Access the Flume web console (flumeMaster:35871) and make sure the plugin shows up under the "extn" tab
- Set the node's source-decorator-sink configuration as below, using the "config" tab on the master console:
    nodeHostName: tailDir( "/tmp/switch1" ) | { cdrDeco => escapedFormatDfs( "hdfs://nameNode:9000/user/cdr/input/switch1", "CDRRecs", raw() ) }
- Start the node (bin/flume node)
- The node will now poll the source directory, apply the transformation defined in cdrDeco, and sink the result to the HDFS directory
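To exercise the flow end to end, one option is to drop a small file of sample records into the directory that tailDir watches. The sketch below does that in plain Java. The class name, file name, and records are hypothetical, and it assumes the /tmp/switch1 directory from the node configuration above unless another directory is passed as the first argument.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.List;

// Illustrative test helper, not part of Flume: writes sample CDR lines into a directory.
final class SampleCdrDrop {
    // Creates the directory if needed and writes one record per line.
    static Path writeSample(Path dir, String fileName, List<String> records) throws IOException {
        Files.createDirectories(dir);
        Path target = dir.resolve(fileName);
        Files.write(target, records, StandardCharsets.UTF_8);
        return target;
    }

    public static void main(String[] args) throws IOException {
        // Assumption: the node's source is tailDir("/tmp/switch1"), as configured above.
        Path dir = Paths.get(args.length > 0 ? args[0] : "/tmp/switch1");
        // Made-up records in an assumed pipe-delimited layout with the to-TN as the 4th field.
        List<String> records = Arrays.asList(
                "9840012345|20120115101500|60|9940054321",
                "9840099999|20120115101730|125|9940011111");
        System.out.println("Wrote " + writeSample(dir, "sample.cdr", records));
    }
}
```

After the node picks up the file, the "cdrDeco -> ..." lines printed by the decorator should appear in the node's output, and the transformed records should show up under /user/cdr/input/switch1 in HDFS (for example via hadoop fs -ls).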