Sample Applications for Twister
Kmeans Clustering
Matrix Multiplication
Graph Search
Word Count
High Energy Physics

Kmeans Clustering
Kmeans clustering is a well-known clustering algorithm aiming to cluster a set of data points to a predefined number of clusters. A MapReduce style parallel algorithm for Kmeans Clustering is shown in the first figure. In that each map function gets a portion of the data, and it needs to access this data split in each iteration. These data items do not change over the iterations, and it is loaded once for the entire set of iterations. The variable data is the current cluster centers calculated during the previous iteration and hence used as the input value for the map function.

All the map functions get this same input data (current cluster centers) at each iteration and computes a partial cluster centers by going through its data set. A reduce function computes the average of the partial cluster centers and produce the cluster centers for the next step. Main program, once it gets these new cluster centers, calculates the difference between the new cluster centers and the previous cluster centers and determine if it needs to execute another cycle of MapReduce computation.

MapReduce for Kmeans Clustering Algorithm

In the following figure, we compare the performance of different parallel programming runtimes for this iterative computation.

Performance of different Parallel Programming Runtimes for Kmeans Clustering Computation

More details of this analysis can be found in this paper. [You can also download the MPI version of the Kmeans clustering program from this link].
To run word count sample, please refer to the readme file at the following location.


Matrix Multiplication
Matrix multiplication is another well-studied area of research. Among the many parallel  algorithms for matrix multiplication, Fox's Algorithm and Cannon's Algorithm perform the best. However applying these algorithms in MapReduce style is difficult due to the restricted communication patterns available in MapReduce domain. Hence I derived a matrix multiplication algorithm for MapReduce  based on the column/row decomposition strategy.

To clarify our algorithm, let's consider an example where two input matrices, A and B, produce matrix C, as the result of the multiplication process. We split the matrix B into a set of column blocks and the matrix A into a set of row blocks. In each iteration, all the map tasks process two inputs: (i) a column block of matrix B, and (ii) a row block of matrix A; collectively, they produce a row block of the resultant matrix C. The column block associated with a particular map task is fixed throughout the computation, while the row blocks are changed in each iteration. However, in Hadoop's programming model (a typical MapReduce model), there is no way to specify this behavior.  Hence, it loads both the column block and the row block in each iteration of the computation. Twister supports the notion of long running map/reduce tasks where these tasks are allowed to retain static data in the memory across invocations, yielding better performance for "Iterative MapReduce" computations. The communication pattern of this application is shown in the following figure.

MapReduce Algorithm for Full Matrix Multiplication

In the following figure we compare the parallel overhead of different parallel runtimes for matrix multiplication application.

Parallel overhead of Twister, Hadoop, and MPI in Matrix Multiplication

To run word count sample, please refer to the readme file at the following location.



The search engine such as Google,Yahoo need to rank the query results from the Internet. The ranking factors of the web pages can be contend-based or link-based. PageRank is the well know link analysis algorithm. It calculates numerical value to each element of a hyperlinked set of web pages, which reflects the probability that the random surfer will access that page. The process of PageRank can be understood as a Markov chain which needs recursively calculation to converge. So it is within the class of applications where multiple iterations are necessary for the overall computation.

In PageRank, the goal is to calculate the access probability for each web page. An iteration of the algorithm calculates the new access probability for each web page based on values calculated in the previous computation. The iteration will not stop until the difference value is less than a predefined threshold, where the difference value is the different between the access probabilities of web pages in Nth iteration and the those in (N+1)th iteration.

MapReduce Implementation of Pagerank Algorithm

In Twister implementation of PageRank, we constructed web graphs with vertices where in-link degree of all pages comply with the power law distribution. These input data are partitioned into few parts and stored in the format of adjacency list. Each map function runs on one of the partitioned data, which are constant over the iterations. The variable data are the PageRank values calculated during previous iteration which in turns used as the input value for the next iteration. In each iteration, the MAP task updates the old PageRank values to new one by analyzing the local partial adjacency list file. The output of MAP task is partial of PageRank values. The reduce task receives all the partial output and produces the new PageRank values.

To run pagerank sample, please refer to the readme file at the following location.


Graph Search

This algorithm tries to use Twister Framework to process breadth-first graph search problem parallel. This algorithm is based on Cailin's Hadoop version of breadth-first graph search[1]. The basic idea of this algorithm is to exploring the nodes of the same level parallel, and then to go to the nodes in next levels iteratively. Every step will be sequentially described in the following chapters.

Note: This algorithm may not be optimized.

Graph Data Preparation
The original graph data is held in one file. In this file, each line represents a node information. It begins with a node ID and followed by edges ID. To do breath-first graph search, the graph data is firstly loaded into main program.

Main Function
Main function drives a set of MapReduce tasks iteratively to do graph search. Main function initially set the root node and put nodes into different groups as initial inputs to map tasks. Every map task will use this input to explore the nodes in next level, and then output all nodes it gets and generates. Reduce tasks and Combiner will collect these nodes, select the shortest path and send them back to main function. Main function will regroup these nodes again and send them back to map tasks again to explore the nodes iteratively.

Terminate Condition
When all the nodes are marked as "explored" and the shortest path all has already been agreed. The program ends.

Map Task
For every map task, it will select the nodes which need exploring and put them into the queue. New node object of next level will be generated after exploring. Here is the difference between Cailin's[1] version and the version here. Because there are several nodes contained in the same map task, it tries to combine some new generated node object with the node object it already has in input. This will try to do a combining in MAP task and reduce the size of output.
In collecting period, those path values are output to reduce tasks.

Rreduce Task and Combiner
Those path values output by map tasks will be collected by reduce tasks. They are grouped by their node id. If a node id has several path values, only the shortest path value of all is selected as it path value and the final exploring state is also agreed and marked. The Combiner will collect all nodes back to main function.

[1]Cailin. 6 18, 2009.

To graph search sample, please refer to the readme file at the following location.


Word Count
This is a simple MapReduce computation that is often use to explain the MapReduce programming model. This application does not utilize the iterative MapReduce capabilities of Twister. It is mainly used here as a starting material.

The goal of the word count application is to count the number of occurrences of the words in a collection of documents. Unlike typical MapReduce runtimes, Twister allows users to handle files at Map tasks and hence the word count map task can read a data partition (typically a file) and produce the count of words that it identifies. At the reduce tasks, these counts are summarized to produce a total word counts.

In typical word count applications implemented using other MapReduce runtimes, the map task outputs (word,1) pairs for all the words it encounter. This approach is not optimized for performance rather simple to program. With small amount of more complexity we can simply convert this map task to produce a list of (word,count) pairs corresponding to the local data partion. This is the approach used in Twister word count application.

MapReduce for Word Count

To run word count sample, please refer to the readme file at the following location.



High Energy Physics(HEP) Data Analysis


MapReduce for HEP Data Analysis

Performance of Twister (CGL-MapReduce) and Hadoop in HEP Data Analysis

As part of an effort funded by the DoE we are working with the High Energy Physics group at Caltech with their particle physics data analysis tools. The data analysis framework used by these tools is ROOT, and the analysis functions are written using an interpreted language of ROOT named CINT.

The goal of the analysis is to execute a set of analysis functions on a collection of data files produced by high-energy physics experiments. After processing each data file, the analysis produces a histogram of identified features. These histograms are then combined to produce the final result of the overall analysis. This data analysis task is both data and compute intensive and fits very well for MapReduce computation model. First figure shows the program flow of this analysis once it is converted to a MapReduce implementation and the second figure compares the performance of Twister and Hadoop for this data analysis. More details of this analysis can be found in this paper.