Galago
(UNDER CONSTRUCTION)
Created by Trevor Strohman
Galago is made up of several components. The most powerful component is
TupleFlow. TupleFlow is a kind of MapReduce framework that allows
the stages of a computation to have multiple inputs and outputs. Trevor
describes TupleFlow as, "a mix between MapReduce, make/ant, and a database
system. TupleFlow is like MapReduce in that it can efficiently parallelize a
large computation. It is like make or ant in that it runs based on a file that
looks much like a Makefile. And, it is like a database system because the glue
that holds the computation together are lists of tuples, just like database tables."
Galago's indexers are built on top of the TupleFlow framework. The indexers
are made of several Java classes which can be used in TupleFlow stages; depending on
which ones you combine, you can make a traditional indexer (i.e., one that produces
an inverted index) or a query likelihood binned indexer, among others.
These are examples of the types of applications that TupleFlow can
enhance.
Another useful component of Galago is the retrieval system.
This does not rely on TupleFlow, but it knows how to read and interact with the
inverted indexes created by Galago's indexers. It is also easily extended and
is a great way to prototype new retrieval operators.
There are other features that come with Trevor Strohman's release of Galago. However,
these are not covered here (except for Pig-Galago 1.0).
Getting Galago
There are two branches of Galago: Trevor's branch and CIIR's branch. While you can of course check out Trevor's branch, this wiki is written specifically with CIIR's branch in mind. The instructions on this page do not necessarily (and in some cases, definitely do not) apply to Trevor's branch.
Trevor's Galago Branch
Trevor's branch is available via this Git repository. To check out a copy, you'll first need to get
Git, which is available here. It is pretty easy to install.
Once you have Git, you can download the current version of Galago by issuing the command:
git clone git://repo.or.cz/galago.git
For information about what is included in Trevor's Galago branch and how to use it, see the
Galago Guidebook.
CIIR's Galago Branch
Currently, CIIR's Galago branch includes the following packages:
- Galago (TupleFlow, TREC indexing classes, and a retrieval framework)
- Pig-Galago (a patched version of com.yahoo.pig version 1.0 ONLY; Pig 1.2+ is not supported with TupleFlow)
- edu.umass (includes a distributed retrieval framework and custom operators built on top of Galago)
The source code for each of these is maintained in a separate repository; however, there is an additional repository which stores the most recent Jar files, including libraries, from new_galago and edu.umass. The latest version of any of these can be checked out using the following commands (you must have an account on Sydney to access these):
svn co svn+ssh://sydney.cs.umass.edu/home/hfeild/svn/galago/tags/latest galago
svn co svn+ssh://sydney.cs.umass.edu/home/hfeild/svn/pig_galago/tags/latest pig_galago
svn co svn+ssh://sydney.cs.umass.edu/home/hfeild/svn/edu.umass/tags/latest edu.umass
svn co svn+ssh://sydney.cs.umass.edu/home/hfeild/svn/ciir_galago_bin/tags/latest ciir_galago_bin
If you are checking out one of the source repositories, you can build the Jar
file for that particular package by cd-ing into its directory and issuing the command:
ant jar
The fresh Jar file for the package will be in the dist/ subdirectory. For instance, to check out and build
galago, do:
$ svn co svn+ssh://sydney.cs.umass.edu/home/hfeild/svn/galago/tags/latest galago
$ cd galago/
$ ant jar
$ ls dist/
galago.jar
The ciir_galago_bin package has a directory of Jar files, so no building is necessary.
It also has a directory of sample Galago parameter files and a scripts/ directory.
See the README it contains for more information.
Next, you will need to set up your CLASSPATH environment variable so that Java
will know where to find the Jar files you checked out.
Setting up CIIR Galago Packages
It is helpful to set up your environment to make issuing commands to Galago
easier. Let's assume that you have downloaded any or all of the above
CIIR Galago packages in the following directory:
/home/name/galago_packages/
You need to add every Jar file in the CIIR package(s) to the $CLASSPATH
environment variable. You can do this manually, or you can add the following
function to the bottom of your .bashrc file (courtesy of Trevor Strohman):
append_classpath() {
local DIRECTORY=$1
for f in `find $DIRECTORY -name "*.jar"`; do
if [ "${#CLASSPATH}" -eq "0" ]; then
CLASSPATH=$f
else
CLASSPATH="${CLASSPATH}:${f}"
fi
done
}
Add the following lines elsewhere in your ~/.bashrc file:
export GALAGO_DIR=/home/name/galago_packages
append_classpath $GALAGO_DIR
export CLASSPATH
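If you want to check which Jar files append_classpath will pick up before editing ~/.bashrc, the same recursive directory walk can be sketched in Java. This is a hypothetical helper (ClasspathSketch is not part of Galago); it assumes all packages live under a single root directory, as above, and joins the paths with the platform's path separator (':' on Linux).

```java
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical helper mirroring append_classpath: collect every *.jar file
// under a root directory and join the paths into one CLASSPATH string.
final class ClasspathSketch {
    static String buildClasspath(Path root) {
        try (Stream<Path> paths = Files.walk(root)) {
            return paths
                .filter(Files::isRegularFile)
                .filter(p -> p.getFileName().toString().endsWith(".jar"))
                .map(Path::toString)
                .sorted() // deterministic order; the shell loop follows find's order instead
                .collect(Collectors.joining(File.pathSeparator));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // e.g. java ClasspathSketch /home/name/galago_packages
        System.out.println(buildClasspath(Paths.get(args[0])));
    }
}
```

Comparing this output with `echo $CLASSPATH` after sourcing ~/.bashrc is a quick way to spot a missing Jar.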
If you have trouble using DRMAA below, you might also try adding the following:
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$SGE_ROOT/lib/$SGE_ARCH
On Sydney and the Swarm, this should already be exported by the system Bash file,
but if for some reason it is not, try adding the above.
TupleFlow
Trevor describes TupleFlow in depth in his UMass dissertation (see Chapter 3).
This walkthrough is very similar to the description in the dissertation, but presents the information in a different order.
The code, parameter file, and some play data used in the example below are in the following zip files:
Galago TupleFlow Examples (01-Jun-2008),
Galago TupleFlow Examples (24-Jul-2008). The 23-Jul-2008 version
is the one used in these step-by-step examples.
There are instructions on how to build the source
and run TupleFlow on the parameter file.
To use TupleFlow, you need two things: Java classes and an XML file that tells Galago how to
connect these classes. The Java classes must extend or implement certain TupleFlow interfaces/abstract classes,
depending on the nature of the class you are writing, and the XML file must connect your classes
in a way that actually fits together (for example, the types and orders passed between stages must match).
(Figure: Example of a TupleFlow process.)
First, let's get a feel for how TupleFlow...flows. In the example above, the large boxes are
stages and the smaller boxes they contain are steps. The labels for stages are in grey, and the
labels for steps are contained within the corresponding step's box. Boxes which are shaded with blue lines
(e.g., stage_2 in the example) are repeatable stages. Finally, the arrows show the direction of the flow of
data, or tuples.
A stage is a collection of steps. Stages can be repeatable (like stage_2) or not (like
stage_1 and stage_3).
Repeatable stages are equivalent to mapping stages in the MapReduce framework, whereas non-repeatable
stages which follow repeatable stages are like reducing stages.
Let's walk through the example above. In stage_1, the class ReadFileNames is called. Let's assume
ReadFileNames reads the filenames contained within a directory. As each filename is read in, it is
sent to one of several instantiations of stage_2. Each instantiation is given one or more filenames,
which are received by the class TokenizeFile. This class tokenizes the contents of each file into a
list of words, or tokens. It sends this list to the next step in stage_2: GetWordCounts. This class
produces word counts for the document that was just tokenized.
Now we have tuples of the form <word, count>. These tuples are sent to SortByWord, where they are
sorted into alphabetical order.
The final stage, stage_3, is non-repeatable, meaning that it must wait for every instance of
stage_2 to complete before it can run. TupleFlow knows this and merges the output from each
stage_2 instance into one giant alphabetically sorted list of <word, count> tuples. This is then
handed off to ReduceCounts in stage_3 one tuple at a time.
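The merge described above is, conceptually, a standard k-way merge of already-sorted runs. The sketch below is not TupleFlow's code; it only illustrates the idea with a java.util.PriorityQueue that always holds the smallest unread element from each stage_2 instance's output. Plain words stand in for full <word, count> tuples, and MergeSketch is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

// Illustrative k-way merge: each input list is one sorted stage_2 output.
final class MergeSketch {
    // The current head element of one run, plus the iterator for the rest of that run.
    private static final class Head {
        final String value;
        final Iterator<String> rest;
        Head(String value, Iterator<String> rest) { this.value = value; this.rest = rest; }
    }

    static List<String> merge(List<List<String>> sortedRuns) {
        PriorityQueue<Head> heap = new PriorityQueue<>((a, b) -> a.value.compareTo(b.value));
        for (List<String> run : sortedRuns) {
            Iterator<String> it = run.iterator();
            if (it.hasNext()) heap.add(new Head(it.next(), it));
        }
        List<String> merged = new ArrayList<>();
        while (!heap.isEmpty()) {
            Head smallest = heap.poll();          // smallest unread element across all runs
            merged.add(smallest.value);
            if (smallest.rest.hasNext()) {        // refill from the run it came from
                heap.add(new Head(smallest.rest.next(), smallest.rest));
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        System.out.println(merge(Arrays.asList(
            Arrays.asList("apple", "house", "tree"),
            Arrays.asList("dog", "house"),
            Arrays.asList("cat", "house", "zebra"))));
        // [apple, cat, dog, house, house, house, tree, zebra]
    }
}
```

Only one element per run is held in memory at a time, which is why sorting inside each repeatable stage makes the final merge cheap.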
ReduceCounts will add all counts for the same word, so that:
<house, 1>
<house, 3>
<house, 2>
becomes:
<house, 6>
Once all of a word's tuples have been reduced to one with the final count, it is passed on to
the next step, WriteToFile, which writes the tuple to the output file.
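Because the tuples arrive sorted by word, all copies of a word are adjacent, so the reducer only needs to remember the current word and its running total. The standalone sketch below uses plain Java with no TupleFlow classes (ReduceSketch and its nested WordCount are illustrative stand-ins) and sums the three house tuples from the example above into one.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustrative reducer over a stream of <word, count> pairs sorted by word.
final class ReduceSketch {
    static final class WordCount {
        final String word;
        final int count;
        WordCount(String word, int count) { this.word = word; this.count = count; }
        @Override public String toString() { return "<" + word + ", " + count + ">"; }
    }

    static List<WordCount> reduce(List<WordCount> sortedByWord) {
        List<WordCount> out = new ArrayList<>();
        String curWord = null;   // null means "no word seen yet"
        int curCount = 0;
        for (WordCount wc : sortedByWord) {
            if (wc.word.equals(curWord)) {
                curCount += wc.count;                  // same word: keep summing
            } else {
                if (curWord != null) out.add(new WordCount(curWord, curCount));
                curWord = wc.word;                     // new word: start a new total
                curCount = wc.count;
            }
        }
        if (curWord != null) out.add(new WordCount(curWord, curCount)); // flush the last word
        return out;
    }

    public static void main(String[] args) {
        System.out.println(reduce(Arrays.asList(
            new WordCount("house", 1), new WordCount("house", 3),
            new WordCount("house", 2), new WordCount("tree", 1))));
        // [<house, 6>, <tree, 1>]
    }
}
```

Using null as the "nothing seen yet" marker avoids reserving an ordinary string value for that purpose.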
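So, that is the general flow of the data. Next, we will go over how stages are connected, how to write the step classes, and how to create the TupleFlow parameter file.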
Connecting Stages
Data travels between stages as a stream of tuples. Galago comes with a tuple type builder which
will create a tuple type which implements the methods demanded by the
galago.tupleflow.Type
interface. The tuple type builder can construct a class that has tuple fields of the following types:
- int
- long
- float
- double
- boolean
- String
- bytes (byte[])
If you want to have tuple fields which are of a different type, you will have to create
the tuple type class manually.
Let's make a little more sense of these tuple types by creating the tuple type that will
flow between stage_1 and stage_2 in our example above. All stage_1 is doing is
reading in filenames and passing each one to stage_2. What we need is a tuple that
has only one field: a String to hold the filename. An important note: Galago will
expect all types to have an ordering, even if you do not intend to sort that type. For
the sake of argument (or at least for the sake of Galago accepting the parameter
file we will create in a few minutes), let's say we want this type to be sortable
based on alphabetical order. To create this tuple type, we use the following command:
$ java galago.tupleflow.TemplateTypeBuilder \
FileName.java FileName org.mytypes \
String:filename order:+filename
This command says:
"Create a tuple type called FileName in the file FileName.java
as part of the org.mytypes package; give it one field: a String called filename
which can be sorted in ascending order (i.e. alphabetically, since it's a String)."
If you wanted to sort it in descending alphabetical order, you would use
order:-filename.
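The generated source is not reproduced here, but conceptually a one-field type with order:+filename (or order:-filename) behaves like the hand-written class below. This is an illustrative stand-in: the real generated FileName also implements the methods required by galago.tupleflow.Type, which we do not show. Note that Java's String comparison is by character code, so "alphabetical" here puts uppercase letters before lowercase ones.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Illustrative stand-in for the generated org.mytypes.FileName type.
final class FileNameSketch {
    final String filename;

    FileNameSketch(String filename) { this.filename = filename; }

    // order:+filename  -> ascending String order
    static final Comparator<FileNameSketch> ASCENDING = Comparator.comparing(f -> f.filename);
    // order:-filename  -> the same comparison, reversed
    static final Comparator<FileNameSketch> DESCENDING = ASCENDING.reversed();

    @Override public String toString() { return filename; }

    public static void main(String[] args) {
        List<FileNameSketch> names = new ArrayList<>(Arrays.asList(
            new FileNameSketch("b.txt"), new FileNameSketch("a.txt"), new FileNameSketch("c.txt")));
        names.sort(ASCENDING);
        System.out.println(names);   // [a.txt, b.txt, c.txt]
        names.sort(DESCENDING);
        System.out.println(names);   // [c.txt, b.txt, a.txt]
    }
}
```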
To go from stage_2 to stage_3, we need a different tuple type -- one that has
two fields: a String for the word and an int for the count. Since it holds
word counts, we'll call it WordCount.
$ java galago.tupleflow.TemplateTypeBuilder \
WordCount.java WordCount org.mytypes \
String:word int:count order:+word order:+word-count order:+word+count
Notice that there are three possible orderings of this type; we will only really need
the first one (order:+word), which sorts the tuples in ascending alphabetical order,
but the other two demonstrate how one would allow for multiple sortings.
order:+word-count sorts first by ascending alphabetical order of word and breaks ties
by descending order of count; in contrast, order:+word+count breaks ties by ascending
order of count.
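The three orderings can be read as comparator chains. The sketch below expresses each one with java.util.Comparator; it only illustrates the semantics described above and is not the code TemplateTypeBuilder generates (WordCountOrders and its nested WordCount are hypothetical names).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

final class WordCountOrders {
    static final class WordCount {
        final String word;
        final int count;
        WordCount(String word, int count) { this.word = word; this.count = count; }
        @Override public String toString() { return word + ":" + count; }
    }

    // order:+word        -> ascending word only
    static final Comparator<WordCount> PLUS_WORD =
        Comparator.comparing((WordCount wc) -> wc.word);
    // order:+word-count  -> ascending word, ties broken by descending count
    static final Comparator<WordCount> PLUS_WORD_MINUS_COUNT =
        PLUS_WORD.thenComparing(Comparator.comparingInt((WordCount wc) -> wc.count).reversed());
    // order:+word+count  -> ascending word, ties broken by ascending count
    static final Comparator<WordCount> PLUS_WORD_PLUS_COUNT =
        PLUS_WORD.thenComparingInt(wc -> wc.count);

    static List<WordCount> sorted(List<WordCount> input, Comparator<WordCount> order) {
        List<WordCount> copy = new ArrayList<>(input);
        copy.sort(order);
        return copy;
    }

    public static void main(String[] args) {
        List<WordCount> data = Arrays.asList(
            new WordCount("house", 1), new WordCount("dog", 2), new WordCount("house", 3));
        System.out.println(sorted(data, PLUS_WORD_MINUS_COUNT)); // [dog:2, house:3, house:1]
        System.out.println(sorted(data, PLUS_WORD_PLUS_COUNT));  // [dog:2, house:1, house:3]
    }
}
```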
Now that we have the glue, as it were, that holds the stages together, let's see how
the stages actually make and use these tuples.
Writing and Connecting Step Classes
The first step of the first stage must (as far as I can tell) implement the Galago
interface galago.tupleflow.ExNihiloSource<T>. An ex nihilo class (Latin for "out of nothing")
has only one method that it must define: run().
Galago already comes with a class to do exactly what we want our ReadFileNames class to do:
galago.tupleflow.FileSource. However, for completeness' sake, we will make our own, based
almost entirely on galago.tupleflow.FileSource.
Another important note: all step classes can (but don't have to) have one of two kinds of constructor.
The first kind of constructor takes no parameters -- use this if you need to initialize any
data structures that you need to use to process the incoming data. If you do not have
any such data structures, you can omit the constructor altogether. If you need to read in a
parameter from the parameter file (which we are going to talk about in the next section),
your constructor may take one parameter of type galago.tupleflow.TupleFlowParameters.
So, our ReadFileNames step will have no inputs; its constructor needs to take a
TupleFlowParameters object so it knows what file or directory to read in; it needs
to send each filename it reads in to the next step; and it needs to output the filenames
in the object we created: FileName. We should make a file that looks something like this
(this is a slightly modified version of Galago's galago.tupleflow.FileSource):
/* org.myparse.ReadFileNames */
package org.myparse;
import org.mytypes.FileName;
import galago.tupleflow.OutputClass;
import galago.tupleflow.ExNihiloSource;
import galago.tupleflow.TupleFlowParameters;
import galago.tupleflow.Processor;
import galago.tupleflow.Step;
import galago.tupleflow.Linkage;
import galago.tupleflow.IncompatibleProcessorException;
import galago.tupleflow.Parameters.Value;
import galago.tupleflow.execution.ErrorHandler;
import java.io.File;
import java.io.IOException;
import java.util.List;
@OutputClass(className="org.mytypes.FileName", order={ "+filename" })
public class ReadFileNames implements ExNihiloSource<FileName> {
TupleFlowParameters parameters;
public Processor<FileName> processor;
/**
* Reads in the TupleFlow parameter file.
* We need this constructor because it is the only way we can read in the
* name of the file[s]/directory that the user wants processed.
*/
public ReadFileNames( TupleFlowParameters parameters ) {
this.parameters = parameters;
}
private void processDirectory( File root ) throws IOException {
for( File file : root.listFiles() ) {
if( file.isHidden() )
continue;
if( file.isDirectory() ) {
processDirectory( file );
} else {
processor.process( new FileName( file.toString() ) ); // <-- Sends the FileName object to the next step.
}
}
}
/**
* This is the only method that MUST be implemented in this class.
* This is a good example of how to read in a parameter from the parameter
* file and the types of checks that should be done to ensure that the
* correct parameter is present.
*/
public void run() throws IOException {
if( parameters.getXML().containsKey( "directory" ) ) {
List<Value> directories = parameters.getXML().list( "directory" );
for( Value directory : directories ) {
File directoryFile = new File( directory.toString() );
processDirectory( directoryFile );
}
} else if( parameters.getXML().containsKey( "file" ) ) {
List<Value> files = parameters.getXML().list( "file" );
for( Value file : files ) {
String filename = file.toString();
processor.process( new FileName(filename) );
}
}
processor.close(); // <-- Indicates that all files have been read in.
}
/**
* Tells the next step/stage that we are done processing our stream. This
* method is not needed in StandardStep classes as the StandardStep
* interface already provides it. However, here we need it because
* ExNihiloSource does not include it.
*/
public void close() throws IOException {
processor.close();
}
/**
* Links to the next step/stage. This has a similar story to that of the
* close() method above.
*/
public void setProcessor( Step nextStage ) throws IncompatibleProcessorException {
Linkage.link( this, nextStage );
}
/**
* This function is called when verifying that the parameter file has the
* correct fields and tags. Here we are checking that there is at least one
* <directory> or <file> parameter passed to this class in the TupleFlow
* parameter file and that all files or directories passed in 1) exist and
* 2) are valid files/directories. If not, we tell Galago's error handler that there is a
* problem and Galago will report that error before any processing is done.
*/
public static void verify( TupleFlowParameters parameters, ErrorHandler handler ) {
if( !(parameters.getXML().containsKey( "directory" ) || parameters.getXML().containsKey( "file" )) ) {
handler.addError( "ReadFileNames requires either at least one directory or file parameter." );
return;
}
if( parameters.getXML().containsKey( "directory" ) ) {
List<Value> directories = parameters.getXML().list( "directory" );
for( Value directory : directories ) {
File directoryFile = new File( directory.toString() );
if( directoryFile.exists() == false ) {
handler.addError( "Directory " + directoryFile.toString() + " doesn't exist." );
} else if( directoryFile.isDirectory() == false ) {
handler.addError( directoryFile.toString() + " exists, but it isn't a directory." );
}
}
} else if( parameters.getXML().containsKey( "file" ) ) {
List<Value> files = parameters.getXML().list( "file" );
for( Value file : files ) {
File f = new File(file.toString());
if( f.exists() == false ) {
handler.addError( "File " + file.toString() + " doesn't exist." );
} else if( f.isFile() == false ) {
handler.addError( file.toString() + " exists, but isn't a file." );
}
}
}
}
}
Alright, so that was the longest example we will cover, and its only purpose is to show
you how you would go about making your own 'first' step. It is easier to just use
galago.tupleflow.FileSource than to write your own.
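Now we need the steps that go in stage_2. Two of them (TokenizeFile and GetWordCounts) are
classes we write ourselves; the third, SortByWord, uses a class Galago already provides.
Our two classes are just regular steps, so we will extend the Galago class
galago.tupleflow.StandardStep<T,U>. It requires one method: public void process(T object),
which passes its results on to the next step by calling processor.process(). You can also
provide a close() method and the constructors mentioned above if you would like.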
For the TokenizeFile step, we want to create a class that will take an org.mytypes.FileName object,
read in the file it points to, and output a list of the tokens in that file.
Now is a good time to mention the following: while we automatically generated the tuple types that
flow between stages, as soon as you are within a stage, any objects are fair game. For example,
TokenizeFile might want to pass an ArrayList<String> object to GetWordCounts. Or we
could create a new wrapper class that contains the list and pass that to GetWordCounts.
Let's go with the first one for ease of use. (Please note: if the input files are particularly large, this
parsing method is not scalable; rather than storing the tokens in an array, we could just emit each
token as we parse it. Such a method would be much more scalable, though it would have more overhead because
of the extra function calls.)
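The more scalable alternative mentioned above, emitting each token as soon as it is read instead of collecting an ArrayList, can be sketched without TupleFlow by handing each token to a callback. Here java.util.function.Consumer plays the role that processor.process() plays inside a real step; the class name StreamingTokenizer is hypothetical.

```java
import java.io.Reader;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import java.util.Scanner;
import java.util.function.Consumer;

// Illustrative streaming tokenizer: no list of all tokens is ever built;
// each token goes downstream the moment it is parsed.
final class StreamingTokenizer {
    static void tokenize(Reader input, Consumer<String> next) {
        try (Scanner scan = new Scanner(input)) {   // closing the Scanner also closes the Reader
            while (scan.hasNext()) {
                next.accept(scan.next());           // one call per token
            }
        }
    }

    public static void main(String[] args) {
        List<String> seen = new ArrayList<>();
        tokenize(new StringReader("the house by the tree"), seen::add);
        System.out.println(seen); // [the, house, by, the, tree]
    }
}
```

In a TupleFlow version of this design, the type passed from TokenizeFile to GetWordCounts would become a single token rather than a whole list.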
So we want to read in org.mytypes.FileName objects and spit out ArrayList<String>
objects. We can use the following code:
/* org.myparse.TokenizeFile */
package org.myparse;
import org.mytypes.FileName;
import galago.tupleflow.InputClass;
import galago.tupleflow.OutputClass;
import galago.tupleflow.StandardStep;
import galago.tupleflow.execution.Verified;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Scanner;
@Verified
@InputClass(className="org.mytypes.FileName")
@OutputClass(className="java.util.ArrayList") // class name only; generics are erased at runtime
public class TokenizeFile extends StandardStep<FileName, ArrayList<String>> {
/**
* This is the only method we need.
*/
public void process( FileName fileName ) throws IOException {
ArrayList<String> tokens = new ArrayList<String>();
Scanner scan = new Scanner( new File(fileName.filename) );
while( scan.hasNext() )
tokens.add( scan.next() );
scan.close();
// Call the next step ('processor' is defined in our superclass)
processor.process( tokens );
}
}
That's it! Much easier than the first one. Now we need to make a similar class for the
GetWordCounts step. This class will take the ArrayList that contains the tokens
from the file and, for every token, emit a tuple that looks like <word, 1> (of the type
org.mytypes.WordCount, which we made earlier). We do not need to count up how many times
each token occurs in the ArrayList -- the reducer (which we will cover next) will take care of that.
Here's what it might look like:
/* org.myparse.GetWordCounts */
package org.myparse;
import org.mytypes.WordCount;
import galago.tupleflow.InputClass;
import galago.tupleflow.OutputClass;
import galago.tupleflow.StandardStep;
import galago.tupleflow.execution.Verified;
import java.util.ArrayList;
import java.io.IOException;
@Verified
@InputClass(className="java.util.ArrayList")
@OutputClass(className="org.mytypes.WordCount")
public class GetWordCounts extends StandardStep<ArrayList<String>, WordCount> {
/**
* This is the only method we need.
*/
public void process( ArrayList<String> tokens ) throws IOException {
for( String token : tokens )
processor.process( new WordCount(token, 1 ) );
}
}
For the SortByWord step, we will rely on the Galago sorter: galago.tupleflow.Sorter.
We will see how to use this when we write our parameter file.
Moving on to stage_3, we need to create a class to reduce the <word, count> tuples
so that the stream contains exactly one tuple per word. We do not need to use a
HashMap to store words and their counts; in fact, this would make our process rather
unscalable. Rather, we know that the stream of tuples coming in is sorted by word in
alphabetical order (notice how we specify this assumption with "+word" on the
@InputClass line). We can take advantage of this by keeping track of the current word
and its count; as soon as we see a new word, we pass the old word and its count to the
next step. Here's how it might look:
/* org.myparse.ReduceCounts */
package org.myparse;
import org.mytypes.WordCount;
import galago.tupleflow.InputClass;
import galago.tupleflow.OutputClass;
import galago.tupleflow.StandardStep;
import galago.tupleflow.Linkage;
import galago.tupleflow.execution.Verified;
import java.io.IOException;
@Verified
// We specify that we expect the input to be sorted alphabetically.
@InputClass(className="org.mytypes.WordCount", order={"+word"})
// If we specified an ordering for the output, it would say to Galago,
// "This class emits a stream of data in this order".
@OutputClass(className="org.mytypes.WordCount")
public class ReduceCounts extends StandardStep<WordCount, WordCount> {
public String curWord;
public int curCount;
public ReduceCounts() {
curWord = "";
curCount = 0;
}
/**
* This is the only method we need.
*/
public void process( WordCount wc ) throws IOException {
if( curWord.compareTo(wc.word) == 0 ){
curCount += wc.count;
} else {
// Make sure not to emit the default curWord.
if( curWord.compareTo("") != 0 )
processor.process( new WordCount(curWord, curCount) );
curWord = wc.word;
curCount = wc.count;
}
}
/**
* We need to make sure we get the last word in the stream.
*/
@Override
public void close() throws IOException {
if( curWord.compareTo( "" ) != 0 )
processor.process( new WordCount(curWord, curCount) );
processor.close();
}
}
Finally, we need to write our merged counts to a file. We do this with a class
that implements galago.tupleflow.Processor<T>, which is similar to
galago.tupleflow.StandardStep<T,U> except that it has no output type
(i.e., it never makes a call to processor.process()). Here's what our class
might look like:
/* org.myparse.WriteToFile */
package org.myparse;
import org.mytypes.WordCount;
import galago.tupleflow.InputClass;
import galago.tupleflow.Processor;
import galago.tupleflow.execution.ErrorHandler;
import galago.tupleflow.TupleFlowParameters;
import java.io.IOException;
import java.io.BufferedWriter;
import java.io.FileWriter;
@InputClass(className="org.mytypes.WordCount")
public class WriteToFile implements Processor<WordCount> {
public BufferedWriter out;
public WriteToFile( TupleFlowParameters params ) throws IOException {
String filename;
if( params.getXML().containsKey("filename") )
filename = params.getXML().get( "filename" );
else
throw new IOException( "Missing filename!" );
out = new BufferedWriter( new FileWriter(filename) );
}
/**
* This is the only method we need -- it writes <word, count> pairs to
* the output file as they are received.
*/
public void process( WordCount wc ) throws IOException {
out.write(wc.word + "\t" + wc.count);
out.newLine();
}
/**
* Need this to close the file.
*/
public void close() throws IOException {
out.close();
}
/**
* Makes sure that the TupleFlow parameter file passes a parameter called
* 'filename' to this class.
*/
public static void verify( TupleFlowParameters parameters, ErrorHandler handler )
{
if( !parameters.getXML().containsKey( "filename" ) )
{
handler.addError( "WriteToFile requires a 'filename' parameter." );
}
}
}
Creating a TupleFlow Parameter File
We have our Java classes, but how do we tell TupleFlow how to use them? We need an XML file called
a TupleFlow parameter file. This is similar to a Makefile or Ant build script.
TupleFlow parameter files have two main sections: one for stage connections
and another for stage descriptions. Both sections are enclosed in a <job></job> tag.
Also, a special property needs to be set at the top of the file to specify how many
jobs a repeatable stage should be split into (this is controlled by a hash function
within our automatically generated TupleFlow types, such as WordCount and FileName above).
Thus, we get a structure that looks like:
<job>
<!-- SPECIFY THE NUMBER OF JOBS PER REPEATABLE STAGE -->
<property name="hashCount" value="20" />
<!-- CONNECTIONS SECTION -->
<connections>
...
</connections>
<!-- STAGE DESCRIPTION SECTION -->
<stages>
...
</stages>
</job>
Stage Descriptions Section
The <stages></stages> section of a TupleFlow parameter file is made up of one
<stage></stage> block per stage you would like to specify. We have three stages in our example,
so we will have three <stage></stage> blocks. Each of these blocks is made of two parts: a
connections portion (slightly different from the stage connections section we will talk about next)
and a steps section.
The connections section specifies the input type or types that this stage accepts, as well as the
order they are required to be in. It also specifies every type emitted by this stage. You
can name your stages anything you would like -- we use stage_# here for simplicity, though
it may make more sense to use readFileNames, parseAndCountText, and reduceAndWriteCounts.
In our example, stage_1 has no inputs and one output (of type org.mytypes.FileName);
stage_2 has one input (of type org.mytypes.FileName) and one output (of type
org.mytypes.WordCount); and stage_3 has one input (of type org.mytypes.WordCount) and no output.
We also need to give ids to each of these inputs and outputs. This is needed to make sure that
the correct output from one stage is plugged into the correct input of another stage.
Here's what the connections section might look like for each of our three stages:
<stage id="stage_1">
<connections>
<output class="org.mytypes.FileName"
order="+filename"
id="filenames" />
</connections>
<steps>
...
</steps>
</stage>
<stage id="stage_2">
<connections>
<input class="org.mytypes.FileName"
order="+filename"
id="filenames" />
<!-- Notice how the id of the input matches the corresponding id
of the output from stage_1. -->
<output class="org.mytypes.WordCount"
order="+word"
id="wordCounts" />
</connections>
<steps>
...
</steps>
</stage>
<stage id="stage_3">
<connections>
<input class="org.mytypes.WordCount"
order="+word"
id="wordCounts" />
</connections>
<steps>
...
</steps>
</stage>
Just a few notes on the connections listed above: for the type org.mytypes.FileName, we only
specified the order +filename when we created the class, so we can only specify that order
in our stages. For org.mytypes.WordCount, we specified three options when creating the class:
+word, +word-count, and +word+count. We can use any of these three. However, if in
stage_2 we specify that the output will be in a particular order, then stage_3 must
expect its input in the same order. Otherwise, Galago will yell.
Now we can fill in the <steps></steps> section for each stage. This section
requires that each class that TupleFlow is to call be specified in a <step>
tag that looks something like: <step class="class.to.be.called" />. If the class
to be called requires any parameters, you can use a <step></step> block like:
<step class="class.to.be.called">
<myFirstParam>param1</myFirstParam>
</step>
If the current stage requires input, then the first step must be:
<input id="myInputId" />
If the current stage has an output, then the last step must be:
<output id="myOutputId" />
In our example, the first stage makes use of two classes -- our ReadFileNames class and the
Galago Sorter class. Recall that ReadFileNames requires one parameter: the name of the directory
or files to read in. In ReadFileNames, we specified that if a directory is to be read in, then
the name of that directory should be in the format <directory>dirName</directory>, whereas a
file should look like <file>filename</file>.
We could have required those tags to look like anything, so you have liberty in how
you name your tags. However, since we chose <directory> and <file> in ReadFileNames.java,
we must use those in the parameter file. To describe this step, we would write something
like:
<step class="org.myparse.ReadFileNames">
<directory>myDocDir</directory>
</step>
The second step requires that we call the galago.tupleflow.Sorter class. This
class requires two parameters: the type that it is to sort (in our case,
org.mytypes.FileName) and the order in which it is to be sorted (in our case,
+filename, which is also our only option for this type). That step looks like this:
<step class="galago.tupleflow.Sorter">
<class>org.mytypes.FileName</class>
<order>+filename</order>
</step>
The last thing we need to do is specify the output id:
<output id="filenames" />
The other stages are just like this, though both of the other stages require
<input/> tags and stage_3 does not need an <output/> tag. Here is
what the entire stage section of the TupleFlow parameter file will look like:
<stage id="stage_1">
<connections>
<output class="org.mytypes.FileName"
order="+filename"
id="filenames" />
</connections>
<steps>
<step class="org.myparse.ReadFileNames">
<directory>data</directory>
</step>
<step class="galago.tupleflow.Sorter">
<class>org.mytypes.FileName</class>
<order>+filename</order>
</step>
<output id="filenames" />
</steps>
</stage>
<stage id="stage_2">
<connections>
<input class="org.mytypes.FileName"
order="+filename"
id="filenames" />
<!-- Notice how the id of the input matches the corresponding id
of the output from stage_1. -->
<output class="org.mytypes.WordCount"
order="+word"
id="wordCounts" />
</connections>
<steps>
<input id="filenames" />
<step class="org.myparse.TokenizeFile" />
<step class="org.myparse.GetWordCounts" />
<step class="galago.tupleflow.Sorter">
<class>org.mytypes.WordCount</class>
<order>+word</order>
</step>
<output id="wordCounts" />
</steps>
</stage>
<stage id="stage_3">
<connections>
<input class="org.mytypes.WordCount"
order="+word"
id="wordCounts" />
</connections>
<steps>
<input id="wordCounts" />
<step class="org.myparse.ReduceCounts" />
<step class="org.myparse.WriteToFile">
<filename>wordCounts.txt</filename>
</step>
</steps>
</stage>
Stage Connections Section
The connections section of the TupleFlow parameter file specifies the type and order of data that flows
between stages. In our example above, we have three stages, and therefore we have two connections: one
from stage_1 to stage_2 and another from stage_2 to stage_3. A stage can have more
than one input or output, however, in which case more connection specifications are needed.
Each connection needs to specify the connection type (i.e., the class name of the type that is being passed
between two stages), the order, the input stage (the stage that is emitting the data of the above type), the
output stage (the stage that is receiving the data from the input stage), 'endpoint' identifiers
(which are just ids that keep track of which data output we are specifying from each stage),
and finally, a specification for each output stage as to whether it is repeatable or not.
If the output stage is repeatable, then a hash key is also required. To specify that an
output stage is repeatable, it must have the attribute assignment="each". Otherwise, it should have
assignment="combined".
Let's make the connection between stage_1 and stage_2 in our example. First, the data type
flowing between the two stages is org.mytypes.FileName. It will be in the order +filename
and, since we want stage_2 to be repeatable, we will specify that the hash key according to which
the FileName objects will be split into parallel jobs is +filename. Here's what it will look like:
<connection class="org.mytypes.FileName"
order="+filename"
hash="+filename">
<input stage="stage_1" endpoint="filenames" />
<output stage="stage_2" endpoint="filenames" assignment="each" />
</connection>
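With assignment="each", the FileName tuples are split into parallel jobs according to the hash key. The sketch below shows the general idea only: the jobFor and partition helpers are hypothetical, and they use String.hashCode() because TupleFlow's actual hash function is not described on this page. The property that matters is that equal keys always land in the same job.

```java
import java.util.ArrayList;
import java.util.List;

public class HashPartitionSketch {
    // Maps a hash key (here, the filename) to a job number in [0, jobCount).
    // floorMod avoids negative job numbers when hashCode() is negative.
    public static int jobFor(String key, int jobCount) {
        return Math.floorMod(key.hashCode(), jobCount);
    }

    // Splits keys into jobCount buckets, one bucket per parallel job.
    public static List<List<String>> partition(List<String> keys, int jobCount) {
        List<List<String>> jobs = new ArrayList<>();
        for (int i = 0; i < jobCount; i++) {
            jobs.add(new ArrayList<>());
        }
        for (String key : keys) {
            jobs.get(jobFor(key, jobCount)).add(key);
        }
        return jobs;
    }

    public static void main(String[] args) {
        List<List<String>> jobs = partition(List.of("a.txt", "b.txt", "c.txt"), 4);
        for (int i = 0; i < jobs.size(); i++) {
            System.out.println("job " + i + ": " + jobs.get(i));
        }
    }
}
```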
Between stage_2 and stage_3, we need to pass the type org.mytypes.WordCount in the order +word (as specified in the stage section above). This time the receiving stage, stage_3, should not be repeatable, because we need a lock on the file to which it will write; thus, we use assignment="combined":
<connection class="org.mytypes.WordCount"
order="+word">
<input stage="stage_2" endpoint="wordCounts" />
<output stage="stage_3" endpoint="wordCounts" assignment="combined" />
</connection>
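With assignment="combined", the wordCounts output of all the parallel stage_2 jobs goes to a single stage_3 job. Because each stage_2 job sorts its output by +word, the pieces can be combined without re-sorting by a standard k-way merge over a priority queue. The sketch below shows that technique on plain string lists; it is an illustration of how such a merge can be done, not a description of TupleFlow's internals.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

public class KWayMergeSketch {
    // Current head element of one sorted input, plus the iterator it came from.
    private record Head(String value, Iterator<String> rest) {}

    // Merges already-sorted lists into a single ascending list.
    public static List<String> merge(List<List<String>> sortedInputs) {
        PriorityQueue<Head> heap =
            new PriorityQueue<>((a, b) -> a.value().compareTo(b.value()));
        for (List<String> input : sortedInputs) {
            Iterator<String> it = input.iterator();
            if (it.hasNext()) {
                heap.add(new Head(it.next(), it));
            }
        }
        List<String> merged = new ArrayList<>();
        while (!heap.isEmpty()) {
            Head smallest = heap.poll();
            merged.add(smallest.value());
            // Refill from the same input so the heap holds one head per non-empty stream.
            if (smallest.rest().hasNext()) {
                heap.add(new Head(smallest.rest().next(), smallest.rest()));
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        List<List<String>> jobOutputs = List.of(
            List.of("apple", "pear"),
            List.of("apple", "fig", "zucchini"));
        System.out.println(merge(jobOutputs)); // [apple, apple, fig, pear, zucchini]
    }
}
```

Equal words from different jobs end up adjacent in the merged stream, which is what a reduce step like ReduceCounts relies on.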
Notice that in both connections, the endpoint name matches the input and output ids that we specified in the stage section. This is how TupleFlow knows which inputs match up with which outputs.
That's it! Here's what the whole parameter file looks like:
<!-- The full TupleFlow parameter file. -->
<job>
<property name="hashCount" value="20" />
<connections>
<connection class="org.mytypes.FileName"
order="+filename"
hash="+filename">
<input stage="stage_1" endpoint="filenames" />
<output stage="stage_2" endpoint="filenames" assignment="each" />
</connection>
<connection class="org.mytypes.WordCount"
order="+word">
<input stage="stage_2" endpoint="wordCounts" />
<output stage="stage_3" endpoint="wordCounts" assignment="combined" />
</connection>
</connections>
<stages>
<stage id="stage_1">
<connections>
<output class="org.mytypes.FileName"
order="+filename"
id="filenames" />
</connections>
<steps>
<step class="org.myparse.ReadFileNames">
<directory>data</directory>
</step>
<step class="galago.tupleflow.Sorter">
<class>org.mytypes.FileName</class>
<order>+filename</order>
</step>
<output id="filenames" />
</steps>
</stage>
<stage id="stage_2">
<connections>
<input class="org.mytypes.FileName"
order="+filename"
id="filenames" />
<!-- Notice how the id of the input matches the corresponding id
of the output from stage_1. -->
<output class="org.mytypes.WordCount"
order="+word"
id="wordCounts" />
</connections>
<steps>
<input id="filenames" />
<step class="org.myparse.TokenizeFile" />
<step class="org.myparse.GetWordCounts" />
<step class="galago.tupleflow.Sorter">
<class>org.mytypes.WordCount</class>
<order>+word</order>
</step>
<output id="wordCounts" />
</steps>
</stage>
<stage id="stage_3">
<connections>
<input class="org.mytypes.WordCount"
order="+word"
id="wordCounts" />
</connections>
<steps>
<input id="wordCounts" />
<step class="org.myparse.ReduceCounts"/>
<step class="org.myparse.WriteToFile">
<filename>wordCounts.txt</filename>
</step>
</steps>
</stage>
</stages>
</job>
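Since a typo in an endpoint name or stage id is easy to make in a file like this, it can be worth checking a parameter file before running it. The helper below is hypothetical (not part of Galago): it uses the JDK's built-in DOM parser to verify that each job-level connection's input stage declares a matching output id, and that its output stage declares a matching input id. It assumes the element layout shown above.

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;

public class EndpointChecker {
    // Returns the direct child elements of parent with the given tag name
    // (skips whitespace text nodes and comments).
    private static List<Element> children(Element parent, String tag) {
        List<Element> result = new ArrayList<>();
        NodeList nodes = parent.getChildNodes();
        for (int i = 0; i < nodes.getLength(); i++) {
            Node n = nodes.item(i);
            if (n instanceof Element e && e.getTagName().equals(tag)) {
                result.add(e);
            }
        }
        return result;
    }

    // Collects ids declared as <input>/<output> inside each stage's <connections>,
    // as keys of the form "stageId/input/id" or "stageId/output/id".
    private static Set<String> declaredIds(Element job) {
        Set<String> ids = new HashSet<>();
        for (Element stages : children(job, "stages")) {
            for (Element stage : children(stages, "stage")) {
                for (Element conns : children(stage, "connections")) {
                    for (String kind : List.of("input", "output")) {
                        for (Element e : children(conns, kind)) {
                            ids.add(stage.getAttribute("id") + "/" + kind + "/" + e.getAttribute("id"));
                        }
                    }
                }
            }
        }
        return ids;
    }

    // Returns one message per connection endpoint that has no matching stage id.
    public static List<String> unmatchedEndpoints(String xml) throws Exception {
        Document doc = DocumentBuilderFactory.newInstance().newDocumentBuilder()
            .parse(new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)));
        Element job = doc.getDocumentElement();
        Set<String> ids = declaredIds(job);
        List<String> problems = new ArrayList<>();
        for (Element conns : children(job, "connections")) {
            for (Element conn : children(conns, "connection")) {
                // The emitting (input) stage must declare a matching <output> id.
                for (Element in : children(conn, "input")) {
                    String key = in.getAttribute("stage") + "/output/" + in.getAttribute("endpoint");
                    if (!ids.contains(key)) problems.add("missing " + key);
                }
                // The receiving (output) stage must declare a matching <input> id.
                for (Element out : children(conn, "output")) {
                    String key = out.getAttribute("stage") + "/input/" + out.getAttribute("endpoint");
                    if (!ids.contains(key)) problems.add("missing " + key);
                }
            }
        }
        return problems;
    }

    public static void main(String[] args) throws Exception {
        // Usage: java EndpointChecker paramFile.xml
        List<String> problems = unmatchedEndpoints(Files.readString(Path.of(args[0])));
        System.out.println(problems.isEmpty() ? "all endpoints match" : problems);
    }
}
```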
Running TupleFlow
To run TupleFlow on the above parameter file (or on one that you have created), do one of the following:
Local (non-distributed):
mkdir /path/to/my/tmp/dir
java -Xmx900m galago.tupleflow.execution.JobExecutor \
local paramFile.xml /path/to/my/tmp/dir
DRMAA (distributed):
mkdir /path/to/my/tmp/dir
java -Xmx900m galago.tupleflow.execution.JobExecutor \
drmaa paramFile.xml /path/to/my/tmp/dir
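The two commands above differ only in the mode argument. If you launch jobs from your own Java tooling, a hypothetical launcher (our own helper, not part of Galago) could build the same command line with ProcessBuilder and create the temporary directory first. It assumes java is on the PATH and that the Galago classes are on the classpath inherited from your environment.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;

public class JobLauncherSketch {
    // Builds the same command line as the shell examples; mode is "local" or "drmaa".
    public static List<String> command(String mode, String paramFile, Path tmpDir) {
        if (!mode.equals("local") && !mode.equals("drmaa")) {
            throw new IllegalArgumentException("mode must be local or drmaa: " + mode);
        }
        return List.of("java", "-Xmx900m", "galago.tupleflow.execution.JobExecutor",
            mode, paramFile, tmpDir.toString());
    }

    // Creates the temporary directory (like mkdir -p), runs the job with its
    // output shown in this console, and returns the exit code.
    public static int run(String mode, String paramFile, Path tmpDir)
            throws IOException, InterruptedException {
        Files.createDirectories(tmpDir);
        Process p = new ProcessBuilder(command(mode, paramFile, tmpDir))
            .inheritIO()
            .start();
        return p.waitFor();
    }

    public static void main(String[] args) throws Exception {
        int exit = run("local", "paramFile.xml", Path.of("/path/to/my/tmp/dir"));
        System.out.println("JobExecutor exited with " + exit);
    }
}
```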
If you are running in local mode, all logging messages and any errors are written to stderr on your screen. In distributed mode, however, these messages are kept in files within the temporary directory (in the example above, in /path/to/my/tmp/dir/stderr/).
Galago keeps track of which stages (and the jobs within them) succeed or fail. A quick way to check is to run:
ls /path/to/my/tmp/dir/jobs/*/
This lists the jobs that each stage was broken into. For each stage, the jobs are numbered starting from 0, up to the maximum number of jobs the stage could be split into, which is based on the hashCount specified at the top of the parameter file. Each job that completed successfully has an additional file named with its job number followed by .complete; if the job failed, the additional file ends in .error instead.
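For a large run, counting these marker files by hand gets tedious. The scanner below is a sketch that assumes the layout described above: one subdirectory per stage under jobs/, containing files whose names end in .complete or .error. The class and method names are our own, and any naming details beyond those suffixes are assumptions, so adjust the checks if your directory looks different.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Stream;

public class JobStatusSketch {
    // Number of finished (.complete) and failed (.error) jobs in one stage.
    public record Status(long complete, long error) {}

    // Scans tmpDir/jobs/<stage>/ and tallies the marker files per stage.
    public static Map<String, Status> scan(Path tmpDir) throws IOException {
        List<Path> stageDirs;
        try (Stream<Path> s = Files.list(tmpDir.resolve("jobs"))) {
            stageDirs = s.filter(Files::isDirectory).toList();
        }
        Map<String, Status> result = new TreeMap<>(); // sorted by stage name
        for (Path stage : stageDirs) {
            try (Stream<Path> s = Files.list(stage)) {
                List<String> names = s.map(p -> p.getFileName().toString()).toList();
                long complete = names.stream().filter(n -> n.endsWith(".complete")).count();
                long error = names.stream().filter(n -> n.endsWith(".error")).count();
                result.put(stage.getFileName().toString(), new Status(complete, error));
            }
        }
        return result;
    }

    public static void main(String[] args) throws IOException {
        // Usage: java JobStatusSketch /path/to/my/tmp/dir
        scan(Path.of(args[0])).forEach((stage, st) ->
            System.out.println(stage + ": " + st.complete() + " complete, " + st.error() + " failed"));
    }
}
```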
If you fix the bug that caused the error, and your changes only affect the data flow from the point where the errors occurred onward, you can pass the same temporary directory to TupleFlow again: it will not redo the stages that completed successfully, but will instead resume at the failed ones.
However, if you change anything in your code that affects the data produced by stages that already finished successfully, or if you simply want a clean start, delete the contents of the temporary directory before running TupleFlow again.
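Deleting the contents while keeping the directory itself can also be done from Java. The helper below is our own minimal sketch (not part of Galago): it walks the directory tree and deletes entries deepest-first, so each subdirectory is already empty by the time it is removed.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Stream;

public class CleanTmpDirSketch {
    // Deletes everything inside dir but keeps dir itself.
    public static void clearContents(Path dir) throws IOException {
        List<Path> paths;
        try (Stream<Path> walk = Files.walk(dir)) {
            // Reverse path order puts children before their parent directories.
            paths = walk.sorted(Comparator.reverseOrder()).toList();
        }
        for (Path p : paths) {
            if (!p.equals(dir)) {
                Files.delete(p);
            }
        }
    }

    public static void main(String[] args) throws IOException {
        // Usage: java CleanTmpDirSketch /path/to/my/tmp/dir
        clearContents(Path.of(args[0]));
    }
}
```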
Indexing on Sydney / Swarm
Local (i.e., non-distributed)
java -Xmx900m galago.tupleflow.execution.JobExecutor \
local index_param_file.xml /path/to/tmp/dir/
DRMAA (i.e., distributed)
java -Xmx900m galago.tupleflow.execution.JobExecutor \
drmaa index_param_file.xml /path/to/tmp/dir/
Querying
Simple Mode
Not yet implemented :(.
Explicit Mode
This is the only mode currently supported.
java -Xmx900m galago.retrieval.StructuredRetrieval \
query_param_file.xml
Structured Mode
Not yet implemented :(.
Customizing Galago
Creating your own operators
Creating your own indexing processes