Skip to content

Commit 341156c

Browse files
author
Wang, Gang(Gary)
committed
MNEMONIC-230: Make it possible to return the iterator of DurableInputSession multiple times.
1 parent e912470 commit 341156c

File tree

7 files changed

+138
-52
lines changed

7 files changed

+138
-52
lines changed
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.mnemonic;
19+
20+
import java.io.Closeable;
21+
import java.util.Iterator;
22+
23+
public interface CloseableIterator<E> extends Iterator<E>, Closeable {
24+
25+
}

mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/MneDurableInputSession.java

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.apache.mnemonic.collections.DurableSinglyLinkedList;
3636
import org.apache.mnemonic.collections.DurableSinglyLinkedListFactory;
3737
import org.apache.mnemonic.sessions.DurableInputSession;
38+
import org.apache.mnemonic.sessions.SessionIterator;
3839

3940
public class MneDurableInputSession<V>
4041
extends DurableInputSession<V, NonVolatileMemAllocator> {
@@ -83,23 +84,23 @@ public void readConfig(String prefix) {
8384
}
8485

8586
@Override
86-
public boolean initNextPool() {
87+
protected boolean initNextPool(SessionIterator<V, NonVolatileMemAllocator> sessiter) {
8788
boolean ret = false;
88-
if (m_act != null) {
89-
m_act.close();
90-
m_act = null;
89+
if (sessiter.getAllocator() != null) {
90+
sessiter.getAllocator().close();
91+
sessiter.setAllocator(null);
9192
}
9293
if (null != m_fp_iter && m_fp_iter.hasNext()) {
93-
m_act = new NonVolatileMemAllocator(Utils.getNonVolatileMemoryAllocatorService(getServiceName()), 1024000L,
94-
m_fp_iter.next(), true);
95-
if (null != m_act) {
96-
m_handler = m_act.getHandler(getSlotKeyId());
97-
if (0L != m_handler) {
94+
sessiter.setAllocator(new NonVolatileMemAllocator(Utils.getNonVolatileMemoryAllocatorService(
95+
getServiceName()), 1024000L, m_fp_iter.next(), true));
96+
if (null != sessiter.getAllocator()) {
97+
sessiter.setHandler(sessiter.getAllocator().getHandler(getSlotKeyId()));
98+
if (0L != sessiter.getHandler()) {
9899
DurableSinglyLinkedList<V> dsllist = DurableSinglyLinkedListFactory.restore(
99-
m_act, getEntityFactoryProxies(), getDurableTypes(), m_handler, false);
100+
sessiter.getAllocator(), getEntityFactoryProxies(), getDurableTypes(), sessiter.getHandler(), false);
100101
if (null != dsllist) {
101-
m_iter = dsllist.iterator();
102-
ret = null != m_iter;
102+
sessiter.setIterator(dsllist.iterator());
103+
ret = null != sessiter.getIterator();
103104
}
104105
}
105106
}

mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/mapred/MneMapredRecordReader.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,11 @@
1919
package org.apache.mnemonic.hadoop.mapred;
2020

2121
import java.io.IOException;
22-
import java.util.Iterator;
2322

2423
import org.apache.hadoop.io.NullWritable;
2524
import org.apache.hadoop.mapred.FileSplit;
2625
import org.apache.hadoop.mapred.JobConf;
26+
import org.apache.mnemonic.CloseableIterator;
2727
import org.apache.mnemonic.hadoop.MneConfigHelper;
2828
import org.apache.mnemonic.hadoop.MneDurableInputSession;
2929
import org.apache.mnemonic.hadoop.MneDurableInputValue;
@@ -38,7 +38,7 @@
3838
public class MneMapredRecordReader<MV extends MneDurableInputValue<V>, V>
3939
implements org.apache.hadoop.mapred.RecordReader<NullWritable, MV> {
4040

41-
protected Iterator<V> m_iter;
41+
protected CloseableIterator<V> m_iter;
4242
protected MneDurableInputSession<V> m_session;
4343
protected FileSplit m_fileSplit;
4444

@@ -47,7 +47,6 @@ public MneMapredRecordReader(FileSplit fileSplit, JobConf conf) throws IOExcepti
4747
m_fileSplit = fileSplit;
4848
m_session = new MneDurableInputSession<V>(conf, m_fileSplit.getPath());
4949
m_session.readConfig(MneConfigHelper.DEFAULT_INPUT_CONFIG_PREFIX);
50-
m_session.initNextPool();
5150
m_iter = m_session.iterator();
5251
}
5352

@@ -79,7 +78,9 @@ public long getPos() throws IOException {
7978

8079
@Override
8180
public void close() throws IOException {
82-
m_session.close();
81+
if (null != m_iter) {
82+
m_iter.close();
83+
}
8384
}
8485

8586
@Override

mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/mapreduce/MneMapreduceRecordReader.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,12 @@
1818
package org.apache.mnemonic.hadoop.mapreduce;
1919

2020
import java.io.IOException;
21-
import java.util.Iterator;
2221

2322
import org.apache.hadoop.io.NullWritable;
2423
import org.apache.hadoop.mapreduce.InputSplit;
2524
import org.apache.hadoop.mapreduce.TaskAttemptContext;
2625
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
26+
import org.apache.mnemonic.CloseableIterator;
2727
import org.apache.mnemonic.hadoop.MneConfigHelper;
2828
import org.apache.mnemonic.hadoop.MneDurableInputSession;
2929
import org.apache.mnemonic.hadoop.MneDurableInputValue;
@@ -37,20 +37,21 @@
3737
public class MneMapreduceRecordReader<MV extends MneDurableInputValue<V>, V>
3838
extends org.apache.hadoop.mapreduce.RecordReader<NullWritable, MV> {
3939

40-
protected Iterator<V> m_iter;
40+
protected CloseableIterator<V> m_iter;
4141
protected MneDurableInputSession<V> m_session;
4242

4343
@Override
4444
public void close() throws IOException {
45-
m_session.close();
45+
if (null != m_iter) {
46+
m_iter.close();
47+
}
4648
}
4749

4850
@Override
4951
public void initialize(InputSplit inputSplit, TaskAttemptContext context) {
5052
FileSplit split = (FileSplit) inputSplit;
5153
m_session = new MneDurableInputSession<V>(context, split.getPath());
5254
m_session.readConfig(MneConfigHelper.DEFAULT_INPUT_CONFIG_PREFIX);
53-
m_session.initNextPool();
5455
m_iter = m_session.iterator();
5556
}
5657

mnemonic-sessions/src/main/java/org/apache/mnemonic/sessions/DurableInputSession.java

Lines changed: 53 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -26,16 +26,19 @@
2626
import org.apache.mnemonic.RestorableAllocator;
2727

2828
public abstract class DurableInputSession<V, A extends RestorableAllocator<A>>
29-
implements InputSession<V>, DurableComputable<A> {
29+
implements InputSession<V> {
3030

3131
private String serviceName;
3232
private DurableType[] durableTypes;
3333
private EntityFactoryProxy[] entityFactoryProxies;
3434
private long slotKeyId;
3535

36-
protected long m_handler;
37-
protected A m_act;
38-
protected Iterator<V> m_iter;
36+
/**
37+
* Initialize the next pool, must be called before use
38+
*
39+
* @return true if success
40+
*/
41+
protected abstract boolean initNextPool(SessionIterator<V, A> sessiter);
3942

4043
/**
4144
* One session can only manage one iterator instance at a time for the simplicity
@@ -44,15 +47,21 @@ public abstract class DurableInputSession<V, A extends RestorableAllocator<A>>
4447
*
4548
*/
4649
@Override
47-
public Iterator<V> iterator() {
48-
return new Intr();
50+
public SessionIterator<V, A> iterator() {
51+
SessionIterator<V, A> ret = new Intr();
52+
initNextPool(ret);
53+
return ret;
4954
}
5055

5156
/**
5257
* this class defines a iterator for multiple pools read
5358
*
5459
*/
55-
private class Intr implements Iterator<V> {
60+
private class Intr implements SessionIterator<V, A> {
61+
62+
protected long m_handler;
63+
protected A m_act;
64+
protected Iterator<V> m_iter;
5665

5766
/**
5867
* determine the existing of next
@@ -67,7 +76,7 @@ public boolean hasNext() {
6776
}
6877
boolean ret = m_iter.hasNext();
6978
if (!ret) {
70-
if (initNextPool()) {
79+
if (initNextPool(this)) {
7180
ret = m_iter.hasNext();
7281
}
7382
}
@@ -94,13 +103,44 @@ public V next() {
94103
public void remove() {
95104
throw new UnsupportedOperationException();
96105
}
97-
}
98106

99-
@Override
100-
public void close() {
101-
if (null != m_act) {
102-
m_act.close();
107+
@Override
108+
public A getAllocator() {
109+
return m_act;
103110
}
111+
112+
@Override
113+
public long getHandler() {
114+
return m_handler;
115+
}
116+
117+
@Override
118+
public void setAllocator(A alloc) {
119+
m_act = alloc;
120+
}
121+
122+
@Override
123+
public void setHandler(long hdl) {
124+
m_handler = hdl;
125+
}
126+
127+
@Override
128+
public void setIterator(Iterator<V> iter) {
129+
m_iter = iter;
130+
}
131+
132+
@Override
133+
public void close() {
134+
if (null != m_act) {
135+
m_act.close();
136+
}
137+
}
138+
139+
@Override
140+
public Iterator<V> getIterator() {
141+
return m_iter;
142+
}
143+
104144
}
105145

106146
public String getServiceName() {
@@ -135,14 +175,4 @@ public void setSlotKeyId(long slotKeyId) {
135175
this.slotKeyId = slotKeyId;
136176
}
137177

138-
@Override
139-
public A getAllocator() {
140-
return m_act;
141-
}
142-
143-
@Override
144-
public long getHandler() {
145-
return m_handler;
146-
}
147-
148178
}

mnemonic-sessions/src/main/java/org/apache/mnemonic/sessions/InputSession.java

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,17 +18,9 @@
1818

1919
package org.apache.mnemonic.sessions;
2020

21-
import java.io.Closeable;
2221
import java.util.Iterator;
2322

24-
public interface InputSession<V> extends Closeable, Iterable<V> {
25-
26-
/**
27-
* Initialize the next pool, must be called before use
28-
*
29-
* @return true if success
30-
*/
31-
boolean initNextPool();
23+
public interface InputSession<V> extends Iterable<V> {
3224

3325
Iterator<V> iterator();
3426

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.mnemonic.sessions;
20+
21+
import java.util.Iterator;
22+
23+
import org.apache.mnemonic.CloseableIterator;
24+
import org.apache.mnemonic.RestorableAllocator;
25+
26+
public interface SessionIterator<V, A extends RestorableAllocator<A>>
27+
extends CloseableIterator<V>, DurableComputable<A> {
28+
29+
void setAllocator(A alloc);
30+
31+
void setHandler(long hdl);
32+
33+
void setIterator(Iterator<V> iter);
34+
35+
Iterator<V> getIterator();
36+
}

0 commit comments

Comments
 (0)