Skip to content

Commit 1705665

Browse files
committed
MNEMONIC-218: Make inputSession & outputSession to accept the Configration parameter;
MNEMONIC-219: Remove the dependency on mapred FileOutputFormat.getUniqueFile
1 parent c165b1d commit 1705665

File tree

2 files changed

+67
-6
lines changed

2 files changed

+67
-6
lines changed

mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/MneDurableInputSession.java

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ public class MneDurableInputSession<V>
3535
implements MneInputSession<V>, MneDurableComputable<NonVolatileMemAllocator> {
3636

3737
private TaskAttemptContext taskAttemptContext;
38+
private Configuration configuration;
3839
private String serviceName;
3940
private DurableType[] durableTypes;
4041
private EntityFactoryProxy[] entityFactoryProxies;
@@ -46,6 +47,11 @@ public class MneDurableInputSession<V>
4647

4748
public MneDurableInputSession(TaskAttemptContext taskAttemptContext) {
4849
setTaskAttemptContext(taskAttemptContext);
50+
setConfiguration(taskAttemptContext.getConfiguration());
51+
}
52+
53+
public MneDurableInputSession(Configuration configuration) {
54+
setConfiguration(configuration);
4955
}
5056

5157
public void validateConfig() {
@@ -61,10 +67,10 @@ && getEntityFactoryProxies().length < 1) { /* T.B.D. BUFFER & CHUNK */
6167

6268
@Override
6369
public void readConfig(String prefix) {
64-
if (getTaskAttemptContext() == null) {
65-
throw new ConfigurationException("taskAttemptContext has not yet been set");
70+
if (getConfiguration() == null) {
71+
throw new ConfigurationException("configuration has not yet been set");
6672
}
67-
Configuration conf = getTaskAttemptContext().getConfiguration();
73+
Configuration conf = getConfiguration();
6874
setServiceName(MneConfigHelper.getMemServiceName(conf, MneConfigHelper.DEFAULT_INPUT_CONFIG_PREFIX));
6975
setDurableTypes(MneConfigHelper.getDurableTypes(conf, MneConfigHelper.DEFAULT_INPUT_CONFIG_PREFIX));
7076
setEntityFactoryProxies(Utils.instantiateEntityFactoryProxies(
@@ -144,4 +150,12 @@ public NonVolatileMemAllocator getAllocator() {
144150
public long getHandler() {
145151
return m_handler;
146152
}
153+
154+
public Configuration getConfiguration() {
155+
return configuration;
156+
}
157+
158+
public void setConfiguration(Configuration configuration) {
159+
this.configuration = configuration;
160+
}
147161
}

mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/MneDurableOutputSession.java

Lines changed: 50 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,21 +28,25 @@
2828
import org.apache.mnemonic.collections.DurableSinglyLinkedList;
2929
import org.apache.mnemonic.collections.DurableSinglyLinkedListFactory;
3030

31+
import java.text.NumberFormat;
3132
import java.util.HashMap;
3233
import java.util.Iterator;
3334
import java.util.Map;
3435

3536
import org.apache.commons.lang3.tuple.Pair;
3637
import org.apache.hadoop.conf.Configuration;
3738
import org.apache.hadoop.fs.Path;
39+
import org.apache.hadoop.mapreduce.JobContext;
3840
import org.apache.hadoop.mapreduce.TaskAttemptContext;
41+
import org.apache.hadoop.mapreduce.TaskID;
3942
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
4043

4144
public class MneDurableOutputSession<V>
4245
implements MneOutputSession<V>, MneDurableComputable<NonVolatileMemAllocator> {
4346

4447
private long poolSize;
4548
private TaskAttemptContext taskAttemptContext;
49+
private Configuration configuration;
4650
private String serviceName;
4751
private DurableType[] durableTypes;
4852
private EntityFactoryProxy[] entityFactoryProxies;
@@ -61,6 +65,11 @@ public class MneDurableOutputSession<V>
6165
public MneDurableOutputSession(TaskAttemptContext taskAttemptContext) {
6266
setTaskAttemptContext(taskAttemptContext);
6367
m_recordmap = new HashMap<V, DurableSinglyLinkedList<V>>();
68+
setConfiguration(taskAttemptContext.getConfiguration());
69+
}
70+
71+
public MneDurableOutputSession(Configuration configuration) {
72+
setConfiguration(configuration);
6473
}
6574

6675
public void validateConfig() {
@@ -79,7 +88,7 @@ public void readConfig(String prefix) {
7988
if (getTaskAttemptContext() == null) {
8089
throw new ConfigurationException("taskAttemptContext has not yet been set");
8190
}
82-
Configuration conf = getTaskAttemptContext().getConfiguration();
91+
Configuration conf = getConfiguration();
8392
setServiceName(MneConfigHelper.getMemServiceName(conf, MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX));
8493
setDurableTypes(MneConfigHelper.getDurableTypes(conf, MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX));
8594
setEntityFactoryProxies(Utils.instantiateEntityFactoryProxies(
@@ -93,10 +102,40 @@ public void readConfig(String prefix) {
93102

94103
protected Path genNextPoolPath() {
95104
Path ret = new Path(FileOutputFormat.getOutputPath(getTaskAttemptContext()),
96-
FileOutputFormat.getUniqueFile(getTaskAttemptContext(),
97-
String.format("%s-%05d", getBaseOutputName(), ++m_poolidx), MneConfigHelper.DEFAULT_FILE_EXTENSION));
105+
getUniqueName(String.format("%s-%05d", getBaseOutputName(), ++m_poolidx),
106+
MneConfigHelper.DEFAULT_FILE_EXTENSION));
98107
return ret;
99108
}
109+
110+
protected String getUniqueName(String name, String extension) {
111+
int partition;
112+
113+
NumberFormat numberFormat = NumberFormat.getInstance();
114+
numberFormat.setMinimumIntegerDigits(5);
115+
numberFormat.setGroupingUsed(false);
116+
117+
if (null != getTaskAttemptContext()) {
118+
TaskID taskId = getTaskAttemptContext().getTaskAttemptID().getTaskID();
119+
partition = taskId.getId();
120+
} else {
121+
partition = getConfiguration().getInt(JobContext.TASK_PARTITION, -1);
122+
}
123+
if (partition == -1) {
124+
throw new IllegalArgumentException("This method can only be called from an application");
125+
}
126+
127+
String taskType = getConfiguration().getBoolean(JobContext.TASK_ISMAP, JobContext.DEFAULT_TASK_ISMAP) ? "m" : "r";
128+
129+
StringBuilder result = new StringBuilder();
130+
result.append(name);
131+
result.append('-');
132+
result.append(taskType);
133+
result.append('-');
134+
result.append(numberFormat.format(partition));
135+
result.append(extension);
136+
return result.toString();
137+
138+
}
100139

101140
@Override
102141
public void initNextPool() {
@@ -326,4 +365,12 @@ public void setBaseOutputName(String baseOutputName) {
326365
this.baseOutputName = baseOutputName;
327366
}
328367

368+
public Configuration getConfiguration() {
369+
return configuration;
370+
}
371+
372+
public void setConfiguration(Configuration configuration) {
373+
this.configuration = configuration;
374+
}
375+
329376
}

0 commit comments

Comments
 (0)