public class MapReduceTask<KIn,VIn,KOut,VOut>
extends java.lang.Object
Users should instantiate MapReduceTask with a reference to a cache whose data is used as input for this task. The Infinispan execution environment will migrate and execute instances of the provided Mapper and Reducer seamlessly across Infinispan nodes.
Unless otherwise specified using the onKeys(Object...) filter, all available key/value pairs of the specified cache will be used as input data for this task.
For example, a MapReduceTask that counts the number of word occurrences in a particular cache where keys and values are String instances could be written as follows:

    MapReduceTask<String, String, String, Integer> task = new MapReduceTask<String, String, String, Integer>(cache);
    task.mappedWith(new WordCountMapper()).reducedWith(new WordCountReducer());
    Map<String, Integer> results = task.execute();

The final result is a map where the key is a word and the value is the word count for that particular word.
The accompanying Mapper and Reducer are defined as follows:

    private static class WordCountMapper implements Mapper<String, String, String, Integer> {
        // Emits (word, 1) for every whitespace-separated token in the value.
        public void map(String key, String value, Collector<String, Integer> collector) {
            StringTokenizer tokens = new StringTokenizer(value);
            while (tokens.hasMoreTokens()) {
                collector.emit(tokens.nextToken(), 1);
            }
        }
    }

    private static class WordCountReducer implements Reducer<String, Integer> {
        // Sums all counts emitted for the given word.
        public Integer reduce(String key, Iterator<Integer> iter) {
            int sum = 0;
            while (iter.hasNext()) {
                sum += iter.next();
            }
            return sum;
        }
    }
Finally, as of the Infinispan 5.2 release, a MapReduceTask can also specify a Combiner function. The Combiner is executed on each node after the Mapper and before the global reduce phase. It receives the Mapper's output as its input, and its own output is then sent to the reducers. It is useful to think of the Combiner as a node-local reduce phase that runs before the global reduce phase is executed.
Combiners are especially useful when the reduce function is both commutative and associative. In such cases the Reducer itself can be used as the Combiner; all one needs to do is specify it:

    MapReduceTask<String, String, String, Integer> task = new MapReduceTask<String, String, String, Integer>(cache);
    task.mappedWith(new WordCountMapper()).reducedWith(new WordCountReducer()).combinedWith(new WordCountReducer());
    Map<String, Integer> results = task.execute();

Note that Mapper and Reducer should not be specified as inner classes. Inner classes declared in non-static contexts contain implicit non-transient references to enclosing class instances, so serializing such an inner class instance will result in serialization of its associated outer class instance as well.
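The serialization caveat above can be reproduced with plain Java serialization, without Infinispan on the classpath. The following is a minimal sketch: the class names `Outer`, `InnerTask`, and `NestedTask` and the helper `canSerialize` are hypothetical and exist only for this illustration. The enclosing class is deliberately not Serializable, so the hidden reference held by the inner class becomes visible as a failure.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

class InnerClassSerializationSketch {

    // Hypothetical enclosing class; deliberately NOT Serializable.
    static class Outer {
        int seed = 42;

        // Inner (non-static) class: each instance keeps a hidden reference to its Outer.
        class InnerTask implements Serializable {
            private static final long serialVersionUID = 1L;

            int seed() {
                return seed; // reads a field of the enclosing Outer instance
            }
        }

        InnerTask newInnerTask() {
            return new InnerTask();
        }
    }

    // Static nested class: no reference to any enclosing instance.
    static class NestedTask implements Serializable {
        private static final long serialVersionUID = 1L;
    }

    // Returns true if Java serialization of the given object succeeds.
    static boolean canSerialize(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false; // some object in the graph was not Serializable
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("static nested class serializes: " + canSerialize(new NestedTask()));
        System.out.println("inner class serializes:         " + canSerialize(new Outer().newInnerTask()));
    }
}
```

Had `Outer` been Serializable, the inner instance would serialize, but the whole `Outer` instance would travel with it, which is the overhead the note warns about. Declaring Mapper and Reducer implementations as top-level or static nested classes avoids both outcomes.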
If you are not familiar with the concept of the map reduce distributed execution model, start with Google's MapReduce research paper.
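To make the node-local combine phase described earlier more concrete, here is a small in-memory model written in plain Java. It is not Infinispan code and does not reflect how Infinispan implements the phases: the class `CombinerSketch`, the method `wordCount`, and the representation of nodes as lists of strings are all assumptions made for illustration. It shows that, because summation is commutative and associative, reusing the reduce function as a combiner leaves the final result unchanged while shipping only one partial value per key per node.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class CombinerSketch {

    // Same logic as WordCountReducer: sums all values seen for a key.
    static Integer sum(Iterator<Integer> iter) {
        int total = 0;
        while (iter.hasNext()) {
            total += iter.next();
        }
        return total;
    }

    // Models the map phase on one node: emits (word, 1) per token, grouped by word.
    static Map<String, List<Integer>> mapOnNode(List<String> values) {
        Map<String, List<Integer>> emitted = new TreeMap<>();
        for (String value : values) {
            for (String word : value.split("\\s+")) {
                if (!word.isEmpty()) {
                    emitted.computeIfAbsent(word, k -> new ArrayList<>()).add(1);
                }
            }
        }
        return emitted;
    }

    // Runs map on every modeled node, optionally combines locally, then reduces globally.
    static Map<String, Integer> wordCount(List<List<String>> nodes, boolean useCombiner) {
        Map<String, List<Integer>> shipped = new TreeMap<>(); // values sent to the global reduce
        for (List<String> nodeValues : nodes) {
            for (Map.Entry<String, List<Integer>> e : mapOnNode(nodeValues).entrySet()) {
                List<Integer> target = shipped.computeIfAbsent(e.getKey(), k -> new ArrayList<>());
                if (useCombiner) {
                    target.add(sum(e.getValue().iterator())); // one partial sum per node
                } else {
                    target.addAll(e.getValue());              // every raw emitted value
                }
            }
        }
        Map<String, Integer> result = new TreeMap<>();
        shipped.forEach((word, counts) -> result.put(word, sum(counts.iterator())));
        return result;
    }

    public static void main(String[] args) {
        List<List<String>> nodes = Arrays.asList(
                Arrays.asList("a b a", "b a"),
                Arrays.asList("a c"));
        // Both calls print {a=4, b=2, c=1}
        System.out.println(wordCount(nodes, false));
        System.out.println(wordCount(nodes, true));
    }
}
```

A reduce function that is not commutative and associative (an average computed directly over values, for example) would generally give different answers in the two modes, which is why such a function should not be reused as its own combiner.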
Modifier and Type | Class and Description |
---|---|
private static interface | MapReduceTask.CancellableTaskPart |
private class | MapReduceTask.MapReduceTaskFuture<R> |
private class | MapReduceTask.MapTaskPart<V> |
private class | MapReduceTask.ReduceTaskPart<V> |
private class | MapReduceTask.TaskPart<V> |
Modifier and Type | Field and Description |
---|---|
protected AdvancedCache<KIn,VIn> | cache |
protected java.util.List<MapReduceTask.CancellableTaskPart> | cancellableTasks |
protected org.infinispan.commands.CancellationService | cancellationService |
protected org.infinispan.interceptors.locking.ClusteringDependentLogic | clusteringDependentLogic |
protected Reducer<KOut,VOut> | combiner |
protected java.lang.String | customIntermediateCacheName |
static java.lang.String | DEFAULT_TMP_CACHE_CONFIGURATION_NAME |
protected boolean | distributeReducePhase |
protected java.lang.String | intermediateCacheConfigurationName |
protected boolean | isLocalOnly |
protected java.util.Collection<KIn> | keys |
private static org.infinispan.util.logging.Log | log |
protected Mapper<KIn,VIn,KOut,VOut> | mapper |
protected MapReduceManager | mapReduceManager |
protected Marshaller | marshaller |
private int | maxCollectorSize |
protected Reducer<KOut,VOut> | reducer |
protected org.infinispan.remoting.rpc.RpcOptionsBuilder | rpcOptionsBuilder |
protected java.util.UUID | taskId |
protected boolean | useIntermediateSharedCache |
Constructor and Description |
---|
MapReduceTask(Cache<KIn,VIn> masterCacheNode) Create a new MapReduceTask given a master cache node. |
MapReduceTask(Cache<KIn,VIn> masterCacheNode, boolean distributeReducePhase) Create a new MapReduceTask given a master cache node. |
MapReduceTask(Cache<KIn,VIn> masterCacheNode, boolean distributeReducePhase, boolean useIntermediateSharedCache) Create a new MapReduceTask given a master cache node. |
Modifier and Type | Method and Description |
---|---|
protected void | aggregateReducedResult(java.util.Map<KOut,java.util.List<VOut>> finalReduced, java.util.Map<KOut,VOut> mapReceived) |
private org.infinispan.commands.CancelCommand | buildCancelCommand(MapReduceTask.CancellableTaskPart taskPart) |
private org.infinispan.commands.read.MapCombineCommand<KIn,VIn,KOut,VOut> | buildMapCombineCommand(java.lang.String taskId, Mapper<KIn,VIn,KOut,VOut> m, Reducer<KOut,VOut> r, java.lang.String intermediateCacheName, java.util.Collection<KIn> keys, boolean reducePhaseDistributed, boolean emitCompositeIntermediateKeys) |
private org.infinispan.commands.read.ReduceCommand<KOut,VOut> | buildReduceCommand(java.lang.String resultCacheName, java.lang.String taskId, java.lang.String destinationCache, Reducer<KOut,VOut> r, java.util.Collection<KOut> keys, boolean emitCompositeIntermediateKeys) |
protected Mapper<KIn,VIn,KOut,VOut> | clone(Mapper<KIn,VIn,KOut,VOut> mapper) |
protected Reducer<KOut,VOut> | clone(Reducer<KOut,VOut> reducer) |
MapReduceTask<KIn,VIn,KOut,VOut> | combinedWith(Reducer<KOut,VOut> combiner) Specifies Combiner to use for this MapReduceTask |
protected <V> MapReduceTask.ReduceTaskPart<V> | createReducePart(org.infinispan.commands.read.ReduceCommand<KOut,VOut> cmd, org.infinispan.remoting.transport.Address target, java.lang.String destCacheName) |
protected <V> MapReduceTask.MapTaskPart<V> | createTaskMapPart(org.infinispan.commands.read.MapCombineCommand<KIn,VIn,KOut,VOut> cmd, org.infinispan.remoting.transport.Address target, boolean distributedReduce) |
protected boolean | distributeReducePhase() |
private void | ensureAccessPermissions(AdvancedCache<?,?> cache) |
private void | ensureProperCacheState(AdvancedCache<KIn,VIn> cache) |
boolean | equals(java.lang.Object obj) |
java.util.Map<KOut,VOut> | execute() Executes this task across Infinispan cluster nodes. |
void | execute(Cache<KOut,VOut> resultsCache) Executes this task and stores results in the provided results cache. |
<R> R | execute(Collator<KOut,VOut,R> collator) Executes this task across Infinispan cluster but the final result is collated using specified Collator |
void | execute(java.lang.String resultsCache) Executes this task and stores results in the provided results cache. |
java.util.concurrent.Future<java.util.Map<KOut,VOut>> | executeAsynchronously() Executes this task across Infinispan cluster nodes asynchronously. |
<R> java.util.concurrent.Future<R> | executeAsynchronously(Collator<KOut,VOut,R> collator) Executes this task asynchronously across Infinispan cluster; final result is collated using specified Collator and wrapped by Future |
protected java.util.Map<KOut,VOut> | executeHelper(java.lang.String resultCache) |
protected java.util.Set<KOut> | executeMapPhase(boolean useCompositeKeys) |
protected void | executeMapPhaseWithLocalReduction(java.util.Map<KOut,VOut> reducedResult) |
protected java.util.Map<KOut,VOut> | executeReducePhase(java.lang.String resultCache, java.util.Set<KOut> allMapPhasesResponses, boolean useCompositeKeys) |
protected void | executeTaskInit(java.lang.String tmpCacheName) |
protected java.lang.String | getIntermediateCacheName() |
int | hashCode() |
protected boolean | inputTaskKeysEmpty() |
protected <T> java.util.Map<org.infinispan.remoting.transport.Address,? extends java.util.Collection<T>> | mapKeysToNodes(java.util.Collection<T> keysToMap) |
protected <T> java.util.Map<org.infinispan.remoting.transport.Address,? extends java.util.Collection<T>> | mapKeysToNodes(java.util.Collection<T> keysToMap, boolean useIntermediateCompositeKey) |
protected <T> java.util.Map<org.infinispan.remoting.transport.Address,? extends java.util.Collection<T>> | mapKeysToNodes(org.infinispan.distribution.DistributionManager dm, java.util.Collection<T> keysToMap, boolean useIntermediateCompositeKey) |
MapReduceTask<KIn,VIn,KOut,VOut> | mappedWith(Mapper<KIn,VIn,KOut,VOut> mapper) Specifies Mapper to use for this MapReduceTask |
private <K,V> void | mergeResponse(java.util.Map<K,java.util.List<V>> result, java.util.Map<K,java.util.List<V>> m) |
MapReduceTask<KIn,VIn,KOut,VOut> | onKeys(KIn... input) Rather than use all available keys as input, onKeys allows users to specify a subset of keys as input to this task |
MapReduceTask<KIn,VIn,KOut,VOut> | reducedWith(Reducer<KOut,VOut> reducer) Specifies Reducer to use for this MapReduceTask |
void | setMaxCollectorSize(int size) Limits Mapper's Collector size |
MapReduceTask<KIn,VIn,KOut,VOut> | timeout(long timeout, java.util.concurrent.TimeUnit unit) See timeout(TimeUnit). |
long | timeout(java.util.concurrent.TimeUnit outputTimeUnit) |
java.lang.String | toString() |
protected boolean | useIntermediatePerTaskCache() |
protected boolean | useIntermediateSharedCache() |
MapReduceTask<KIn,VIn,KOut,VOut> | usingIntermediateCache(java.lang.String cacheConfigurationName) Allows this MapReduceTask to use a specific, custom-defined intermediate cache for storage of intermediate key/value pairs |
MapReduceTask<KIn,VIn,KOut,VOut> | usingSharedIntermediateCache(java.lang.String cacheName) Allows this MapReduceTask to use a specific shared intermediate cache for storage of intermediate key/value pairs |
MapReduceTask<KIn,VIn,KOut,VOut> | usingSharedIntermediateCache(java.lang.String cacheName, java.lang.String cacheConfigurationName) Allows this MapReduceTask to use a specific shared intermediate cache for storage of intermediate key/value pairs |
private static final org.infinispan.util.logging.Log log
public static final java.lang.String DEFAULT_TMP_CACHE_CONFIGURATION_NAME
protected final boolean distributeReducePhase
protected boolean useIntermediateSharedCache
protected final java.util.Collection<KIn> keys
protected final AdvancedCache<KIn,VIn> cache
protected final Marshaller marshaller
protected final MapReduceManager mapReduceManager
protected final org.infinispan.commands.CancellationService cancellationService
protected final java.util.List<MapReduceTask.CancellableTaskPart> cancellableTasks
protected final java.util.UUID taskId
protected final org.infinispan.interceptors.locking.ClusteringDependentLogic clusteringDependentLogic
protected final boolean isLocalOnly
protected org.infinispan.remoting.rpc.RpcOptionsBuilder rpcOptionsBuilder
protected java.lang.String customIntermediateCacheName
protected java.lang.String intermediateCacheConfigurationName
private int maxCollectorSize
public MapReduceTask(Cache<KIn,VIn> masterCacheNode)
Create a new MapReduceTask given a master cache node. Large and data-intensive tasks whose reduction phase would exceed the working memory of one Infinispan node should use a distributed reduce phase.
masterCacheNode - cache node initiating the map reduce task
public MapReduceTask(Cache<KIn,VIn> masterCacheNode, boolean distributeReducePhase)
Create a new MapReduceTask given a master cache node.
masterCacheNode - cache node initiating the map reduce task
distributeReducePhase - if true, this task will use distributed reduce phase execution
public MapReduceTask(Cache<KIn,VIn> masterCacheNode, boolean distributeReducePhase, boolean useIntermediateSharedCache)
Create a new MapReduceTask given a master cache node.
masterCacheNode - cache node initiating the map reduce task
distributeReducePhase - if true, this task will use distributed reduce phase execution
useIntermediateSharedCache - if true, this task will share its intermediate value cache with other executing MapReduceTasks on the grid. Otherwise, if false, this task will use its own dedicated cache for intermediate values
public MapReduceTask<KIn,VIn,KOut,VOut> onKeys(KIn... input)
Rather than use all available keys as input, onKeys allows users to specify a subset of keys as input to this task.
input - input keys for this task
public MapReduceTask<KIn,VIn,KOut,VOut> mappedWith(Mapper<KIn,VIn,KOut,VOut> mapper)
Specifies the Mapper to use for this MapReduceTask.
Note that the Mapper should not be specified as an inner class. Inner classes declared in non-static contexts contain implicit non-transient references to enclosing class instances, so serializing such an inner class instance will result in serialization of its associated outer class instance as well.
mapper - used to execute the map phase of the MapReduceTask
public MapReduceTask<KIn,VIn,KOut,VOut> reducedWith(Reducer<KOut,VOut> reducer)
Specifies the Reducer to use for this MapReduceTask.
Note that the Reducer should not be specified as an inner class. Inner classes declared in non-static contexts contain implicit non-transient references to enclosing class instances, so serializing such an inner class instance will result in serialization of its associated outer class instance as well.
reducer - used to reduce the results of the map phase
public MapReduceTask<KIn,VIn,KOut,VOut> combinedWith(Reducer<KOut,VOut> combiner)
Specifies the Combiner to use for this MapReduceTask.
Note that the Reducer should not be specified as an inner class. Inner classes declared in non-static contexts contain implicit non-transient references to enclosing class instances, so serializing such an inner class instance will result in serialization of its associated outer class instance as well.
combiner - used to immediately combine the results of the map phase before the reduce phase is invoked
public final MapReduceTask<KIn,VIn,KOut,VOut> timeout(long timeout, java.util.concurrent.TimeUnit unit)
See timeout(TimeUnit).
Note: the timeout value will be converted to milliseconds, and a value less than or equal to zero means wait forever.
timeout - the timeout value
unit - the time unit of the timeout value
public final long timeout(java.util.concurrent.TimeUnit outputTimeUnit)
Returns the timeout, expressed in the given TimeUnit, to wait for the remote map/reduce task to finish. The default value is given by SyncConfiguration.replTimeout().
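The two timeout methods form a setter/getter pair around a value kept in milliseconds, as the note on the setter says. The following sketch models that behaviour with `java.util.concurrent.TimeUnit` only. The class `TimeoutSketch`, its field `timeoutMillis`, and the helper `waitsForever()` are hypothetical names for this illustration; this is not MapReduceTask's actual implementation, and the default taken from SyncConfiguration.replTimeout() is not modeled.

```java
import java.util.concurrent.TimeUnit;

class TimeoutSketch {

    // Assumption for this sketch: the timeout is stored internally in milliseconds.
    private long timeoutMillis;

    // Setter: converts the supplied value to milliseconds and returns this for chaining.
    TimeoutSketch timeout(long timeout, TimeUnit unit) {
        this.timeoutMillis = unit.toMillis(timeout);
        return this;
    }

    // Getter: converts the stored milliseconds to the unit requested by the caller.
    long timeout(TimeUnit outputTimeUnit) {
        return outputTimeUnit.convert(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    // A value less than or equal to zero is interpreted as "wait forever".
    boolean waitsForever() {
        return timeoutMillis <= 0;
    }

    public static void main(String[] args) {
        TimeoutSketch task = new TimeoutSketch().timeout(2, TimeUnit.MINUTES);
        System.out.println(task.timeout(TimeUnit.SECONDS) + " seconds");
        System.out.println("waits forever: " + task.waitsForever());
    }
}
```

Note that `TimeUnit.convert` truncates when converting to a coarser unit, so reading back a 1500 ms timeout in seconds yields 1.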
public MapReduceTask<KIn,VIn,KOut,VOut> usingIntermediateCache(java.lang.String cacheConfigurationName)
Allows this MapReduceTask to use a specific, custom-defined intermediate cache for storage of intermediate key/value pairs.
cacheConfigurationName - name of the cache configuration to use for the intermediate cache
public MapReduceTask<KIn,VIn,KOut,VOut> usingSharedIntermediateCache(java.lang.String cacheName)
Allows this MapReduceTask to use a specific shared intermediate cache for storage of intermediate key/value pairs.
cacheName - name of the custom cache
public MapReduceTask<KIn,VIn,KOut,VOut> usingSharedIntermediateCache(java.lang.String cacheName, java.lang.String cacheConfigurationName)
Allows this MapReduceTask to use a specific shared intermediate cache for storage of intermediate key/value pairs. Rather than using the MapReduceTask default configuration for the intermediate cache, this method allows clients to specify a custom shared cache configuration.
cacheName - name of the custom cache
cacheConfigurationName - name of the cache configuration to use for the intermediate cache
public void setMaxCollectorSize(int size)
During execution of the map/combine phase, the number of intermediate keys/values collected in the Collector could potentially become very large. By limiting the size of the collector, intermediate key/values are transferred to the intermediate cache in batches before the reduce phase is executed, which also avoids OutOfMemoryError issues.
The default value for max collector size is 10000.
size - the number of key/value pairs for one batch transfer
See also: Mapper.map(Object, Object, Collector)
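The batching idea behind setMaxCollectorSize can be pictured with a small bounded buffer. This is a simplified model, not Infinispan's Collector: the class `BatchingCollectorSketch`, its `flush()` method, the in-memory list standing in for the intermediate cache, and the rejection of non-positive sizes are all assumptions made for this sketch. Emitted pairs accumulate until the limit is reached, at which point the batch is handed off and the buffer is emptied, so memory use stays bounded by the configured size.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

class BatchingCollectorSketch<K, V> {

    private final int maxCollectorSize;
    // Pairs emitted since the last transfer.
    private final List<Map.Entry<K, V>> buffer = new ArrayList<>();
    // Stand-in for the intermediate cache: each element is one transferred batch.
    private final List<List<Map.Entry<K, V>>> transferred = new ArrayList<>();

    BatchingCollectorSketch(int maxCollectorSize) {
        if (maxCollectorSize <= 0) {
            throw new IllegalArgumentException("maxCollectorSize must be positive");
        }
        this.maxCollectorSize = maxCollectorSize;
    }

    // Collects one pair and transfers a batch as soon as the limit is reached.
    void emit(K key, V value) {
        buffer.add(new SimpleImmutableEntry<>(key, value));
        if (buffer.size() >= maxCollectorSize) {
            flush();
        }
    }

    // Transfers whatever is buffered (for example, at the end of the map phase).
    void flush() {
        if (!buffer.isEmpty()) {
            transferred.add(new ArrayList<>(buffer));
            buffer.clear();
        }
    }

    int batchCount() {
        return transferred.size();
    }

    int pending() {
        return buffer.size();
    }

    public static void main(String[] args) {
        BatchingCollectorSketch<String, Integer> collector = new BatchingCollectorSketch<>(3);
        for (int i = 0; i < 7; i++) {
            collector.emit("word" + i, 1);
        }
        System.out.println(collector.batchCount() + " batches, " + collector.pending() + " pending");
        collector.flush();
        System.out.println(collector.batchCount() + " batches, " + collector.pending() + " pending");
    }
}
```

A smaller limit trades more frequent transfers for a lower peak memory footprint on each node; the documented default of 10000 sits on the larger side of that trade-off.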
public java.util.Map<KOut,VOut> execute() throws CacheException
CacheException
public void execute(Cache<KOut,VOut> resultsCache) throws CacheException
Executes this task and stores results in the provided results cache.
resultsCache - application provided results cache
CacheException
public void execute(java.lang.String resultsCache) throws CacheException
Executes this task and stores results in the provided results cache.
resultsCache - application provided results cache represented by its name
CacheException
protected java.util.Map<KOut,VOut> executeHelper(java.lang.String resultCache) throws java.lang.NullPointerException, CacheException
java.lang.NullPointerException
CacheException
protected java.lang.String getIntermediateCacheName()
protected boolean distributeReducePhase()
protected boolean useIntermediateSharedCache()
protected boolean useIntermediatePerTaskCache()
protected void executeTaskInit(java.lang.String tmpCacheName) throws java.lang.Exception
java.lang.Exception
protected java.util.Set<KOut> executeMapPhase(boolean useCompositeKeys) throws java.lang.InterruptedException, java.util.concurrent.ExecutionException
java.lang.InterruptedException
java.util.concurrent.ExecutionException
protected void executeMapPhaseWithLocalReduction(java.util.Map<KOut,VOut> reducedResult) throws java.lang.InterruptedException, java.util.concurrent.ExecutionException
java.lang.InterruptedException
java.util.concurrent.ExecutionException
protected <V> MapReduceTask.MapTaskPart<V> createTaskMapPart(org.infinispan.commands.read.MapCombineCommand<KIn,VIn,KOut,VOut> cmd, org.infinispan.remoting.transport.Address target, boolean distributedReduce)
protected java.util.Map<KOut,VOut> executeReducePhase(java.lang.String resultCache, java.util.Set<KOut> allMapPhasesResponses, boolean useCompositeKeys) throws java.lang.InterruptedException, java.util.concurrent.ExecutionException
java.lang.InterruptedException
java.util.concurrent.ExecutionException
protected <V> MapReduceTask.ReduceTaskPart<V> createReducePart(org.infinispan.commands.read.ReduceCommand<KOut,VOut> cmd, org.infinispan.remoting.transport.Address target, java.lang.String destCacheName)
private <K,V> void mergeResponse(java.util.Map<K,java.util.List<V>> result, java.util.Map<K,java.util.List<V>> m)
private org.infinispan.commands.read.MapCombineCommand<KIn,VIn,KOut,VOut> buildMapCombineCommand(java.lang.String taskId, Mapper<KIn,VIn,KOut,VOut> m, Reducer<KOut,VOut> r, java.lang.String intermediateCacheName, java.util.Collection<KIn> keys, boolean reducePhaseDistributed, boolean emitCompositeIntermediateKeys)
private org.infinispan.commands.read.ReduceCommand<KOut,VOut> buildReduceCommand(java.lang.String resultCacheName, java.lang.String taskId, java.lang.String destinationCache, Reducer<KOut,VOut> r, java.util.Collection<KOut> keys, boolean emitCompositeIntermediateKeys)
private org.infinispan.commands.CancelCommand buildCancelCommand(MapReduceTask.CancellableTaskPart taskPart)
public java.util.concurrent.Future<java.util.Map<KOut,VOut>> executeAsynchronously()
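public <R> R execute(Collator<KOut,VOut,R> collator)
Executes this task across the Infinispan cluster, but the final result is collated using the specified Collator.
collator - a Collator to use
public <R> java.util.concurrent.Future<R> executeAsynchronously(Collator<KOut,VOut,R> collator)
Executes this task asynchronously across the Infinispan cluster; the final result is collated using the specified Collator and wrapped by a Future.
collator - a Collator to use
protected void aggregateReducedResult(java.util.Map<KOut,java.util.List<VOut>> finalReduced, java.util.Map<KOut,VOut> mapReceived)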
protected <T> java.util.Map<org.infinispan.remoting.transport.Address,? extends java.util.Collection<T>> mapKeysToNodes(org.infinispan.distribution.DistributionManager dm, java.util.Collection<T> keysToMap, boolean useIntermediateCompositeKey)
protected <T> java.util.Map<org.infinispan.remoting.transport.Address,? extends java.util.Collection<T>> mapKeysToNodes(java.util.Collection<T> keysToMap, boolean useIntermediateCompositeKey)
protected <T> java.util.Map<org.infinispan.remoting.transport.Address,? extends java.util.Collection<T>> mapKeysToNodes(java.util.Collection<T> keysToMap)
private void ensureAccessPermissions(AdvancedCache<?,?> cache)
private void ensureProperCacheState(AdvancedCache<KIn,VIn> cache) throws java.lang.NullPointerException, java.lang.IllegalStateException
java.lang.NullPointerException
java.lang.IllegalStateException
protected boolean inputTaskKeysEmpty()
public int hashCode()
Overrides: hashCode in class java.lang.Object
public boolean equals(java.lang.Object obj)
Overrides: equals in class java.lang.Object
public java.lang.String toString()
Overrides: toString in class java.lang.Object