Configuring Twister
Single Machine
For a Cluster
Use ActiveMQ Broker
Compile Twister
Auto Configuration
Managing Data Using Twister
Programming Twister
Typical MapReduce Application
Iterative MapReduce Application
Map-only Application
Enable Fault Tolerance
Use API to create Partition File
Sample Applications

Configuring Twister Runtime on a Single Machine

This section explains how to set up Twister on a single machine. Please follow the steps below. These steps assume that you are on a Linux operating system and have already configured Java on all compute nodes.

1. Setting up NaradaBrokering
Download the latest version of NaradaBrokering from the NaradaBrokering website.
Unzip the package to a directory of your choice on one of the compute nodes. In the case of a single-machine installation, unzip it to a local directory. Once unzipped, set an environment variable named NBHOME (in your .bashrc) pointing to this directory. You can add the following line to your .bashrc:

export NBHOME=/path/to/NaradaBrokering

Once this is done, go to the $NBHOME/bin directory and start the broker by running startbr.sh. To stop the broker you can use stopbr.sh.
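
For example, to start and later stop the broker from the bin directory:

cd $NBHOME/bin
./startbr.sh    # start the broker
./stopbr.sh     # stop the broker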

2. Setting up Twister
Unzip the Twister zip file to some directory (say /home/username/Twister). Then set an environment variable named TWISTER_HOME pointing to this directory. You can add the following line to your .bashrc:

export TWISTER_HOME=/home/username/Twister

Okay, now you need to set a few configuration parameters as follows.
1. Open the $TWISTER_HOME/bin/twister.properties file and set the correct paths for the following properties according to your system: nodes_file, app_dir, and data_dir (an illustrative example follows this list).

2. Open the $TWISTER_HOME/bin/nb.properties file and set broker_host to the IP address of the machine where you set up NaradaBrokering.

3. Open the $TWISTER_HOME/bin/nodes file and add your local IP address to it. In a single-machine setup this file should contain only one IP address; in a cluster setup it will contain the IP addresses of all the compute nodes.
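
For example, after these three steps the files might look like the following (all values are illustrative; substitute your own paths and IP address).

In twister.properties:

nodes_file = /home/username/Twister/bin/nodes
app_dir = /home/username/Twister/apps
data_dir = /home/username/datadir

In nb.properties:

broker_host = 192.168.1.10

In nodes (one IP address per line; a single-machine setup has just one line):

192.168.1.10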

3. Starting Twister

Start the Twister runtime by running:

$TWISTER_HOME/bin/start_twister.sh

4. Stopping Twister runtime

Please run:

$TWISTER_HOME/bin/stop_twister.sh

 

Configuring Twister in a Compute Cluster
Before configuring Twister on a compute cluster, you need to make sure that the headnode of the cluster can log in to the compute nodes via ssh without using a password. This is a common requirement for MPI and Hadoop as well.
To enable key-based (passwordless) ssh login, follow the standard SSH key setup instructions for your system; a sketch is given below.
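
A minimal sketch of the usual OpenSSH approach (computenode01 is a placeholder host name; repeat the second command for each compute node):

ssh-keygen -t rsa                    # on the headnode: generate a key pair, accept the defaults
ssh-copy-id username@computenode01   # copy the public key to a compute node

After this, ssh username@computenode01 should log in without prompting for a password.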

Once you get the above step working, it is very easy to set up Twister on the cluster.

1. Setting up NaradaBrokering

As in the single-machine configuration, you need to set up NaradaBrokering on one node. This can be one of the nodes of the cluster or some other computer. Then follow the steps below.

2. Setting up Twister

All compute nodes need the Twister libraries for Twister to work on the cluster. If your cluster has a shared file system, you can simply copy Twister to the shared file system and set the paths accordingly. If the cluster is not configured with a shared file system, you need to copy the Twister directory to a common path on all the compute nodes. In either case, at the end you should have the TWISTER_HOME environment variable set to a path like the ones below.
e.g. With shared file system:

export TWISTER_HOME=/globalhome/username/TWISTER

e.g. Without shared file system:

export TWISTER_HOME=/home/username/TWISTER

Next you need to set up $TWISTER_HOME/bin/nb.properties on all the compute nodes. As in the previous step, if you have a shared file system you only need to set this in one place; otherwise, update the file in one place and copy it to all the compute nodes.
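
If you do not have a shared file system, a simple loop over the nodes file can push the updated file to every compute node. This is a minimal sketch, assuming passwordless ssh (set up earlier) and the same TWISTER_HOME path on every node:

# Copy nb.properties to every compute node listed in the nodes file
for node in $(cat $TWISTER_HOME/bin/nodes); do
    scp $TWISTER_HOME/bin/nb.properties $node:$TWISTER_HOME/bin/
done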

Okay, now you need to set the following properties on the node from which you plan to run your applications. If this is a different machine from the compute cluster, please set these properties on that machine.

1. Open the $TWISTER_HOME/bin/nodes file and add the IP addresses of all the compute nodes that you need to use.

2. Open the $TWISTER_HOME/bin/twister.properties file and set the correct paths for the following properties according to your system: nodes_file, app_dir, and data_dir.

Twister provides a tool to create a directory on all compute nodes, which can be used as the root directory for the application data.

To create this directory use the following shell script.

$TWISTER_HOME/bin/twister.sh initdir /home/username/datadir

This will create the directory /home/username/datadir on every compute node, and you can use this path for the data_dir property in the twister.properties file.

Note: Except in the single-machine scenario, data_dir should not point to a path on a shared file system. This will cause errors when creating a partition file.

3. Starting Twister

Start the Twister runtime by running:

$TWISTER_HOME/bin/start_twister.sh

4. Stopping Twister runtime

Please run:

$TWISTER_HOME/bin/stop_twister.sh

 

Use ActiveMQ broker
First, download ActiveMQ 5.4.2 from the Apache ActiveMQ website, unpack the package, and copy $ACTIVEMQ_HOME/activemq-all-X.X.X.jar to $TWISTER_HOME/lib/.

In order to let Twister work with the ActiveMQ broker, you need to configure two things in the Twister settings. First, make sure pubsub_broker is set to ActiveMQ in twister.properties. Second, edit the url property in amq.properties and change the IP address to that of the node where the ActiveMQ broker will run.
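
For example (illustrative values; 61616 is ActiveMQ's default port, and the exact URL format should match what ships in your amq.properties):

In twister.properties:

pubsub_broker = ActiveMQ

In amq.properties:

url = tcp://192.168.1.10:61616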

To start ActiveMQ, please execute:

$ACTIVEMQ_HOME/bin/activemq console

 

Compile Twister Package
Compile the Twister package before using it. To compile it successfully, you need to download the ActiveMQ client package first and install it to $TWISTER_HOME/lib/ (see Use ActiveMQ broker).

Under $TWISTER_HOME/, execute:

ant

When execution is done, the Twister jar package is generated under $TWISTER_HOME/lib/.


 

Auto Configuration
Twister can automatically set up the environment by executing the script TwisterPowerMakeUp.sh under the $TWISTER_HOME/bin/ directory. By reading the node list provided by the user, the script determines whether it is running on a single node or on multiple nodes. After execution, it provides a proper Twister environment in which each node runs one Twister daemon and one node hosts the ActiveMQ broker.

For correct execution, first put the IP addresses of the nodes into $TWISTER_HOME/bin/nodes, one per line. Then enter $TWISTER_HOME/bin/ and execute:

./TwisterPowerMakeUp.sh

When execution is done, the script prints all the settings. The user can then start the ActiveMQ broker and Twister directly.


 

Managing Data Using Twister
Most MapReduce computations require reading large volumes of data. By default, Twister supports file-based input formats in which the user is allowed to read files within the map/reduce tasks. This approach simplifies our design and avoids the need for various record formats. From our experience in applying the MapReduce programming model to various applications, we noticed that in most cases the input data is available as a collection of files or a few large files, and the user is the best person to decide the input formats for her computation. One can easily extend Twister to support other input formats as well.

Moving computation to data is a key concept in MapReduce. Twister also supports this behavior: it reads data from the local disks of the compute nodes, and the map/reduce computations are executed where the data is located. Unlike Hadoop, Twister does not come with a built-in file system. Instead, it provides a tool to manage the data across these distributed disks. You can find this utility in the bin directory under the name "twister.sh". Let's look at the functionality it provides.

Creating a common directory in all compute nodes

This is required before running Twister if you are planning to read data from files. Use the following command to create a common directory on all the compute nodes. Once this is done, you need to specify this directory as "data_dir" in the $TWISTER_HOME/bin/twister.properties file.

 ./twister.sh initdir [Directory to create - complete path to the directory]
e.g.
./twister.sh initdir /home/username/data

Create a sub directory inside "data_dir"

It is convenient to create a directory inside "data_dir" to hold application-specific data. For example, we can create a directory named kmeans_data under data_dir to hold kmeans data as follows.

./twister.sh mkdir [sub directory to create - relative to data_dir specified in twister.properties]

e.g. ./twister.sh mkdir kmeans_data

This will create a directory /home/username/data/kmeans_data in all the compute nodes.

Delete a sub directory inside "data_dir"

If you need to delete any of the directories inside "data_dir", simply use the following command.

./twister.sh rmdir [sub directory to delete - relative to data_dir specified in twister.properties]

e.g. ./twister.sh rmdir kmeans_data

List directories inside the "data_dir"

If you want to look at what is inside data_dir you can use the following commands.

To list the files/directories in the first node.

./twister.sh ls  

To list the files/directories in all the nodes.

./twister.sh ls -a

To list the files/directories in the sub directory "test_dir" in the fist node

./twister.sh ls test_dir

To list the files/directories in the sub directory "test_dir" in all the nodes

./twister.sh ls test_dir -a

Distribute a set of data files to all compute nodes evenly

This is a quite common requirement in many MapReduce computations. Before running a data-processing application, we need to distribute the data across the compute nodes of the cluster. You can use the same twister.sh utility to do this as well.

./twister.sh put [input data directory (local)] [destination directory (remote)]
destination directory - relative to data_dir specified in twister.properties

e.g. ./twister.sh put mydata kmeans_data  (remember we created the kmeans_data directory above)

This will distribute all the data files in the local directory mydata to the "data_dir"/kmeans_data directories located on all compute nodes. This operation uses multiple threads to speed up the file-copying task.

Collect a set of files from all compute nodes to a local directory

To collect a set of files distributed across the cluster onto a local disk, you can use the following command.

./twister.sh get [input directory (remote)] [file name pattern] [destination (local)]
input directory - relative to data_dir specified in twister.properties

e.g. ./twister.sh get kmeans_data km_data_ mydata

Here km_data_ is a pattern in the names of the input files. This allows us to collect only the files with this particular pattern into the local directory.

Copy resources to "apps" directory

This allows you to copy any resources (jar files and other resources that need to be in the classpath) to the "apps" directory.

./twister.sh cpj [resource to copy to the apps directory]

e.g. ./twister.sh cpj Twister-Kmeans.jar


 

Programming Twister

In this section, we will go through some of the common API functions that Twister offers for efficient MapReduce computations.

A Typical MapReduce Application using Twister

This is very simple to implement using Twister. In this example, we assume that the user has distributed the data using the steps explained in the section above and has created a partition file. The following code snippet from the word-count application shows the main program section in Twister, which performs the computation.

1.  JobConf jobConf = new JobConf("word-count-map-reduce");

2.  jobConf.setMapperClass(WCMapTask.class);
3.  jobConf.setReducerClass(WCReduceTask.class);
4.  jobConf.setCombinerClass(WCCombiner.class);
5.  jobConf.setNumMapTasks(numMapTasks);
6.  jobConf.setNumReduceTasks(numReduceTasks);

7.  TwisterDriver driver = new TwisterDriver(jobConf);
8.  driver.configureMaps(partitionFile);
9.  TwisterMonitor monitor = driver.runMapReduce();
10. monitor.monitorTillCompletion();
11. Map<String, Integer> wordCounts = ((WCCombiner) driver
.getCurrentCombiner()).getResults();

12. driver.close();
  • Lines 1-6 deal with the job configuration, in particular setting the map, reduce, and combiner classes.
  • In line 7, we create an instance of TwisterDriver, which is the main client-side entity that handles the Twister-related operations.
  • Line 8 shows how we can configure the map tasks with data. Although this is not an iterative application, we can still use this option to configure maps.
  • In line 9 we simply call the driver to perform the MapReduce computation. This returns a monitor to check the status of the computation.
  • In line 10 we just wait till the computation is over.
  • Line 11 gives us the final results (a small sketch of printing them follows this list).
  • Line 12 cleans up the configured map/reduce tasks and closes all the client-side connections.
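
As a small usage sketch (not part of the original sample), the wordCounts map returned in line 11 can be printed with plain Java before closing the driver:

// Print each word and its count from the combiner's result map
for (Map.Entry<String, Integer> entry : wordCounts.entrySet()) {
    System.out.println(entry.getKey() + " = " + entry.getValue());
}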

An Iterative MapReduce Application using Twister

Let's see how one can use Twister to implement an iterative MapReduce computation. For explanation purposes we selected the kmeans clustering application; see the Wikipedia article on k-means clustering for background.

1.  JobConf jobConf = new JobConf("kmeans-map-reduce");
2.  jobConf.setMapperClass(KMeansMapTask.class);
3.  jobConf.setReducerClass(KMeansReduceTask.class);
4.  jobConf.setCombinerClass(KMeansCombiner.class);
5.  jobConf.setNumMapTasks(numMapTasks);
6.  jobConf.setNumReduceTasks(numReducers);

7.  TwisterDriver driver = new TwisterDriver(jobConf);
8.  driver.configureMaps(partitionFile);

9.  double totalError = 0;
10. int loopCount = 0;
11. TwisterMonitor monitor = null;

12. boolean complete = false;
13. while(!complete){
14. monitor = driver.runMapReduceBCast(cData);
15. monitor.monitorTillCompletion();

16. DoubleVectorData newCData = ((KMeansCombiner) driver
                       .getCurrentCombiner()).getResults();
17. totalError = getError(cData, newCData);
18. cData = newCData;
19.   if (totalError < THRESHOLD) {
20.        complete = true;
21.        break;
22.   }
23. }
//Some printing of values.
24. driver.close();
  • As shown in the previous application, lines 1-6 configure the MapReduce computation.
  • In line 7 a client-side driver is created, and in line 8 the partition file is used to configure the map tasks with static data. (Remember, this is an iterative application.)
  • Line 13 contains the main loop of the iterative computation.
  • Line 14 uses the driver to run the MapReduce computation. Here we used the API call "runMapReduceBCast()" because in this application we need to broadcast our variable data to all the map tasks without "scattering" it. If the computation requires scattering the variable data, one can use the "runMapReduce(List<KeyValuePair>)" API call instead. Twister also supports the no-parameter API call "runMapReduce()", which takes no variable data. You can see all these API calls in action in the samples.
  • In line 15 we wait till the end of the computation, and in lines 16 and 17 we receive the result of the current iteration via the combine task (a sketch of a possible getError() implementation follows this list).
  • Lines 19 to 22 deal with the loop condition.
  • At line 23 we iterate!
  • Line 24 cleans up the map/reduce tasks and closes the client side connections.
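
The helper getError() called in line 17 is not shown in the snippet. Below is a minimal sketch of what it might compute; the getData() accessor on DoubleVectorData is a hypothetical assumption here, returning the centroids as a double[][], and the real sample may differ:

// Sketch only: sums the Euclidean distances between corresponding
// old and new centroids. getData() is an assumed accessor.
private static double getError(DoubleVectorData cData, DoubleVectorData newCData) {
    double[][] oldC = cData.getData();
    double[][] newC = newCData.getData();
    double totalError = 0;
    for (int i = 0; i < oldC.length; i++) {
        double sum = 0;
        for (int j = 0; j < oldC[i].length; j++) {
            double d = oldC[i][j] - newC[i][j];
            sum += d * d;
        }
        totalError += Math.sqrt(sum);
    }
    return totalError;
}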
     

Map-Only (Reduce-less) Application using Twister

We noticed that this is a very common use case in many domains, and most embarrassingly parallel applications can be implemented this way. Here is how you do it with Twister.

1.  JobConf jobConf = new JobConf("CAP3-map-reduce");

2.  jobConf.setMapperClass(CAP3MapTask.class);
3.  jobConf.setNumMapTasks(numMapTasks);
4.  jobConf.setNumReduceTasks(0);

5.  TwisterDriver driver = new TwisterDriver(jobConf);
6.  driver.configureMaps(partitionFile);
7.  TwisterMonitor monitor = driver.runMapReduce();
8. monitor.monitorTillCompletion();
9. driver.close();
  • Lines 1-4 deal with the job configuration. Here we don't need to specify Reduce and Combiner classes because we are not planning to use these two phases of the computation.
  • In line 4 we inform Twister that this is a Map-only (or reduce-less) computation.
  • Line 5 creates the client-side driver for this computation, and line 6 configures the map tasks using the provided partition file.
  • In line 7 we run the computation, and in line 8 we wait till it is completed.
  • Line 9 performs the cleanup tasks.

Next step: Explore Samples!


 

Enable Fault Tolerance
Once fault tolerance is enabled, the Twister client can detect faults and try to restore the computation from the last iteration.

In order to enable fault tolerance, add the following call to the job configuration:

1.  jobConf.setFaultTolerance();
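
For example, in the context of the iterative sample above (class and variable names reused from that sample):

JobConf jobConf = new JobConf("kmeans-map-reduce");
jobConf.setMapperClass(KMeansMapTask.class);
jobConf.setReducerClass(KMeansReduceTask.class);
jobConf.setCombinerClass(KMeansCombiner.class);
jobConf.setFaultTolerance();  // enable fault detection and recovery from the last iteration
TwisterDriver driver = new TwisterDriver(jobConf);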

 

Use API to create Partition File
Twister can now create the partition file from within the client code, without explicitly executing create_partition_file.sh.

To do this, use the following call to configure the maps in the driver:

1.  driver.configureMaps(common_directory, file_filter);
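
For example (illustrative arguments, reusing the data layout from the Managing Data section; common_directory is the data directory present on all compute nodes, and file_filter is a pattern matched against the input file names):

driver.configureMaps("/home/username/data/kmeans_data", "km_data_");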