Skip to content

Commit f614743

Browse files
Fixed wordcount receiver
1 parent 286cbcc commit f614743

File tree

2 files changed

+19
-8
lines changed

2 files changed

+19
-8
lines changed

twister2/examples/src/java/edu/iu/dsc/tws/examples/batch/wordcount/comms/WordAggregator.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,8 @@
1616
import java.util.logging.Level;
1717
import java.util.logging.Logger;
1818

19-
import org.apache.commons.lang3.tuple.ImmutablePair;
20-
2119
import edu.iu.dsc.tws.api.comms.BulkReceiver;
20+
import edu.iu.dsc.tws.api.comms.structs.Tuple;
2221
import edu.iu.dsc.tws.api.config.Config;
2322

2423
public class WordAggregator implements BulkReceiver {
@@ -35,10 +34,10 @@ public void init(Config cfg, Set<Integer> expectedIds) {
3534
public boolean receive(int target, Iterator<Object> it) {
3635
while (it.hasNext()) {
3736
Object next = it.next();
38-
if (next instanceof ImmutablePair) {
39-
ImmutablePair kc = (ImmutablePair) next;
37+
if (next instanceof Tuple) {
38+
Tuple kc = (Tuple) next;
4039
LOG.log(Level.INFO, String.format("%d Word %s count %s",
41-
target, kc.getKey(), ((int[]) kc.getValue())[0]));
40+
target, kc.getKey(), kc.getValue()));
4241
}
4342
}
4443
isDone = true;

twister2/examples/src/java/edu/iu/dsc/tws/examples/batch/wordcount/comms/WordCountWorker.java

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,13 @@
2626
import java.util.ArrayList;
2727
import java.util.HashSet;
2828
import java.util.List;
29+
import java.util.Map;
2930
import java.util.Set;
3031
import java.util.logging.Logger;
3132

33+
import edu.iu.dsc.tws.api.comms.DataFlowOperation;
3234
import edu.iu.dsc.tws.api.comms.LogicalPlan;
33-
import edu.iu.dsc.tws.api.comms.Op;
35+
import edu.iu.dsc.tws.api.comms.ReduceFunction;
3436
import edu.iu.dsc.tws.api.comms.messaging.types.MessageTypes;
3537
import edu.iu.dsc.tws.api.config.Config;
3638
import edu.iu.dsc.tws.api.resource.IPersistentVolume;
@@ -39,7 +41,6 @@
3941
import edu.iu.dsc.tws.api.resource.IWorkerController;
4042
import edu.iu.dsc.tws.api.resource.WorkerEnvironment;
4143
import edu.iu.dsc.tws.comms.batch.BKeyedReduce;
42-
import edu.iu.dsc.tws.comms.functions.reduction.ReduceOperationFunction;
4344
import edu.iu.dsc.tws.comms.selectors.HashingSelector;
4445
import edu.iu.dsc.tws.examples.Utils;
4546

@@ -83,7 +84,18 @@ public void execute(Config cfg, int workerID,
8384
// create the communication
8485
wordAggregator = new WordAggregator();
8586
keyGather = new BKeyedReduce(workerEnv.getCommunicator(), logicalPlan, sources, destinations,
86-
new ReduceOperationFunction(Op.SUM, MessageTypes.INTEGER),
87+
new ReduceFunction() {
88+
@Override
89+
public void init(Config cfg, DataFlowOperation op, Map<Integer,
90+
List<Integer>> expectedIds) {
91+
92+
}
93+
94+
@Override
95+
public Object reduce(Object t1, Object t2) {
96+
return (Integer) t1 + (Integer) t2;
97+
}
98+
},
8799
wordAggregator, MessageTypes.OBJECT, MessageTypes.INTEGER, new HashingSelector());
88100
// assign the task ids to the workers, and run them using threads
89101
scheduleTasks();

0 commit comments

Comments
 (0)