Sunday, June 29, 2008

Hadoop on Amazon EC2 to generate Species by Cell Index

I finally took the plunge onto the Amazon Elastic Compute Cloud (EC2) to do some proper distributed processing with Hadoop, after spending the past few weeks playing with MapReduce.

This is a post about my experiences and lessons, but for those who just want the result:

MacBook Pro, 2 GB JVM, single-node Hadoop, species per 1-degree cell for all GBIF occurrences: 2449 secs
EC2, 20 small instances, species per 1-degree cell for all GBIF occurrences: 472 secs

And the details:

This experiment was simply to generate the species per cell (1 degree x 1 degree) index for the entire GBIF occurrence record store (an index of 135M records was used) and to compare local versus cloud execution.

So, the code (about 20 lines of 'real' code, and it could probably be optimised further - the reduce, for example):

package com.ibiodiversity.index.mapreduce;

import java.io.IOException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.regex.Pattern;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

/**
 * Generates the species by cell index
 * @author timrobertson
 */
public class SpeciesByCell extends MapReduceBase
    implements Mapper<LongWritable, Text, IntWritable, Text>,
               Reducer<IntWritable, Text, IntWritable, Text> {

  // reusing the Writables across calls, as they do in the tutorials...
  private final static IntWritable cellId = new IntWritable();
  private Text speciesMap = new Text();
  private Text speciesReduce = new Text();

  // reuse the compiled pattern for performance (String.split is not the best way to split)
  private Pattern tabPattern = Pattern.compile("\t");

  /**
   * Outputs Cell:Species
   */
  public void map(LongWritable key, Text value,
      OutputCollector<IntWritable, Text> output, Reporter reporter) {
    String data = value.toString();
    if (data != null) {
      // sci name is column 2, lat is column 11, long is column 12
      String[] parts = tabPattern.split(data);
      if (parts.length >= 12) {
        try {
          cellId.set(SpeciesByCell.toCellId(Float.parseFloat(parts[10]),
              Float.parseFloat(parts[11])));
          speciesMap.set(parts[1]);
          output.collect(cellId, speciesMap);
        } catch (NumberFormatException e) {
          // unparsable coordinates: skip the record
        } catch (UnableToGenerateCellIdException e) {
          // coordinates out of range: skip the record
        } catch (IOException e) {
          // collection failed: skip the record
        }
      }
    }
  }

  /**
   * Distincts the species within the cell
   */
  public void reduce(IntWritable key, Iterator<Text> values,
      OutputCollector<IntWritable, Text> output, Reporter reporter) throws IOException {
    Set<String> species = new HashSet<String>();
    while (values.hasNext()) {
      species.add(values.next().toString());
    }
    for (String s : species) {
      speciesReduce.set(s);
      output.collect(key, speciesReduce);
    }
  }

  /**
   * Gives a cell id on the 1 degree x 1 degree grid
   * (UnableToGenerateCellIdException is a small custom exception, not shown here)
   */
  public static int toCellId(Float latitude, Float longitude)
      throws UnableToGenerateCellIdException {
    if (latitude == null || longitude == null
        || latitude < -90 || latitude > 90
        || longitude < -180 || longitude > 180) {
      throw new UnableToGenerateCellIdException("Latitude[" + latitude + "], Longitude["
          + longitude + "] cannot be converted to a cell id");
    } else {
      int la = (int) Math.floor(latitude + 90);
      int lo = (int) Math.floor(longitude + 180);
      return (la * 360) + lo;
    }
  }
}
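For completeness: the driver is not shown above, but a main() along the following lines is what wires the class into a JobConf. This is only a minimal sketch using the old org.apache.hadoop.mapred API of that era, not necessarily the exact code I ran; the Main-Class in the pom below suggests it lives in SpeciesByCell itself, and the combine counters in the results further down suggest the reducer was also registered as the combiner.

// Minimal driver sketch (not the exact code used); it additionally needs
// org.apache.hadoop.fs.Path, org.apache.hadoop.mapred.JobClient,
// org.apache.hadoop.mapred.JobConf, org.apache.hadoop.mapred.FileInputFormat
// and org.apache.hadoop.mapred.FileOutputFormat on the import list.
public static void main(String[] args) throws IOException {
  JobConf conf = new JobConf(SpeciesByCell.class);
  conf.setJobName("speciesByCell");

  conf.setOutputKeyClass(IntWritable.class); // cell id
  conf.setOutputValueClass(Text.class);      // scientific name

  conf.setMapperClass(SpeciesByCell.class);
  // assumption: the combine counters below suggest the reducer doubled as the combiner
  conf.setCombinerClass(SpeciesByCell.class);
  conf.setReducerClass(SpeciesByCell.class);

  FileInputFormat.setInputPaths(conf, new Path(args[0]));
  FileOutputFormat.setOutputPath(conf, new Path(args[1]));

  JobClient.runJob(conf);
}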


Hadoop says it supports GZip files, so I gzipped the input data, which is simply 13 columns of DwC. Unzipped the data is 13 GB; gzipped, 972 MB.
After uploading and running, however, I found some pretty unusual results. Falling back to my favourite invasive (Passer domesticus) to run a smaller test, I found that with the gzipped input the map only worked on 246,768 rows, not the full 900,000+. Therefore I conclude that either I am not running it right, or gzipped input is not supported for this. I don't see how the distributed chunking could work on a gzipped file anyway, and looking through the docs I think it is only on the output that you can use GZip. So I started again, extracted the full 13 GB, and got that onto HDFS for processing.
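Output compression, on the other hand, looks like it is just a couple of JobConf properties; a minimal sketch, assuming the classic pre-0.20 property names (I have not benchmarked this):

// Sketch only: gzip the job output; the input stays uncompressed so it can be split.
// Assumes the old mapred property names of the Hadoop 0.17 era.
JobConf conf = new JobConf(SpeciesByCell.class);
conf.setBoolean("mapred.output.compress", true);
conf.set("mapred.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec");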

Now, since I am a Maven user, I need to build an executable jar; here is the pom section that sets the manifest:

<build>
  <defaultGoal>package</defaultGoal>
  <plugins>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-jar-plugin</artifactId>
      <configuration>
        <archive>
          <manifest>
            <mainClass>com.ibiodiversity.index.mapreduce.SpeciesByCell</mainClass>
            <packageName>com.ibiodiversity.index.mapreduce</packageName>
          </manifest>
        </archive>
      </configuration>
    </plugin>
  </plugins>
</build>
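Building is then just a matter of running:

mvn package

and, because the Main-Class is in the manifest, bin/hadoop jar can later launch the resulting jar without the class name being spelled out on the command line.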


Now, we put the input file onto S3 for use in EC2:
bin/hadoop fs -put /path/to/source s3://<id>:<secret>@<bucket>/path/to/target

(OK, do NOT put it at the bucket root as s3://<id>:<secret>@<bucket>/ - that blew up when I tried to move the file out of S3 onto the local HDFS, because it was in the root. My second attempt (another 2 hours of transfer) went to ibiodiversity-gbif-dwc/dwc/allData.)

Of course, my secret key has a / in it, so the Hadoop client blows up even when it is escaped as %2F... in the end I pulled the Hadoop code into Eclipse and hacked around it directly.

Uploading to S3 using the Hadoop FsShell takes a LONG time from my house: 3 minutes per 32 MB = about 186 KB/s.
Now that the data is on S3 I can run many index generations without this latency, but each time there is a new version of the data it would need updating. Clearly it will be better to harvest straight to S3 (perhaps in pages?).

1.5 hours (of hoping the network stays alive) later... (make that 3.5 hours total waiting time, but it is there for next time)

Now I fire up a master instance (standard Hadoop 0.17.0 AMI), connect to it, and copy across the input data using:
cd /usr/local/hadoop-0.17.0
bin/hadoop fs -mkdir logs
bin/hadoop distcp s3://<id>:<secret>@<bucket>/path/to/logs logs


The dreaded / problem again!
This time there was no easy way to hack around it, so a top tip: keep regenerating your secret key until it has no / characters in it!
Now the copy works fine.

Copy my jar file up to the master (run locally):
. bin/hadoop-ec2-env.sh
scp $SSH_OPTS /tmp/speciesByCell.jar root@$MASTER_HOST_DNS:
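With the jar on the master, the job itself is launched there with something along these lines (the paths are placeholders; the input is wherever the distcp above landed the data):

cd /usr/local/hadoop-0.17.0
bin/hadoop jar ~/speciesByCell.jar <input path on HDFS> <output path>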


So, as always, we need a single-node benchmark.
MacBook Pro 2.4 GHz, 2 GB JVM memory:

org.apache.hadoop.mapred.Counters Counters: 11
org.apache.hadoop.mapred.Counters Map-Reduce Framework
org.apache.hadoop.mapred.Counters Map input records=134475707
org.apache.hadoop.mapred.Counters Map output records=106297454
org.apache.hadoop.mapred.Counters Map input bytes=14209537679
org.apache.hadoop.mapred.Counters Map output bytes=2406590553
org.apache.hadoop.mapred.Counters Combine input records=106297454
org.apache.hadoop.mapred.Counters Combine output records=1130567
org.apache.hadoop.mapred.Counters Reduce input groups=41845
org.apache.hadoop.mapred.Counters Reduce input records=9565669
org.apache.hadoop.mapred.Counters Reduce output records=7259291
org.apache.hadoop.mapred.Counters File Systems
org.apache.hadoop.mapred.Counters Local bytes read=3038621493011
org.apache.hadoop.mapred.Counters Local bytes written=73325771174

Finished in 2449 secs!

And the 20-node cluster:

08/06/29 17:31:04 INFO mapred.JobClient: Counters: 17
08/06/29 17:31:04 INFO mapred.JobClient: File Systems
08/06/29 17:31:04 INFO mapred.JobClient: Local bytes read=298092555
08/06/29 17:31:04 INFO mapred.JobClient: Local bytes written=597267841
08/06/29 17:31:04 INFO mapred.JobClient: HDFS bytes read=14210393954
08/06/29 17:31:04 INFO mapred.JobClient: HDFS bytes written=53736532
08/06/29 17:31:04 INFO mapred.JobClient: Job Counters
08/06/29 17:31:04 INFO mapred.JobClient: Launched map tasks=231
08/06/29 17:31:04 INFO mapred.JobClient: Launched reduce tasks=1
08/06/29 17:31:04 INFO mapred.JobClient: Data-local map tasks=207
08/06/29 17:31:04 INFO mapred.JobClient: Rack-local map tasks=5
08/06/29 17:31:04 INFO mapred.JobClient: Map-Reduce Framework
08/06/29 17:31:04 INFO mapred.JobClient: Map input records=134475707
08/06/29 17:31:04 INFO mapred.JobClient: Map output records=106297454
08/06/29 17:31:04 INFO mapred.JobClient: Map input bytes=14209537679
08/06/29 17:31:04 INFO mapred.JobClient: Map output bytes=2406590553
08/06/29 17:31:04 INFO mapred.JobClient: Combine input records=104849073
08/06/29 17:31:04 INFO mapred.JobClient: Combine output records=780651
08/06/29 17:31:04 INFO mapred.JobClient: Reduce input groups=41845
08/06/29 17:31:04 INFO mapred.JobClient: Reduce input records=9416965
08/06/29 17:31:04 INFO mapred.JobClient: Reduce output records=7259291

Finished in 472 secs!


None of these runs were tuned in any way (mappers, reducers, block size, etc.).
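The single reduce task is the obvious first knob: "Launched reduce tasks=1" in the counters above means the final distinct-and-write step ran on one machine. Raising it is one line on the JobConf; a sketch (the right number would need benchmarking):

// Sketch: spread the reduce (distinct) step across the cluster;
// 20 is just an illustrative figure (roughly one per node), not a tuned value.
conf.setNumReduceTasks(20);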

Cost of running: well, less than 1 hour, so ~$2 (20 instances x $0.10, plus a little for transfers).

(Thanks to Tom White for publishing guidelines on running Hadoop on EC2, on which I based this test.)




2 comments:

Anonymous said...

hello,
nice post!

but why didn't you use S3 as a replacement for HDFS, instead of copying files between HDFS and S3?

Tim Robertson said...

Thanks...
Firstly, this was my very first experience of S3, EC2 and even Hadoop on a decent-sized cluster, so I just went with what was said to work. Secondly, I assume it is faster for Hadoop to run with the data on a local file system than to stream each chunk from S3; I will benchmark both ways soon and report here. I want to set up something that generates a lot more views onto the data and then run a decent-sized test - whenever I start putting together more complicated MR workflows it gets messy quickly, so I am about to evaluate Cascading and see how that works out.
(Since running this test I have learnt that most of my time went on a single reducer running at the end... I think the reduce should have run in a much more parallel fashion, and it would have finished much quicker.)