Skip to content

Commit 640becd

Browse files
author
Wang, Gang(Gary)
committed
MNEMONIC-227: Unify the usage of initNextPool() of IO sessions
1 parent 8923b1e commit 640becd

File tree

9 files changed

+149
-41
lines changed

9 files changed

+149
-41
lines changed

mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/MneDurableInputSession.java

Lines changed: 42 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,28 +18,44 @@
1818

1919
package org.apache.mnemonic.hadoop;
2020

21+
import java.nio.file.Files;
22+
import java.nio.file.LinkOption;
23+
import java.nio.file.Paths;
24+
import java.util.ArrayList;
25+
import java.util.Iterator;
26+
import java.util.List;
27+
2128
import org.apache.hadoop.conf.Configuration;
2229
import org.apache.hadoop.fs.Path;
2330
import org.apache.hadoop.mapreduce.TaskAttemptContext;
2431
import org.apache.mnemonic.ConfigurationException;
2532
import org.apache.mnemonic.DurableType;
2633
import org.apache.mnemonic.NonVolatileMemAllocator;
2734
import org.apache.mnemonic.Utils;
35+
import org.apache.mnemonic.collections.DurableSinglyLinkedList;
36+
import org.apache.mnemonic.collections.DurableSinglyLinkedListFactory;
2837
import org.apache.mnemonic.sessions.DurableInputSession;
2938

