Getting started with Hive Analytics in WSO2 Business Activity Monitor

  • By Maninda Edirisooriya
  • 26 Jul, 2013

Table of contents

  • Introduction
  • Why Hive scripts in BAM?
  • Hive basics

    • Creating a Hive table for an RDBMS table

      • Method 1
      • Method 2
    • Creating a Hive table for a Cassandra column family
    • Updating a Hive table

Introduction

This article describes how to write Hive queries to be used in the WSO2 BAM 2.2.0 platform. We also recommend reviewing the online documentation of the Apache Hive project as a supplementary reference. Note that this document focuses mainly on the Hive syntax used in WSO2 BAM and should not be considered comprehensive Hive learning material.

Why Hive scripts in BAM?

Data analytics is a major part of WSO2 BAM. It is mainly implemented using the Apache Hadoop-based big data analytics framework, which uses the highly scalable MapReduce technology. In order to use this platform, the data analytics problem has to be implemented with MapReduce functions. However, manually writing MapReduce functions is a time-consuming and error-prone task. Apache Hive introduces a higher level of abstraction on top of the Hadoop MapReduce engine to ease this task. Hive is essentially an SQL-like language, which is considerably easier to learn than programming MapReduce jobs directly. That is the rationale for using Hive inside WSO2 BAM.

Hive basics

Similar to SQL tables defined in an RDBMS, Hive defines a virtual table structure wrapping the existing

  1. RDBMS tables – e.g. H2 database table, MySQL table
  2. NoSQL column families – e.g. Cassandra column families

as a Hive table. For instance, when a Hive table is created wrapping a MySQL table, the Hive table has a field corresponding to each 'mapped' data field in the MySQL table. This means that every data field we need in the Hive table must be individually mapped to a field of the MySQL table; unmapped fields will not be available or visible in the Hive table. The Hive engine maintains each Hive table by keeping a set of meta-data about the underlying real table/column family. Updating or deleting that table/column family with third-party software will therefore not update the meta-data maintained by Hive, so it is advisable to perform these operations on an SQL table through the Hive table (note, however, that creating a Cassandra column family is not possible with the Hive command given below). As a result, there is no need to create an SQL database using a separate tool, as creating a Hive table with the Hive script given below will automatically create the required SQL table.

Creating a Hive table for an RDBMS table

Method 1

Here is an example syntax for creating a Hive table corresponding to a physical H2 database table.

CREATE EXTERNAL TABLE IF NOT EXISTS ActivitySummaryTable(
	messageRowID STRING, sentTimestamp BIGINT, bamActivityID STRING, soapHeader STRING, soapBody STRING, host STRING)
	STORED BY 'org.wso2.carbon.hadoop.hive.jdbc.storage.JDBCStorageHandler'
	TBLPROPERTIES (
	'wso2.carbon.datasource.name'='WSO2BAM_DATASOURCE',
	'hive.jdbc.update.on.duplicate' = 'true' ,
	'hive.jdbc.primary.key.fields' = 'messageRowID' ,
	'hive.jdbc.table.create.query' =
	'CREATE TABLE ActivitySummary (messageRowID VARCHAR(100) NOT NULL PRIMARY KEY,
	 sentTimestamp BIGINT, bamActivityID VARCHAR(40), soapHeader TEXT, soapBody TEXT, host VARCHAR(25))' );

This is a script used in the BAM 2.2.0 Activity Monitoring Toolbox. Here the Hive table, ActivitySummaryTable, is created by mapping the real physical H2 table, ActivitySummary. CREATE EXTERNAL TABLE IF NOT EXISTS ensures that ActivitySummaryTable is created only once, avoiding the performance cost of recreating it every time the script runs. messageRowID, sentTimestamp, bamActivityID, soapHeader, soapBody and host are the fields of the Hive table, which are mapped to the real data fields of the same names in the H2 database. 'org.wso2.carbon.hadoop.hive.jdbc.storage.JDBCStorageHandler' is the storage handler class that manages the JDBC connection to the H2 database. There is no need to explicitly add this artifact to BAM, as WSO2 BAM comes with it out-of-the-box. When a Hive table is created over such an RDBMS table, the other database-related properties need to be specified with TBLPROPERTIES. They are as follows:

  1. 'wso2.carbon.datasource.name'='WSO2BAM_DATASOURCE' – The data source name is assigned here, which provides other connection-related information such as username, password, port, etc. This data source should be defined in the master-datasources.xml file located in the /repository/conf/datasources directory. WSO2BAM_DATASOURCE is a default H2 data source that comes out-of-the-box with WSO2 BAM. Instead of defining the data source in the master-datasources.xml configuration file, it is also possible to define the database connection parameters explicitly in the script (Method 2 describes that type of declaration).
  2. 'hive.jdbc.update.on.duplicate' = 'true' - This is used to overwrite existing data rows in the table when the table is updated with an entry with the same primary key.
  3. 'hive.jdbc.primary.key.fields' = 'messageRowID' - This is where the primary key fields of the Hive table are set. In this example, only messageRowID is set as the primary key. However, if required, more than one field can be set as primary keys using a comma-separated list, e.g. 'hive.jdbc.primary.key.fields' = 'primarykey1,primarykey2,primarykey3'.
  4. 'hive.jdbc.table.create.query' = 'CREATE TABLE... - This is the only place where the actual SQL command to be executed on the real RDBMS system is specified. CREATE TABLE ActivitySummary (messageRowID VARCHAR(100) NOT NULL PRIMARY KEY, sentTimestamp BIGINT, bamActivityID VARCHAR(40), soapHeader TEXT, soapBody TEXT, host VARCHAR(25)) is the H2 table creation SQL command. Note how the data fields of the real H2 database relate to the fields of the Hive table. The table below shows how data types are mapped from Hive table fields to H2 table fields.
Hive       H2
STRING     VARCHAR(100) – the number of characters should be sufficiently large
STRING     TEXT – this type is specific to H2; other databases have different types for strings of unlimited length
INT        INT
SMALLINT   SMALLINT
BIGINT     BIGINT
DOUBLE     DOUBLE
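To illustrate the full mapping, the following is a hypothetical Hive table declaration that exercises each of these type pairs (the table and field names here are invented for the example and are not part of any BAM toolbox):

```sql
CREATE EXTERNAL TABLE IF NOT EXISTS ExampleStatsTable(
	name STRING, description STRING, count INT, flags SMALLINT, total BIGINT, average DOUBLE)
	STORED BY 'org.wso2.carbon.hadoop.hive.jdbc.storage.JDBCStorageHandler'
	TBLPROPERTIES (
	'wso2.carbon.datasource.name'='WSO2BAM_DATASOURCE',
	'hive.jdbc.update.on.duplicate' = 'true' ,
	'hive.jdbc.primary.key.fields' = 'name' ,
	'hive.jdbc.table.create.query' =
	'CREATE TABLE ExampleStats (name VARCHAR(100) NOT NULL PRIMARY KEY,
	 description TEXT, count INT, flags SMALLINT, total BIGINT, average DOUBLE)' );
```

Note how each Hive field lines up positionally with a column of the matching H2 type in the embedded CREATE TABLE query.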

Method 2

Instead of defining the data source in master-datasources.xml as mentioned in Method 1, the RDBMS connection parameters can be defined inline in the Hive script as properties. The following is an H2 table-based Hive table creation script from the real BAM 2.2.0 KPI Phone Retail Store sample.

CREATE EXTERNAL TABLE IF NOT EXISTS UserTable(
	name STRING, totalOrders INT, totalQuantity INT) 
	STORED BY 'org.wso2.carbon.hadoop.hive.jdbc.storage.JDBCStorageHandler' 
	TBLPROPERTIES ( 
	'mapred.jdbc.driver.class' = 'org.h2.Driver' , 
	'mapred.jdbc.url' = 'jdbc:h2:repository/database/samples/WSO2CARBON_DB;AUTO_SERVER=TRUE' , 
	'mapred.jdbc.username' = 'wso2carbon' , 
	'mapred.jdbc.password' = 'wso2carbon' , 
	'hive.jdbc.update.on.duplicate' = 'true' , 
	'hive.jdbc.primary.key.fields' = 'name' , 
	'hive.jdbc.table.create.query' = 
	'CREATE TABLE UserSummary (name VARCHAR(100) NOT NULL PRIMARY KEY,
	 totalOrders  INT, totalQuantity INT)' );

The following are the new properties used in this method instead of the wso2.carbon.datasource.name property.

  1. 'mapred.jdbc.driver.class' = 'org.h2.Driver' - This is the JDBC driver class name required for the connection to the database. The JDBC driver jar file should be added to the [BAM-Server-Location]/repository/components/lib directory. The H2 JDBC driver is packaged with BAM by default.
  2. 'mapred.jdbc.url' = 'jdbc:h2:repository/database/samples/WSO2CARBON_DB;AUTO_SERVER=TRUE' - This is the JDBC URL to the database.
  3. 'mapred.jdbc.username' = 'wso2carbon' - This is the JDBC Username.
  4. 'mapred.jdbc.password' = 'wso2carbon' - This is the JDBC Password.
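Only these connection properties change when targeting a different RDBMS. As a hypothetical sketch for a local MySQL instance (the driver class, URL and credentials below are assumptions for illustration, not taken from any BAM sample; the MySQL driver jar would also need to be copied into the lib directory mentioned above):

```sql
CREATE EXTERNAL TABLE IF NOT EXISTS UserTable(
	name STRING, totalOrders INT, totalQuantity INT)
	STORED BY 'org.wso2.carbon.hadoop.hive.jdbc.storage.JDBCStorageHandler'
	TBLPROPERTIES (
	'mapred.jdbc.driver.class' = 'com.mysql.jdbc.Driver' ,
	'mapred.jdbc.url' = 'jdbc:mysql://localhost:3306/USER_DB' ,
	'mapred.jdbc.username' = 'root' ,
	'mapred.jdbc.password' = 'root' ,
	'hive.jdbc.update.on.duplicate' = 'true' ,
	'hive.jdbc.primary.key.fields' = 'name' ,
	'hive.jdbc.table.create.query' =
	'CREATE TABLE UserSummary (name VARCHAR(100) NOT NULL PRIMARY KEY,
	 totalOrders INT, totalQuantity INT)' );
```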

Creating a Hive table for a Cassandra column family

The following is the Hive script for creating a Hive table wrapping a Cassandra column family.

CREATE EXTERNAL TABLE IF NOT EXISTS ActivityDataTable
	(messageID STRING, sentTimestamp BIGINT, activityID STRING, version STRING, soapHeader STRING, soapBody STRING, host STRING)
	STORED BY 'org.apache.hadoop.hive.cassandra.CassandraStorageHandler'
	WITH SERDEPROPERTIES (
 	"cassandra.host" = "127.0.0.1" ,
	"cassandra.port" = "9160" ,
	"cassandra.ks.name" = "EVENT_KS" ,
	"cassandra.ks.username" = "admin" ,
	"cassandra.ks.password" = "admin" ,
	"cassandra.cf.name" = "org_wso2_bam_activity_monitoring" ,
	"cassandra.columns.mapping" =
	":key, payload_timestamp, correlation_bam_activity_id, Version, payload_SOAPHeader, payload_SOAPBody, meta_host" );

In NoSQL databases like Cassandra, column families are logically similar to tables in the SQL paradigm. For more details about Cassandra, refer to the documentation available for Apache Cassandra. In the NoSQL paradigm, each row holds a set of key-value pairs instead of the predefined data fields of the RDBMS paradigm, so the rows of a single column family can contain an arbitrary number of keys. However, when they are mapped to a Hive table, a predefined set of keys should be selected and mapped to the fields of the Hive table. The command above is straightforward apart from a few differences from the H2 table creation case, which we go through below.

  1. See how the new handler class, org.apache.hadoop.hive.cassandra.CassandraStorageHandler, is used instead of the JDBC handler class.
  2. See how WITH SERDEPROPERTIES is used instead of the TBLPROPERTIES clause.
  3. See how the Cassandra keyspace connection parameters (a keyspace in the NoSQL world is the logical counterpart of a database in the RDBMS paradigm), namely the host, port, username and password, are declared explicitly in the properties cassandra.host, cassandra.port, cassandra.ks.username and cassandra.ks.password. In addition, see how the Cassandra column family name, org_wso2_bam_activity_monitoring, is set as the cassandra.cf.name property and the keyspace name, EVENT_KS, is assigned to the cassandra.ks.name property.
  4. Here the cassandra.columns.mapping property is used to map the column family keys to the Hive table fields, instead of the hive.jdbc.table.create.query property used in the earlier case. So messageID, sentTimestamp, activityID, version, soapHeader, soapBody and host are the Hive table fields mapped to the column family keys (the keys of the key-value pairs) :key, payload_timestamp, correlation_bam_activity_id, Version, payload_SOAPHeader, payload_SOAPBody and meta_host in the Cassandra keyspace. The reason is that this column family has already been created; the Hive script only creates the Hive table mapped onto the existing column family. Note that :key is the unique row key available for each row in a Cassandra column family. This field must be mapped to a Hive table field in every such Hive script.
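Once this mapping exists, the Cassandra data can be queried through the Hive table like any other table. For example, a simple sanity-check query along these lines (a hypothetical illustration, not part of the toolbox script) would pull a few mapped rows:

```sql
SELECT messageID, activityID, host
	FROM ActivityDataTable
	WHERE version = "1.0.0"
	LIMIT 10;
```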

Updating a Hive table

As mentioned earlier, a user can create Hive tables on both RDBMS tables and Cassandra column families. Once a Hive table is created, Hive internally maintains meta-data about the real table. The purpose of creating a Hive table is to execute Hive queries against it, which is very similar to executing SQL queries on an RDBMS table. However, instead of an UPDATE keyword, Hive uses the INSERT OVERWRITE keywords. Let's go back to the Activity Monitoring use case. In that use case, once the Hive tables for Cassandra and the RDBMS are created, the next step is to overwrite the RDBMS table with data selected from the Cassandra-backed table. This is the simplest use case expressed as a Hive query. The following script updates the RDBMS table.

insert overwrite table ActivitySummaryTable
	select messageID, sentTimestamp, activityID, soapHeader, soapBody, host
	from ActivityDataTable
	where version= "1.0.0";

Note that Hive queries are only compatible with Hive tables. Therefore, before writing a Hive query on either an RDBMS table or a Cassandra column family, Hive tables should be created on them. Now, let's consider a more comprehensive example with three Hive commands from the BAM 2.2.0 Service Statistics script.

CREATE EXTERNAL TABLE IF NOT EXISTS AppServerStats(
	key STRING, service_name STRING, operation_name STRING, request_count INT, response_count INT,
	fault_count INT, response_time BIGINT, remote_address STRING, payload_timestamp BIGINT, host STRING)
	STORED BY 'org.apache.hadoop.hive.cassandra.CassandraStorageHandler'
	WITH SERDEPROPERTIES (
	"cassandra.host" = "127.0.0.1" ,
	"cassandra.port" = "9160" ,
	"cassandra.ks.name" = "EVENT_KS" ,
	"cassandra.ks.username" = "admin" ,
	"cassandra.ks.password" = "admin" ,
	"cassandra.cf.name" = "bam_service_data_publisher" ,
	"cassandra.columns.mapping" =
	":key, payload_service_name, payload_operation_name, payload_request_count, payload_response_count, payload_fault_count, payload_response_time, meta_remote_address, payload_timestamp, meta_host" );

CREATE EXTERNAL TABLE IF NOT EXISTS AppServerStatsPerMinute(
	host STRING, service_name STRING, operation_name STRING, total_request_count INT, total_response_count INT,
	total_fault_count INT, avg_response_time DOUBLE, min_response_time BIGINT, max_response_time BIGINT,
	year SMALLINT, month SMALLINT, day SMALLINT, hour SMALLINT, minute SMALLINT, time STRING)
	STORED BY 'org.wso2.carbon.hadoop.hive.jdbc.storage.JDBCStorageHandler'
	TBLPROPERTIES (
	'wso2.carbon.datasource.name'='WSO2BAM_DATASOURCE',
	'hive.jdbc.update.on.duplicate' = 'true',
	'hive.jdbc.primary.key.fields' = 'host,service_name,operation_name,year,month,day,hour,minute',
	'hive.jdbc.table.create.query' =
	'CREATE TABLE AS_STATS_SUMMARY_PER_MINUTE (host VARCHAR(100) NOT NULL,
	 service_name VARCHAR(150), operation_name VARCHAR(150),
	 total_request_count INT, total_response_count INT, total_fault_count INT,
	 avg_response_time DOUBLE, min_response_time BIGINT, max_response_time BIGINT,
	 year SMALLINT, month SMALLINT, day SMALLINT, hour SMALLINT, minute SMALLINT, time VARCHAR(30))' );

insert overwrite table AppServerStatsPerMinute
	select host, service_name, operation_name,
	sum(request_count) as total_request_count,
	sum(response_count) as total_response_count,
	sum(fault_count) as total_fault_count,
	avg(response_time) as avg_response_time,
	min(response_time) as min_response_time,
	max(response_time) as max_response_time,
	year(from_unixtime(cast(payload_timestamp/1000 as BIGINT), 'yyyy-MM-dd HH:mm:ss.SSS')) as year,
	month(from_unixtime(cast(payload_timestamp/1000 as BIGINT), 'yyyy-MM-dd HH:mm:ss.SSS')) as month,
	day(from_unixtime(cast(payload_timestamp/1000 as BIGINT), 'yyyy-MM-dd HH:mm:ss.SSS')) as day,
	hour(from_unixtime(cast(payload_timestamp/1000 as BIGINT), 'yyyy-MM-dd HH:mm:ss.SSS')) as hour,
	minute(from_unixtime(cast(payload_timestamp/1000 as BIGINT), 'yyyy-MM-dd HH:mm:ss.SSS')) as minute,
	concat(substring(from_unixtime(cast(payload_timestamp/1000 as BIGINT), 'yyyy-MM-dd HH:mm:ss'), 0, 16), ':00') as time
	from AppServerStats
	group by year(from_unixtime(cast(payload_timestamp/1000 as BIGINT), 'yyyy-MM-dd HH:mm:ss.SSS')),
	month(from_unixtime(cast(payload_timestamp/1000 as BIGINT), 'yyyy-MM-dd HH:mm:ss.SSS')),
	day(from_unixtime(cast(payload_timestamp/1000 as BIGINT), 'yyyy-MM-dd HH:mm:ss.SSS')),
	hour(from_unixtime(cast(payload_timestamp/1000 as BIGINT), 'yyyy-MM-dd HH:mm:ss.SSS')),
	minute(from_unixtime(cast(payload_timestamp/1000 as BIGINT), 'yyyy-MM-dd HH:mm:ss.SSS')),
	substring(from_unixtime(cast(payload_timestamp/1000 as BIGINT), 'yyyy-MM-dd HH:mm:ss'), 0, 16),
	host, service_name, operation_name;
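All the repeated from_unixtime(cast(payload_timestamp/1000 as BIGINT), ...) calls implement one idea: payload_timestamp is an epoch value in milliseconds, so dividing by 1000 yields seconds, which are then broken into the per-minute grouping key. A rough Python sketch of that bucketing logic (an illustration only, not code that ships with BAM; note that Hive's from_unixtime uses the server's local time zone, whereas this sketch assumes UTC for reproducibility):

```python
from datetime import datetime, timezone

def minute_bucket(payload_timestamp_ms):
    """Convert an epoch timestamp in milliseconds into the per-minute
    grouping key used by the Hive query: (year, month, day, hour, minute)
    plus a 'yyyy-MM-dd HH:mm:00' time string."""
    # Mirrors cast(payload_timestamp/1000 as BIGINT) in the Hive script.
    seconds = payload_timestamp_ms // 1000
    # Hive's from_unixtime uses the server time zone; UTC is assumed here.
    t = datetime.fromtimestamp(seconds, tz=timezone.utc)
    # Truncate to the minute and append ':00', like the concat/substring trick.
    time_string = t.strftime("%Y-%m-%d %H:%M") + ":00"
    return (t.year, t.month, t.day, t.hour, t.minute, time_string)
```

Every event whose timestamp falls within the same minute produces the same key, which is exactly what makes the GROUP BY clause aggregate per-minute statistics.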

Several aggregation functions are used in this INSERT OVERWRITE TABLE command. Here are some useful aggregation functions in the Hive language.

  1. sum - Get the sum of all records in the given field
  2. avg - Get the average of all records in the given field
  3. min - Get the minimum of all records in the given field
  4. max - Get the maximum of all records in the given field
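Applied to one GROUP BY bucket, these behave like the familiar built-in aggregates. The following Python sketch illustrates the semantics over a single group of response-time values (an illustration only, not BAM code):

```python
def aggregate_response_times(response_times):
    """Mimic the Hive sum/avg/min/max aggregates over one GROUP BY bucket."""
    return {
        "sum": sum(response_times),                       # total of the field
        "avg": sum(response_times) / len(response_times), # arithmetic mean
        "min": min(response_times),                       # smallest value
        "max": max(response_times),                       # largest value
    }
```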

For more Hive functions, refer to the Hive Language Manual.

Author

Maninda Edirisooriya, Software Engineer, WSO2