Accumulo and Pig

March 02, 2012

I just posted a functional AccumuloStorage module to github.

Here’s how you use it (also in the github README)

###1. Build the JAR

Note: you will need to download the accumulo src, build it, and install it into your maven repo before this will work

mvn package

This will create a JAR file here:

1
target/accumulo-pig-1.5.0-incubating-SNAPSHOT.jar

###2. Download the JARs needed by pig

mvn dependency:copy-dependencies -DoutputDirectory=lib  \
    -DincludeArtifactIds=zookeeper,libthrift,accumulo-core,cloudtrace

This should have copied the needed dependency jars into a

1
lib
directory.

###3. Print the register statements we will need in pig

for JAR in lib/*.jar target/accumulo-pig-1.5.0-incubating-SNAPSHOT.jar ; 
do 
    echo register `pwd`/$JAR; 
done

Here is some example output

register /home/developer/workspace/accumulo-pig/lib/accumulo-core-1.5.0-incubating-SNAPSHOT.jar
register /home/developer/workspace/accumulo-pig/lib/cloudtrace-1.5.0-incubating-SNAPSHOT.jar
register /home/developer/workspace/accumulo-pig/lib/libthrift-0.6.1.jar
register /home/developer/workspace/accumulo-pig/lib/zookeeper-3.3.1.jar
register /home/developer/workspace/accumulo-pig/target/accumulo-pig-1.5.0-incubating-SNAPSHOT.jar

####5. Run Pig

Copy the register statements above and paste them into the pig terminal. Then you can LOAD from and STORE into accumulo.

$ pig
2012-03-02 08:15:25,808 [main] INFO  org.apache.pig.Main - Logging error messages to: /home/developer/workspace/accumulo-pig/pig_1330694125807.log
2012-03-02 08:15:25,937 [main] INFO  org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - Connecting to hadoop file system at: hdfs://127.0.0.1/
2012-03-02 08:15:26,032 [main] INFO  org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - Connecting to map-reduce job tracker at: 127.0.0.1:9001
grunt> register /home/developer/workspace/accumulo-pig/lib/accumulo-core-1.5.0-incubating-SNAPSHOT.jar
grunt> register /home/developer/workspace/accumulo-pig/lib/cloudtrace-1.5.0-incubating-SNAPSHOT.jar
grunt> register /home/developer/workspace/accumulo-pig/lib/libthrift-0.6.1.jar
grunt> register /home/developer/workspace/accumulo-pig/lib/zookeeper-3.3.1.jar
grunt> register /home/developer/workspace/accumulo-pig/target/accumulo-pig-1.5.0-incubating-SNAPSHOT.jar
grunt> 
grunt> DATA = LOAD 'accumulo://webpage?instance=inst&user=root&password=secret&zookeepers=127.0.0.1:2181&columns=f:cnt' 
>>using org.apache.accumulo.pig.AccumuloStorage() AS (row, cf, cq, cv, ts, val);
grunt> 
grunt> DATA2 = FOREACH DATA GENERATE row, cf, cq, cv, val;
grunt> 
grunt> STORE DATA2 into 'accumulo://webpage_content?instance=inst&user=root&password=secret&zookeepers=127.0.0.1:2181' using org.apache.accumulo.pig.AccumuloStorage();
2012-03-02 08:18:44,090 [main] INFO  org.apache.pig.tools.pigstats.ScriptState - Pig features used in the script: UNKNOWN
2012-03-02 08:18:44,093 [main] INFO  org.apache.pig.newplan.logical.rules.ColumnPruneVisitor - Columns pruned for DATA: $4
2012-03-02 08:18:44,108 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompiler - File concatenation threshold: 100 optimistic? false
2012-03-02 08:18:44,110 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MultiQueryOptimizer - MR plan size before optimization: 1
2012-03-02 08:18:44,110 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MultiQueryOptimizer - MR plan size after optimization: 1
2012-03-02 08:18:44,117 [main] INFO  org.apache.pig.tools.pigstats.ScriptState - Pig script settings are added to the job
2012-03-02 08:18:44,118 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler - mapred.job.reduce.markreset.buffer.percent is not set, set to default 0.3
2012-03-02 08:18:44,120 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler - creating jar file Job7611629033341757288.jar
2012-03-02 08:18:46,282 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler - jar file Job7611629033341757288.jar created
2012-03-02 08:18:46,286 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler - Setting up single store job
2012-03-02 08:18:46,375 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - 1 map-reduce job(s) waiting for submission.
2012-03-02 08:18:46,876 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - 0% complete
2012-03-02 08:18:46,878 [Thread-17] INFO  org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil - Total input paths (combined) to process : 1
2012-03-02 08:18:47,887 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - HadoopJobId: job_201203020643_0001
2012-03-02 08:18:47,887 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - More information at: http://127.0.0.1:50030/jobdetails.jsp?jobid=job_201203020643_0001
2012-03-02 08:18:54,434 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - 50% complete
2012-03-02 08:18:57,484 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - 100% complete
2012-03-02 08:18:57,485 [main] INFO  org.apache.pig.tools.pigstats.SimplePigStats - Script Statistics: 
 
HadoopVersionPigVersionUserIdStartedAtFinishedAtFeatures
0.20.20.9.2developer2012-03-02 08:18:442012-03-02 08:18:57UNKNOWN
 
Success!
 
Job Stats (time in seconds):
JobIdMapsReducesMaxMapTimeMinMapTImeAvgMapTimeMaxReduceTimeMinReduceTimeAvgReduceTimeAliasFeatureOutputs
job_201203020643_000110333000DATA,DATA2MAP_ONLYaccumulo://webpage_content?instance=inst&user=root&password=secret&zookeepers=127.0.0.1:2181,
 
Input(s):
Successfully read 288 records from: "accumulo://webpage?instance=inst&user=root&password=secret&zookeepers=127.0.0.1:2181&columns=f:cnt"
 
Output(s):
Successfully stored 288 records in: "accumulo://webpage_content?instance=inst&user=root&password=secret&zookeepers=127.0.0.1:2181"
 
Counters:
Total records written : 288
Total bytes written : 0
Spillable Memory Manager spill count : 0
Total bags proactively spilled: 0
Total records proactively spilled: 0
 
Job DAG:
job_201203020643_0001
 
 
2012-03-02 08:18:57,492 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Success!
grunt> 

Here are the pig commands run if you don’t want to look through the output above:

# load just the web content (from the f:cnt column) from the webpage table
DATA = LOAD 
'accumulo://webpage?instance=inst&user=root&password=secret&zookeepers=127.0.0.1:2181&columns=f:cnt' 
   using org.apache.accumulo.pig.AccumuloStorage() AS (row, cf, cq, cv, ts, val);

# basically, remove the ts field since it is not needed
DATA2 = FOREACH DATA GENERATE row, cf, cq, cv, val;

# store the data as is in a new table called webpage_content
STORE DATA2 into 
'accumulo://webpage_content?instance=inst&user=root&password=secret&zookeepers=127.0.0.1:2181' 
   using org.apache.accumulo.pig.AccumuloStorage();

A more detailed blog post going in more detail of how/why this is useful will follow.

–Jason

Update (2012/03/04): you may want to run this as the first line of the pig script:

SET mapred.map.tasks.speculative.execution false

This will avoid ingesting duplicate entries into accumulo. For the data from this post, ingesting duplicate entries wouldn’t cause any real issues because Accumulo’s

1
VersioningIterator
would only keep the newest copy, but for columns/tables with aggregation configured (e.g. using
1
LongCombiner
) we definitely don’t want this.