3039
public class MneDurableInputSession<V>
3140
extends DurableInputSession<V, NonVolatileMemAllocator> {
3241

3342
private TaskAttemptContext taskAttemptContext;
3443
private Configuration configuration;
44+
private Iterator<String> m_fp_iter;
3545

36-
public MneDurableInputSession(TaskAttemptContext taskAttemptContext) {
37-
this(taskAttemptContext.getConfiguration());
46+
public MneDurableInputSession(TaskAttemptContext taskAttemptContext, Path path) {
47+
this(taskAttemptContext.getConfiguration(), path);
3848
setTaskAttemptContext(taskAttemptContext);
3949
}
4050

41-
public MneDurableInputSession(Configuration configuration) {
51+
public MneDurableInputSession(Configuration configuration, Path path) {
4252
setConfiguration(configuration);
53+
if (!Files.isRegularFile(Paths.get(path.toString()), LinkOption.NOFOLLOW_LINKS)) {
54+
throw new UnsupportedOperationException();
55+
}
56+
List<String> fpathlist = new ArrayList<String>();
57+
fpathlist.add(path.toString());
58+
m_fp_iter = fpathlist.iterator();
4359
}
4460

4561
public void validateConfig() {
@@ -66,10 +82,29 @@ public void readConfig(String prefix) {
6682
validateConfig();
6783
}
6884

69-
public void initialize(Path path) {
70-
m_act = new NonVolatileMemAllocator(Utils.getNonVolatileMemoryAllocatorService(getServiceName()), 1024000L,
71-
path.toString(), true);
72-
m_handler = m_act.getHandler(getSlotKeyId());
85+
@Override
86+
public boolean initNextPool() {
87+
boolean ret = false;
88+
if (m_act != null) {
89+
m_act.close();
90+
m_act = null;
91+
}
92+
if (null != m_fp_iter && m_fp_iter.hasNext()) {
93+
m_act = new NonVolatileMemAllocator(Utils.getNonVolatileMemoryAllocatorService(getServiceName()), 1024000L,
94+
m_fp_iter.next(), true);
95+
if (null != m_act) {
96+
m_handler = m_act.getHandler(getSlotKeyId());
97+
if (0L != m_handler) {
98+
DurableSinglyLinkedList<V> dsllist = DurableSinglyLinkedListFactory.restore(
99+
m_act, getEntityFactoryProxies(), getDurableTypes(), m_handler, false);
100+
if (null != dsllist) {
101+
m_iter = dsllist.iterator();
102+
ret = null != m_iter;
103+
}
104+
}
105+
}
106+
}
107+
return ret;
73108
}
74109

75110
public TaskAttemptContext getTaskAttemptContext() {

mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/MneDurableOutputSession.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -122,14 +122,20 @@ protected String getUniqueName(String name, String extension) {
122122
}
123123

124124
@Override
125-
public void initNextPool() {
125+
public boolean initNextPool() {
126+
boolean ret = false;
126127
if (m_act != null) {
127128
m_act.close();
129+
m_act = null;
128130
}
129131
setOutputPath(genNextPoolPath());
130132
m_act = new NonVolatileMemAllocator(Utils.getNonVolatileMemoryAllocatorService(getServiceName()), getPoolSize(),
131133
getOutputPath().toString(), true);
132-
m_newpool = true;
134+
if (null != m_act) {
135+
m_newpool = true;
136+
ret = true;
137+
}
138+
return ret;
133139
}
134140

135141
public Path getOutputPath() {

mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/mapred/MneMapredRecordReader.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,9 +45,9 @@ public class MneMapredRecordReader<MV extends MneDurableInputValue<V>, V>
4545

4646
public MneMapredRecordReader(FileSplit fileSplit, JobConf conf) throws IOException {
4747
m_fileSplit = fileSplit;
48-
m_session = new MneDurableInputSession<V>(conf);
48+
m_session = new MneDurableInputSession<V>(conf, m_fileSplit.getPath());
4949
m_session.readConfig(MneConfigHelper.DEFAULT_INPUT_CONFIG_PREFIX);
50-
m_session.initialize(m_fileSplit.getPath());
50+
m_session.initNextPool();
5151
m_iter = m_session.iterator();
5252
}
5353

mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/mapreduce/MneMapreduceRecordReader.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,9 +48,9 @@ public void close() throws IOException {
4848
@Override
4949
public void initialize(InputSplit inputSplit, TaskAttemptContext context) {
5050
FileSplit split = (FileSplit) inputSplit;
51-
m_session = new MneDurableInputSession<V>(context);
51+
m_session = new MneDurableInputSession<V>(context, split.getPath());
5252
m_session.readConfig(MneConfigHelper.DEFAULT_INPUT_CONFIG_PREFIX);
53-
m_session.initialize(split.getPath());
53+
m_session.initNextPool();
5454
m_iter = m_session.iterator();
5555
}
5656

mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/mapreduce/MneMapreducePersonDataTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -156,8 +156,8 @@ public void testReadPersonData() throws Exception {
156156
reader.close();
157157
}
158158
}
159-
AssertJUnit.assertEquals(m_sumage, sumage);
160159
AssertJUnit.assertEquals(m_reccnt, reccnt);
160+
AssertJUnit.assertEquals(m_sumage, sumage);
161161
System.out.println(String.format("The checksum of ages is %d", sumage));
162162
}
163163
}

mnemonic-sessions/src/main/java/org/apache/mnemonic/sessions/DurableInputSession.java

Lines changed: 61 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,11 @@
1919
package org.apache.mnemonic.sessions;
2020

2121
import java.util.Iterator;
22+
import java.util.NoSuchElementException;
2223

2324
import org.apache.mnemonic.DurableType;
2425
import org.apache.mnemonic.EntityFactoryProxy;
2526
import org.apache.mnemonic.RestorableAllocator;
26-
import org.apache.mnemonic.collections.DurableSinglyLinkedList;
27-
import org.apache.mnemonic.collections.DurableSinglyLinkedListFactory;
2827

2928
public abstract class DurableInputSession<V, A extends RestorableAllocator<A>>
3029
implements InputSession<V>, DurableComputable<A> {
@@ -36,20 +35,72 @@ public abstract class DurableInputSession<V, A extends RestorableAllocator<A>>
3635

3736
protected long m_handler;
3837
protected A m_act;
39-
38+
protected Iterator<V> m_iter;
39+
40+
/**
41+
* One session can only manage one iterator instance at a time for the simplicity
42+
*
43+
* @return the singleton iterator
44+
*
45+
*/
4046
@Override
4147
public Iterator<V> iterator() {
42-
Iterator<V> iter;
43-
DurableSinglyLinkedList<V> dsllist;
44-
dsllist = DurableSinglyLinkedListFactory.restore(m_act, getEntityFactoryProxies(), getDurableTypes(), m_handler,
45-
false);
46-
iter = dsllist.iterator();
47-
return iter;
48+
return new Intr();
49+
}
50+
51+
/**
52+
* this class defines a iterator for multiple pools read
53+
*
54+
*/
55+
private class Intr implements Iterator<V> {
56+
57+
/**
58+
* determine the existing of next
59+
*
60+
* @return true if there is a next node
61+
*
62+
*/
63+
@Override
64+
public boolean hasNext() {
65+
if (null == m_iter) {
66+
return false;
67+
}
68+
boolean ret = m_iter.hasNext();
69+
if (!ret) {
70+
if (initNextPool()) {
71+
ret = m_iter.hasNext();
72+
}
73+
}
74+
return ret;
75+
}
76+
77+
/**
78+
* get next node
79+
*
80+
* @return the next node
81+
*/
82+
@Override
83+
public V next() {
84+
if (null == m_iter) {
85+
throw new NoSuchElementException();
86+
}
87+
return m_iter.next();
88+
}
89+
90+
/**
91+
* override remove()
92+
*/
93+
@Override
94+
public void remove() {
95+
throw new UnsupportedOperationException();
96+
}
4897
}
4998

5099
@Override
51100
public void close() {
52-
m_act.close();
101+
if (null != m_act) {
102+
m_act.close();
103+
}
53104
}
54105

55106
public String getServiceName() {

mnemonic-sessions/src/main/java/org/apache/mnemonic/sessions/DurableOutputSession.java

Lines changed: 18 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,6 @@ public abstract class DurableOutputSession<V, A extends RestorableAllocator<A>>
4646
protected DurableSinglyLinkedList<V> m_listnode;
4747
protected A m_act;
4848

49-
protected abstract void initNextPool();
50-
5149
@Override
5250
public A getAllocator() {
5351
return m_act;
@@ -119,16 +117,17 @@ public V newDurableObjectRecord(long size) {
119117
if (ret != null) {
120118
((Durable) ret).destroy();
121119
}
122-
initNextPool();
123-
try { /* retry */
124-
nv = createDurableNode();
125-
ret = createDurableObjectRecord(size);
126-
} catch (OutOfHybridMemory ee) {
127-
if (nv != null) {
128-
nv.destroy();
129-
}
130-
if (ret != null) {
131-
((Durable) ret).destroy();
120+
if (initNextPool()) {
121+
try { /* retry */
122+
nv = createDurableNode();
123+
ret = createDurableObjectRecord(size);
124+
} catch (OutOfHybridMemory ee) {
125+
if (nv != null) {
126+
nv.destroy();
127+
}
128+
if (ret != null) {
129+
((Durable) ret).destroy();
130+
}
132131
}
133132
}
134133
}
@@ -168,8 +167,9 @@ public void post(V v) {
168167
try {
169168
nv = createDurableNode();
170169
} catch (OutOfHybridMemory e) {
171-
initNextPool();
172-
nv = createDurableNode();
170+
if (initNextPool()) {
171+
nv = createDurableNode();
172+
}
173173
}
174174
break;
175175
}
@@ -201,8 +201,10 @@ public void destroyAllPendingRecords() {
201201

202202
@Override
203203
public void close() {
204-
destroyAllPendingRecords();
205-
m_act.close();
204+
if (null != m_act) {
205+
destroyAllPendingRecords();
206+
m_act.close();
207+
}
206208
}
207209

208210
public long getSlotKeyId() {

mnemonic-sessions/src/main/java/org/apache/mnemonic/sessions/InputSession.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,14 @@
2121
import java.io.Closeable;
2222
import java.util.Iterator;
2323

24-
public interface InputSession<V> extends Closeable {
24+
public interface InputSession<V> extends Closeable, Iterable<V> {
25+
26+
/**
27+
* Initialize the next pool, must be called before use
28+
*
29+
* @return true if success
30+
*/
31+
boolean initNextPool();
2532

2633
Iterator<V> iterator();
2734

mnemonic-sessions/src/main/java/org/apache/mnemonic/sessions/OutputSession.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,13 @@
2222

2323
public interface OutputSession<V> extends Closeable {
2424

25+
/**
26+
* Initialize the next pool, must be called before use
27+
*
28+
* @return true if success
29+
*/
30+
boolean initNextPool();
31+
2532
V newDurableObjectRecord();
2633

2734
V newDurableObjectRecord(long size);

0 commit comments

Comments
 (0)