5 Mar, 2014

Analyzing network bandwidth usage with WSO2 BAM

  • Chamila Wijayarathna
  • Software Engineer - WSO2

Table of contents

  1. Introduction
  2. Network bandwidth usage analysis with WSO2 BAM
  3. WSO2 Infra Log Analyzer
  4. Summary
  5. References

Introduction

Network bandwidth monitoring is one of the most critical responsibilities of an organization’s network administrator. Monitoring an organization’s network usage is essential: many organizations face complaints from clients and employees about slow response times of a particular application, product, remote server, or version control system, which eventually degrades the effective working time of employees and the organization’s productivity. Solving this problem is often more difficult than generally assumed; the real cause can lie in the main router or a switch, in unusually high usage by certain users, and so on. For a quick and accurate diagnosis of such problems, the network administrator needs a network traffic monitoring solution that analyzes bandwidth, user behaviour, and traffic in the network, and provides detailed results.

In a previous article, we discussed how we used WSO2 CEP to send real-time notifications to users who exceed their bandwidth limits. In this article, we discuss how WSO2 BAM can be used to analyze network bandwidth usage and visualize a usage summary for each user. The network administrator is concerned with the following three main areas, based on which they can plan and control the usage of the network and increase the productivity of the organization.

  • Which users consume more bandwidth
  • Behaviour of users who consume more bandwidth
  • When users consume more bandwidth in a day/month/year

Network bandwidth usage analysis with WSO2 BAM

The diagram above shows the four main components of WSO2 BAM: the Data Publisher, Data Receiver, Data Analyzer and Dashboard. The data publisher is the client that sends events to WSO2 BAM. These events are received by the data receiver in WSO2 BAM and stored in Cassandra, a big data store. The data analyzer is based on Apache Hive and Apache Hadoop; according to the Hive scripts written for the data analyzer, it fetches data from the Cassandra data store and the RDBMS data store, and writes the results back to the RDBMS. The final summarized results in the RDBMS can be viewed on the dashboard. For more information, you can refer to the official BAM documentation.

WSO2 Infra Log Analyzer

In this article, we are going to focus on building an application to monitor network bandwidth usage and discuss the main focus areas in designing such applications using WSO2 BAM. We have developed such an application, ‘WSO2 Infra Log Analyzer’, a Jaggery-based application that displays the network bandwidth usage summarized by WSO2 BAM. The above diagram shows the main components of the system, and we’ll look into the details of each component and its functionality in the sections below.

  1. Receiving events to BAM from routers

    As shown in the above diagram and explained above, we first need to send the traffic information to CEP/BAM by using data publishers. In the current use case of monitoring the WSO2 Inc. network, Cisco routers are used; therefore, the Cisco routers have been configured to send details of bandwidth usage as netflow records [2][3]. In a netflow record, the router sends the user’s IP address, the destination IP address, the time the connection started, the number of bytes and packets transferred, etc. as UDP packets to a specified port. Netflow exists in different versions, the most popular being V5, V6 and V9. In our case, the router sends V5 flows to port 9996 of the server on which we hosted our system; a minimal sketch of a listener for such packets is shown below.
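
    The following standalone sketch illustrates what arrives on the wire. It is illustrative only; the actual publisher, described below, uses the jflow library for this decoding. The field offsets follow the published NetFlow v5 packet layout (a 24-byte header followed by 48-byte flow records).

    import java.net.DatagramPacket;
    import java.net.DatagramSocket;
    import java.nio.ByteBuffer;

    // Illustrative NetFlow v5 listener; not the production publisher.
    public class NetflowV5Listener {
        private static final int HEADER_LENGTH = 24;
        private static final int RECORD_LENGTH = 48;

        public static void main(String[] args) throws Exception {
            DatagramSocket socket = new DatagramSocket(9996); // port the router exports to
            byte[] buf = new byte[65535];
            while (true) {
                DatagramPacket packet = new DatagramPacket(buf, buf.length);
                socket.receive(packet);
                ByteBuffer bb = ByteBuffer.wrap(packet.getData(), 0, packet.getLength());
                int version = bb.getShort(0) & 0xFFFF; // should be 5
                int count = bb.getShort(2) & 0xFFFF;   // number of flow records in this packet
                for (int i = 0; i < count; i++) {
                    int base = HEADER_LENGTH + i * RECORD_LENGTH;
                    if (base + RECORD_LENGTH > bb.limit()) {
                        break; // truncated packet
                    }
                    String srcIp = ipToString(bb.getInt(base));       // source address
                    String dstIp = ipToString(bb.getInt(base + 4));   // destination address
                    long bytes = bb.getInt(base + 20) & 0xFFFFFFFFL;  // dOctets: bytes in the flow
                    System.out.println(srcIp + " -> " + dstIp + " : " + bytes + " bytes");
                }
            }
        }

        private static String ipToString(int ip) {
            return ((ip >>> 24) & 0xFF) + "." + ((ip >>> 16) & 0xFF) + "."
                    + ((ip >>> 8) & 0xFF) + "." + (ip & 0xFF);
        }
    }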

    The data publisher is created using the jflow library, which decodes netflow records to retrieve the important data, such as the user IP, the destination IP, the amount of data transferred and the time the transfer occurred, and sends them to CEP as a wso2event stream. Here we send the events to WSO2 CEP rather than WSO2 BAM, because we need to do some real-time event filtering in CEP and send only the filtered events to BAM. Netflow tends to report internal network communication as well; hence we need to filter out such events and forward only the events that are initiated from a user’s IP address. After this filtering, WSO2 CEP sends the events to BAM.

    The following code snippet shows how the data publisher sends events to WSO2 CEP based on the above explanation.

      
    // Connects to the event receiver of WSO2 CEP/BAM over the thrift protocol
    public static final String STREAM_NAME = "to_bam";
    public static final String VERSION = "1.0.0";

    static DataPublisher dataPublisher = new DataPublisher("tcp://" + host + ":" + port, username, passwd);
    static String streamId1;

    try {
        // Reuse the stream definition if it already exists on the receiver
        streamId1 = dataPublisher.findStream(STREAM_NAME, VERSION);
        System.out.println("Stream already defined");
    } catch (NoStreamDefinitionExistException e) {
        // Otherwise define the stream: meta data plus the netflow payload fields
        streamId1 = dataPublisher.defineStream("{" +
                "  'name':'" + STREAM_NAME + "'," +
                "  'version':'" + VERSION + "'," +
                "  'nickName': 'Statistics'," +
                "  'description': 'Service statistics'," +
                "  'metaData':[" +
                "          {'name':'referer','type':'STRING'}" +
                "  ]," +
                "  'payloadData':[" +
                "          {'name':'SrcIp','type':'STRING'}," +
                "          {'name':'DestIp','type':'STRING'}," +
                "          {'name':'size','type':'LONG'}," +
                "          {'name':'timestamp','type':'STRING'}" +
                "  ]" +
                "}");
    }

    // One event is published per decoded netflow record
    Object[] meta = new Object[]{ "MetaData" };
    Object[] payload = new Object[]{ SrcIp, DestIp, size, timestamp };
    Object[] correlation = null;
    Event statisticsEvent = new Event(streamId1, System.currentTimeMillis(),
            meta, correlation, payload);
    dataPublisher.publish(statisticsEvent);
    

    This event is received by the BAM data receiver and stored immediately in the Cassandra datastore.

    When BAM receives events, it creates a Cassandra column family named after the stream and stores the details of the records there.
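
    Based on the stream definition above, the ‘to_bam’ column family will contain columns along the following lines. This layout is illustrative; the payload_* names follow from the stream definition and are the columns mapped in the Hive script below.

    to_bam (column family)
     ├── row key            : unique event id
     ├── meta_referer       : STRING
     ├── payload_SrcIp      : STRING
     ├── payload_DestIp     : STRING
     ├── payload_size       : LONG
     └── payload_timestamp  : STRING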

  2. Analyze the source events and store the results in RDBMS

    Since netflow sends a large amount of data, this column family will contain millions of entries. Cassandra is a schemaless storage, and it’s designed to store big data.

    Therefore, using this for showing statistics when requested by a user or admin will not be very efficient. As a solution, we can use BAM analytics scripts to process these data, summarize them, and save them to MySQL summary tables. There are five summary tables: hourlyusage for the hour-by-hour summary, dailyusage for the day-by-day summary, weeklyusage for the week-by-week summary, monthlyusage for the month-by-month summary, and yearlyusage for the year-by-year summary.

    At the end of each hour, the following Hive analytics script runs and summarizes the last hour’s netflow details from Cassandra into the MySQL hourlyusage table.

      
    CREATE EXTERNAL TABLE IF NOT EXISTS hourlyLogEvent
        (id STRING, userIp STRING, siteIp STRING, size BIGINT, timesta STRING)
        STORED BY 'org.apache.hadoop.hive.cassandra.CassandraStorageHandler'
        WITH SERDEPROPERTIES (
        "wso2.carbon.datasource.name" = "WSO2BAM_CASSANDRA_DATASOURCE",
        "cassandra.cf.name" = "to_bam",
        "cassandra.columns.mapping" = ":key, payload_SrcIp, payload_DestIp, payload_size, payload_timestamp" );

    CREATE EXTERNAL TABLE IF NOT EXISTS hourlyusage(
        userIp STRING, siteIp STRING, amount INT, day STRING, hour STRING)
        STORED BY 'org.wso2.carbon.hadoop.hive.jdbc.storage.JDBCStorageHandler'
        TBLPROPERTIES (
        'wso2.carbon.datasource.name' = 'Log_Analyzer',
        'hive.jdbc.update.on.duplicate' = 'true',
        'hive.jdbc.primary.key.fields' = 'userIp,siteIp,day,hour',
        'hive.jdbc.table.create.query' =
        'CREATE TABLE hourlyusage (userIp CHAR(15),
        siteIp CHAR(15), amount INT, day CHAR(10), hour CHAR(2), PRIMARY KEY(userIp,siteIp,day,hour))' );

    insert overwrite table hourlyusage
        select userIp, siteIp, sum(size), to_date(timesta), hour(timesta) from hourlyLogEvent
        where to_date(timesta) == if(hour(from_unixtime(unix_timestamp()))==0, date_sub(to_date(from_unixtime(unix_timestamp())),1), to_date(from_unixtime(unix_timestamp())))
        and hour(timesta) == if(hour(from_unixtime(unix_timestamp()))==0, 23, hour(from_unixtime(unix_timestamp()))-1)
        group by userIp, siteIp, to_date(timesta), hour(timesta);
    

    In the above hive script, the first hive query creates an external table on the source Cassandra column family ‘to_bam’; this ‘hourlyLogEvent‘ table is used to fetch data from Cassandra for further analysis. The Cassandra connection details, such as the URL, username, password, etc., are provided in the data source named ‘WSO2BAM_CASSANDRA_DATASOURCE’. Similarly, the summarization result is written into the hive table ‘hourlyusage’. This hive table is created on the 'Log_Analyzer' data source, which is configured to point to the MySQL database.

    Since the entire use case runs as the super-tenant, we can specify the datasource configurations in the $BAM_HOME/repository/conf/datasources/master-datasources.xml file, where the "WSO2BAM_CASSANDRA_DATASOURCE" and “Log_Analyzer” data sources are defined to point to the Cassandra store and the MySQL database, respectively; an illustrative definition is shown below.
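
    For illustration, a minimal ‘Log_Analyzer’ entry in master-datasources.xml could look like the following; the URL and credentials are placeholders, not the values of the actual deployment.

    <datasource>
        <name>Log_Analyzer</name>
        <description>Summary tables of the Infra Log Analyzer</description>
        <definition type="RDBMS">
            <configuration>
                <!-- placeholder connection details; use the real MySQL host and credentials -->
                <url>jdbc:mysql://localhost:3306/loganalyzer</url>
                <username>root</username>
                <password>root</password>
                <driverClassName>com.mysql.jdbc.Driver</driverClassName>
            </configuration>
        </definition>
    </datasource>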

    The above hive query considers only the past hour’s data in the summarization process. This is enforced by the ‘where’ clause, which puts the limit on the hive query. Simplified pseudocode for the condition is shown below.

      
    if (hour(from_unixtime(unix_timestamp())) == 0) {
        // at midnight, summarize the last hour of the previous day
        to_date(timesta) = date_sub(to_date(from_unixtime(unix_timestamp())), 1)
    } else {
        to_date(timesta) = to_date(from_unixtime(unix_timestamp()))
    }

    if (hour(from_unixtime(unix_timestamp())) == 0) {
        hour(timesta) = 23
    } else {
        hour(timesta) = hour(from_unixtime(unix_timestamp())) - 1
    }
    

    The ‘where’ clause combines the conditions on to_date(timesta) and hour(timesta) and filters only the events that meet both.

    In this use case, we are interested in the summarized results of hourly/daily/weekly/monthly usage, and once the results are stored, the same source data never needs to be considered again during summarization. Hence, at the beginning of each hour, the hive script runs, summarizes the past hour’s data, and stores the result immediately in the RDBMS. Because of this, even if already summarized old source data has been moved out of the original Cassandra column family, the accuracy of the summarization results is not affected, as that data is not considered in the next iteration of the hive script. You can refer to [6] to see how to get started with hive analytics scripts.

    As mentioned above, this hive script is scheduled to run every hour, as shown in the image below.
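
    When scheduling the script, BAM accepts a cron expression (Quartz style: second, minute, hour, day-of-month, month, day-of-week). An hourly schedule could, for example, be expressed as follows; the exact expression used in the deployment is not preserved, so treat this as an assumption.

    0 0 * * * ?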

    Then, at the end of each day, the following hive script uses that day’s hourlyusage summary, created earlier, to build the dailyusage summary.

      
    CREATE EXTERNAL TABLE IF NOT EXISTS hourlyusage(
        userIp STRING, siteIp STRING, amount INT, day STRING, hour STRING)
        STORED BY 'org.wso2.carbon.hadoop.hive.jdbc.storage.JDBCStorageHandler'
        TBLPROPERTIES (
        'wso2.carbon.datasource.name' = 'Log_Analyzer',
        'hive.jdbc.update.on.duplicate' = 'true',
        'hive.jdbc.primary.key.fields' = 'userIp,siteIp,day,hour',
        'hive.jdbc.table.create.query' =
        'CREATE TABLE hourlyusage (userIp CHAR(15),
        siteIp CHAR(15), amount INT, day CHAR(10), hour CHAR(2), PRIMARY KEY(userIp,siteIp,day,hour))' );

    CREATE EXTERNAL TABLE IF NOT EXISTS dailyusage(
        userIp STRING, siteIp STRING, amount INT, day STRING)
        STORED BY 'org.wso2.carbon.hadoop.hive.jdbc.storage.JDBCStorageHandler'
        TBLPROPERTIES (
        'wso2.carbon.datasource.name' = 'Log_Analyzer',
        'hive.jdbc.update.on.duplicate' = 'true',
        'hive.jdbc.primary.key.fields' = 'userIp,siteIp,day',
        'hive.jdbc.table.create.query' =
        'CREATE TABLE dailyusage (userIp CHAR(15) NOT NULL,
        siteIp CHAR(15) NOT NULL, amount INT, day CHAR(10), PRIMARY KEY(userIp,siteIp,day))' );

    insert overwrite table dailyusage
        select userIp, siteIp, sum(amount) as amount, day from hourlyusage
        where day == date_sub(to_date(from_unixtime(unix_timestamp())), 1)
        group by userIp, siteIp, day;
    

    In the same manner, the weekly summary is created from the daily summaries of the week at the end of each week, the monthly summary is created from the daily summaries of the month at the end of each month, and the yearly summary is created from the monthly summaries of the year at the end of each year; a sketch of the monthly case is shown below.
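
    As an illustration, a monthly script could look like the following sketch, written by analogy with the daily script above. The monthlyusage schema and the month-extraction expression are assumptions rather than the original script, and the dailyusage external table must also be declared in this script, as shown earlier.

    -- illustrative sketch, by analogy with the daily script above
    CREATE EXTERNAL TABLE IF NOT EXISTS monthlyusage(
        userIp STRING, siteIp STRING, amount INT, month STRING)
        STORED BY 'org.wso2.carbon.hadoop.hive.jdbc.storage.JDBCStorageHandler'
        TBLPROPERTIES (
        'wso2.carbon.datasource.name' = 'Log_Analyzer',
        'hive.jdbc.update.on.duplicate' = 'true',
        'hive.jdbc.primary.key.fields' = 'userIp,siteIp,month',
        'hive.jdbc.table.create.query' =
        'CREATE TABLE monthlyusage (userIp CHAR(15), siteIp CHAR(15),
        amount INT, month CHAR(7), PRIMARY KEY(userIp,siteIp,month))' );

    -- run at the start of each month: date_sub(now, 1) falls in the previous
    -- month, and substr(day, 1, 7) extracts its 'yyyy-MM' prefix
    insert overwrite table monthlyusage
        select userIp, siteIp, sum(amount) as amount, substr(day, 1, 7) as month from dailyusage
        where substr(day, 1, 7) == substr(date_sub(to_date(from_unixtime(unix_timestamp())), 1), 1, 7)
        group by userIp, siteIp, substr(day, 1, 7);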

  3. Archive already summarized data

    One problem that can occur here is that, as the Cassandra database grows in size, the time taken to execute the hourly summary script increases. To avoid this, we archived older data [4]. The hourly summary script only uses the data of the last hour, so after that script has run there is no use in keeping the data. The data archive job can be created and scheduled to move data belonging to the previous day into a separate archive database; a rough sketch of the copy step is shown below. Since the hourly summary script only requires data belonging to the latest hour, this does not delete data that’s required for the execution of that script.
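
    For illustration only, the copy step of such an archive job could be written as a Hive script along the following lines. The ‘to_bam_archive’ column family name is an assumption, and purging the copied rows from the live column family is handled by BAM’s Cassandra archival feature [4], not by this query.

    -- illustrative sketch: copy events older than the current day into an
    -- archive column family (named to_bam_archive here for the example)
    CREATE EXTERNAL TABLE IF NOT EXISTS archiveLogEvent
        (id STRING, userIp STRING, siteIp STRING, size BIGINT, timesta STRING)
        STORED BY 'org.apache.hadoop.hive.cassandra.CassandraStorageHandler'
        WITH SERDEPROPERTIES (
        "wso2.carbon.datasource.name" = "WSO2BAM_CASSANDRA_DATASOURCE",
        "cassandra.cf.name" = "to_bam_archive",
        "cassandra.columns.mapping" = ":key, payload_SrcIp, payload_DestIp, payload_size, payload_timestamp" );

    insert overwrite table archiveLogEvent
        select id, userIp, siteIp, size, timesta from hourlyLogEvent
        where to_date(timesta) < to_date(from_unixtime(unix_timestamp()));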

  4. Visualizing the results in the dashboard

    To show the data that was written to the MySQL database by BAM, we used WSO2 Application Server. For this, we first need to create a data source in the application server by following this path.

    Home > Configure > Data Sources > New Data Source.

    You need to make sure that you have added mysql-connector-java-5.1.x-bin.jar (the MySQL JDBC driver) to $wso2ASHome/repository/components/lib. This creates an OSGi bundle for the MySQL connector jar so that, at run time, the WSO2 components can load the required classes from the connector within the OSGi environment.

    Also, enable the relevant parameter, by setting it to true, in the axis2.xml file located in the $wso2ASHome/repository/conf/axis2 directory.

    After creating the data source, we generated a data service for it. This data service is referred to by the Dashboard and WSO2 Infra Log Analyzer components to visualize the data.

    To create the data service, go to Home > Manage > Services > Add > Data Service > Generate in WSO2 Application Server and generate the data service as shown below.

    Create one service for all selected tables. Here, we have created data services for each and every function in the project. For example, to display the top 10 users on the home page, we create the data service as follows.

      
    
    <!-- illustrative wrapper: the query id, result element and operation names
         are representative reconstructions; the SQL is the original query -->
    <query id="getTopUsers" useConfig="Log_Analyzer">
       <sql>SELECT full_name, sum(amount) as total FROM hourlyusage,user WHERE day=DATE(DATE_SUB(NOW(),INTERVAL 1 HOUR)) and hour=HOUR(DATE_SUB(NOW(),INTERVAL 1 HOUR)) and hourlyusage.userIp=user.ip_address GROUP BY full_name ORDER BY total DESC LIMIT 10</sql>
       <result element="topUsers" rowName="user">
          <element name="full_name" column="full_name" xsdType="string"/>
          <element name="total" column="total" xsdType="long"/>
       </result>
    </query>
    <operation name="getTopUsers">
       <call-query href="getTopUsers"/>
    </operation>

    As mentioned above, the created data service is called from the dashboard Jaggery application; the Jaggery code snippet below shows how the data service is invoked and the data is populated.

      
    ……………..
    <h3>Top 10 Users</h3>
    <table>
      <tr><th>#</th><th>User Name</th><th>Bandwidth(KB)</th></tr>
      <%
      // re3 holds the rows returned by the data service call; the markup
      // around the surviving print statements is reconstructed
      for (var i = 0; i < re3.length; i++) {
          print("<tr>");
          print("<td>" + (i + 1) + "</td>");
          print("<td>" + re3[i].full_name + "</td>");
          print("<td>" + re3[i].total + "</td>");
          print("</tr>");
      }
      %>
    </table>
    ………….

    Thereafter, the data is populated as shown in the below image.

    Let’s take another use case where we have to display the current top sites when the user selects this option from a drop-down menu.

    The data service looks the same as the one above, except for the SQL query; a representative version, aggregating by site instead of by user, is shown below.

      
    
    <!-- illustrative: this representative query aggregates hourlyusage by site
         instead of by user; element and operation names are assumptions -->
    <query id="getTopSites" useConfig="Log_Analyzer">
       <sql>SELECT siteIp, sum(amount) as total FROM hourlyusage WHERE day=DATE(DATE_SUB(NOW(),INTERVAL 1 HOUR)) and hour=HOUR(DATE_SUB(NOW(),INTERVAL 1 HOUR)) GROUP BY siteIp ORDER BY total DESC LIMIT 10</sql>
       <result element="topSites" rowName="site">
          <element name="siteIp" column="siteIp" xsdType="string"/>
          <element name="total" column="total" xsdType="long"/>
       </result>
    </query>
    <operation name="getTopSites">
       <call-query href="getTopSites"/>
    </operation>

    Similar to the Jaggery code snippet shown above, the data services are called and the data is plotted in the Jaggery application. The data is displayed as follows.

    One use case in the WSO2 Infra Log Analyzer was to display all the data in a dashboard using charts, pie charts, etc. We designed the dashboard as follows.

    To load the data to the dashboard, the Jaggery application calls a loader function that invokes the data services above and hands the results to the charts; a minimal sketch is shown below.
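
    As a minimal sketch, assuming a hypothetical getTopUsers operation on the data service and a drawChart helper in the dashboard (both names are illustrative, not taken from the original application), such a loader could look like this:

    <%
    // minimal sketch: the service URL and the drawChart helper are hypothetical
    function loadDashboardData() {
        // invoke the generated data service over HTTP using Jaggery's built-in get()
        var result = get("http://localhost:9763/services/LogAnalyzerService/getTopUsers", {}, "json");
        if (result != null && result.data != null) {
            drawChart(result.data); // hand the returned rows to the chart-drawing code
        }
    }
    loadDashboardData();
    %>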


Summary

This article demonstrated how we successfully used WSO2 BAM to monitor network bandwidth usage within the internal WSO2 network. It shows the capability of sending your own set of custom events for your requirements, writing your own analytics, and visualizing the results with your preferred dashboard. Here, we used SOA to integrate the components, and we used WSO2 Jaggery to build the dashboard applications.

References

1. NetFlow
2. Archive Cassandra data
3. WSO2 BAM documentation
4. Writing your own data publishers
5. Writing hive analytics script

Other WSO2 contributors to this article

1. Chamika Kasun
2. Tharindu Ranasinghe
3. Pubudu Gunathilaka
4. Asiri Liyanaarachchi

About Author

  • Chamila Wijayarathna
  • Software Engineer
  • WSO2