Commit 224ef2f

MNEMONIC-216: Support Hadoop mapred APIs
MNEMONIC-220: Remove the dependency on OutputDir of FileOutputFormat
1 parent 1705665 commit 224ef2f

13 files changed (+472, -16 lines)

build-tools/test.conf

Lines changed: 3 additions & 0 deletions
@@ -50,3 +50,6 @@ mvn -Dtest=MneMapreduceLongDataTest test -pl mnemonic-hadoop/mnemonic-hadoop-map
 mvn -Dtest=MneMapreduceBufferDataTest test -pl mnemonic-hadoop/mnemonic-hadoop-mapreduce -DskipTests=false
 
 mvn -Dtest=MneMapreduceChunkDataTest test -pl mnemonic-hadoop/mnemonic-hadoop-mapreduce -DskipTests=false
+
+# a testcase for module "mnemonic-hadoop/mnemonic-hadoop-mapreduce" that requires 'pmalloc' memory service to pass
+mvn -Dtest=MneMapredBufferDataTest test -pl mnemonic-hadoop/mnemonic-hadoop-mapreduce -DskipTests=false

mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/MneConfigHelper.java

Lines changed: 9 additions & 0 deletions
@@ -44,6 +44,7 @@ public class MneConfigHelper {
   public static final String DEFAULT_NAME_PART = "part";
   public static final String DEFAULT_FILE_EXTENSION = ".mne";
   public static final String BASE_OUTPUT_NAME = "mapreduce.output.basename";
+  public static final String DIR = "dir";
 
   private static final Logger LOGGER = LoggerFactory.getLogger(MneConfigHelper.class);
 
@@ -129,5 +130,13 @@ public static void setMemPoolSize(Configuration conf, String prefix, long size)
   public static long getMemPoolSize(Configuration conf, String prefix) {
     return conf.getLong(getConfigName(prefix, MEM_POOL_SIZE), DEFAULT_OUTPUT_MEM_POOL_SIZE);
   }
+
+  public static String getDir(Configuration conf, String prefix) {
+    return conf.get(getConfigName(prefix, DIR));
+  }
+
+  public static void setDir(Configuration conf, String prefix, String dirname) {
+    conf.set(getConfigName(prefix, DIR), dirname);
+  }
 
 }
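
The new DIR key lets a caller hand Mnemonic its output location directly through the Configuration. Below is a minimal round-trip sketch using only the helpers and the DEFAULT_OUTPUT_CONFIG_PREFIX constant visible in this commit; the class name DirConfigSketch and the /tmp/mne-pools path are illustrative, not part of the change.

import org.apache.hadoop.conf.Configuration;

import org.apache.mnemonic.hadoop.MneConfigHelper;

public class DirConfigSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Record where durable pool files should go, scoped by the output prefix.
    MneConfigHelper.setDir(conf, MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX, "/tmp/mne-pools");
    // The same prefix reads the value back; getDir() returns null if nothing was set.
    String dir = MneConfigHelper.getDir(conf, MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX);
    System.out.println(dir); // prints /tmp/mne-pools
  }
}

MneDurableOutputSession.getOutputDir() below reads this same key back, which is what removes the dependency on FileOutputFormat's output directory (MNEMONIC-220).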

mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/MneDurableInputSession.java

Lines changed: 4 additions & 4 deletions
@@ -46,8 +46,8 @@ public class MneDurableInputSession<V>
 
 
   public MneDurableInputSession(TaskAttemptContext taskAttemptContext) {
+    this(taskAttemptContext.getConfiguration());
     setTaskAttemptContext(taskAttemptContext);
-    setConfiguration(taskAttemptContext.getConfiguration());
   }
 
   public MneDurableInputSession(Configuration configuration) {
@@ -67,10 +67,10 @@ && getEntityFactoryProxies().length < 1) { /* T.B.D. BUFFER & CHUNK */
 
   @Override
   public void readConfig(String prefix) {
-    if (getConfiguration() == null) {
-      throw new ConfigurationException("configuration has not yet been set");
-    }
     Configuration conf = getConfiguration();
+    if (conf == null) {
+      throw new ConfigurationException("Configuration has not yet been set");
+    }
     setServiceName(MneConfigHelper.getMemServiceName(conf, MneConfigHelper.DEFAULT_INPUT_CONFIG_PREFIX));
     setDurableTypes(MneConfigHelper.getDurableTypes(conf, MneConfigHelper.DEFAULT_INPUT_CONFIG_PREFIX));
     setEntityFactoryProxies(Utils.instantiateEntityFactoryProxies(

mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/MneDurableOutputSession.java

Lines changed: 12 additions & 8 deletions
@@ -39,7 +39,6 @@
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskID;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 
 public class MneDurableOutputSession<V>
     implements MneOutputSession<V>, MneDurableComputable<NonVolatileMemAllocator> {
@@ -63,13 +62,13 @@ public class MneDurableOutputSession<V>
   protected Iterator<V> m_iter;
 
   public MneDurableOutputSession(TaskAttemptContext taskAttemptContext) {
+    this(taskAttemptContext.getConfiguration());
     setTaskAttemptContext(taskAttemptContext);
-    m_recordmap = new HashMap<V, DurableSinglyLinkedList<V>>();
-    setConfiguration(taskAttemptContext.getConfiguration());
   }
 
   public MneDurableOutputSession(Configuration configuration) {
     setConfiguration(configuration);
+    m_recordmap = new HashMap<V, DurableSinglyLinkedList<V>>();
   }
 
   public void validateConfig() {
@@ -85,10 +84,10 @@ && getEntityFactoryProxies().length < 1) { /* T.B.D. BUFFER & CHUNK */
 
   @Override
   public void readConfig(String prefix) {
-    if (getTaskAttemptContext() == null) {
-      throw new ConfigurationException("taskAttemptContext has not yet been set");
-    }
     Configuration conf = getConfiguration();
+    if (conf == null) {
+      throw new ConfigurationException("Configuration has not yet been set");
+    }
     setServiceName(MneConfigHelper.getMemServiceName(conf, MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX));
     setDurableTypes(MneConfigHelper.getDurableTypes(conf, MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX));
     setEntityFactoryProxies(Utils.instantiateEntityFactoryProxies(
@@ -101,12 +100,17 @@ public void readConfig(String prefix) {
   }
 
   protected Path genNextPoolPath() {
-    Path ret = new Path(FileOutputFormat.getOutputPath(getTaskAttemptContext()),
+    Path ret = new Path(getOutputDir(),
         getUniqueName(String.format("%s-%05d", getBaseOutputName(), ++m_poolidx),
             MneConfigHelper.DEFAULT_FILE_EXTENSION));
     return ret;
   }
-
+
+  protected Path getOutputDir() {
+    String name = MneConfigHelper.getDir(getConfiguration(), MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX);
+    return name == null ? null : new Path(name);
+  }
+
   protected String getUniqueName(String name, String extension) {
     int partition;

mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/mapred/MneInputFormat.java

Lines changed: 50 additions & 0 deletions
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mnemonic.hadoop.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.mnemonic.hadoop.MneDurableInputValue;
+
+/**
+ * A Mnemonic input format that satisfies the org.apache.hadoop.mapred API.
+ */
+
+public class MneInputFormat<MV extends MneDurableInputValue<V>, V>
+    extends FileInputFormat<NullWritable, MV> {
+
+  @Override
+  public RecordReader<NullWritable, MV>
+      getRecordReader(InputSplit inputSplit,
+                      JobConf jobConf,
+                      Reporter reporter
+                      ) throws IOException {
+    MneMapredRecordReader<MV, V> reader =
+        new MneMapredRecordReader<MV, V>((FileSplit) inputSplit, jobConf);
+    return reader;
+  }
+
+}
mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/mapred/MneMapredRecordReader.java

Lines changed: 90 additions & 0 deletions
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mnemonic.hadoop.mapred;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.mnemonic.hadoop.MneConfigHelper;
+import org.apache.mnemonic.hadoop.MneDurableInputSession;
+import org.apache.mnemonic.hadoop.MneDurableInputValue;
+
+/**
+ * This record reader implements the org.apache.hadoop.mapred API.
+ *
+ * @param <V>
+ *          the type of the data item
+ */
+
+public class MneMapredRecordReader<MV extends MneDurableInputValue<V>, V>
+    implements org.apache.hadoop.mapred.RecordReader<NullWritable, MV> {
+
+  protected Iterator<V> m_iter;
+  protected MneDurableInputSession<V> m_session;
+  protected FileSplit m_fileSplit;
+
+
+  public MneMapredRecordReader(FileSplit fileSplit, JobConf conf) throws IOException {
+    m_fileSplit = fileSplit;
+    m_session = new MneDurableInputSession<V>(conf);
+    m_session.readConfig(MneConfigHelper.DEFAULT_INPUT_CONFIG_PREFIX);
+    m_session.initialize(m_fileSplit.getPath());
+    m_iter = m_session.iterator();
+  }
+
+  @Override
+  public boolean next(NullWritable key, MV value) throws IOException {
+    boolean ret = false;
+    if (m_iter.hasNext()) {
+      value.of(m_iter.next());
+      ret = true;
+    }
+    return ret;
+  }
+
+  @Override
+  public NullWritable createKey() {
+    return NullWritable.get();
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public MV createValue() {
+    return (MV) new MneDurableInputValue<V>(m_session);
+  }
+
+  @Override
+  public long getPos() throws IOException {
+    return m_fileSplit.getLength();
+  }
+
+  @Override
+  public void close() throws IOException {
+    m_session.close();
+  }
+
+  @Override
+  public float getProgress() throws IOException {
+    return 0.5f; /* TBD */
+  }
+
+}
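
The reader follows the mapred pull model: the caller obtains the key and value holders once, then next() re-fills the value until the split is exhausted. A short sketch of that loop, assuming the classes above are on the classpath and the split and JobConf come from the surrounding job; ReaderSketch and countRecords are illustrative names, not part of this commit.

import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.mnemonic.hadoop.MneDurableInputValue;
import org.apache.mnemonic.hadoop.mapred.MneMapredRecordReader;

public class ReaderSketch {
  /** Drains one split and returns how many durable objects it yielded. */
  public static <V> long countRecords(FileSplit split, JobConf conf) throws IOException {
    MneMapredRecordReader<MneDurableInputValue<V>, V> reader =
        new MneMapredRecordReader<MneDurableInputValue<V>, V>(split, conf);
    try {
      NullWritable key = reader.createKey();
      MneDurableInputValue<V> value = reader.createValue();
      long count = 0L;
      while (reader.next(key, value)) {
        count++; // each successful next() has bound the next durable object to 'value'
      }
      return count;
    } finally {
      reader.close();
    }
  }
}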
mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/mapred/MneMapredRecordWriter.java

Lines changed: 49 additions & 0 deletions
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mnemonic.hadoop.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.mnemonic.hadoop.MneDurableOutputValue;
+
+/**
+ * This record writer implements the org.apache.hadoop.mapred API.
+ *
+ * @param <MV>
+ *          the type of the data item
+ */
+
+public class MneMapredRecordWriter<MV extends MneDurableOutputValue<?>>
+    implements RecordWriter<NullWritable, MV> {
+
+
+  @Override
+  public void write(NullWritable key, MV value) throws IOException {
+    value.post();
+  }
+
+  @Override
+  public void close(Reporter reporter) throws IOException {
+  }
+}
+
+
mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/mapred/MneOutputFormat.java

Lines changed: 45 additions & 0 deletions
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mnemonic.hadoop.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.mnemonic.hadoop.MneDurableOutputValue;
+
+/**
+ * A Mnemonic output format that satisfies the org.apache.hadoop.mapred API.
+ */
+
+public class MneOutputFormat<MV extends MneDurableOutputValue<?>> extends FileOutputFormat<NullWritable, MV> {
+
+  @Override
+  public RecordWriter<NullWritable, MV> getRecordWriter(FileSystem ignored, JobConf job, String name,
+      Progressable progress) throws IOException {
+    return new MneMapredRecordWriter<MV>();
+  }
+
+
+
+}
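
Putting the pieces together, an old-style mapred job can now be wired to Mnemonic without going through FileOutputFormat's output path. The sketch below shows only the format wiring and the directory key; it assumes the JobConf already carries the rest of the Mnemonic settings (memory service name, durable types, entity factory proxies), and MapredJobSketch plus the example paths are illustrative, not part of this commit.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.mnemonic.hadoop.MneConfigHelper;
import org.apache.mnemonic.hadoop.mapred.MneInputFormat;
import org.apache.mnemonic.hadoop.mapred.MneOutputFormat;

public class MapredJobSketch {
  public static void configure(JobConf job) {
    // Old-style (mapred) wiring of the new Mnemonic format classes.
    job.setInputFormat(MneInputFormat.class);
    job.setOutputFormat(MneOutputFormat.class);
    // Input splits are still discovered through the mapred FileInputFormat path list.
    FileInputFormat.addInputPath(job, new Path("/data/mne-input"));
    // MNEMONIC-220: the pool output directory travels in Mnemonic's own config key
    // instead of FileOutputFormat's output directory.
    MneConfigHelper.setDir(job, MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX, "/data/mne-output");
  }
}

Keeping the directory in Mnemonic's own configuration key is the point of MNEMONIC-220: the output session no longer needs a TaskAttemptContext or FileOutputFormat just to know where to place its pool files.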
