18
18
19
19
package org .apache .mnemonic .hadoop ;
20
20
21
- import java .util .Iterator ;
22
-
23
21
import org .apache .hadoop .conf .Configuration ;
24
22
import org .apache .hadoop .fs .Path ;
25
23
import org .apache .hadoop .mapreduce .TaskAttemptContext ;
26
24
import org .apache .mnemonic .ConfigurationException ;
27
25
import org .apache .mnemonic .DurableType ;
28
- import org .apache .mnemonic .EntityFactoryProxy ;
29
26
import org .apache .mnemonic .NonVolatileMemAllocator ;
30
27
import org .apache .mnemonic .Utils ;
31
- import org .apache .mnemonic .collections .DurableSinglyLinkedList ;
32
- import org .apache .mnemonic .collections .DurableSinglyLinkedListFactory ;
28
+ import org .apache .mnemonic .sessions .DurableInputSession ;
33
29
34
30
public class MneDurableInputSession <V >
35
- implements MneInputSession < V >, MneDurableComputable < NonVolatileMemAllocator > {
31
+ extends DurableInputSession < V , NonVolatileMemAllocator > {
36
32
37
33
private TaskAttemptContext taskAttemptContext ;
38
34
private Configuration configuration ;
39
- private String serviceName ;
40
- private DurableType [] durableTypes ;
41
- private EntityFactoryProxy [] entityFactoryProxies ;
42
- private long slotKeyId ;
43
-
44
- protected long m_handler ;
45
- protected NonVolatileMemAllocator m_act ;
46
-
47
35
48
36
public MneDurableInputSession (TaskAttemptContext taskAttemptContext ) {
49
37
this (taskAttemptContext .getConfiguration ());
@@ -59,13 +47,12 @@ public void validateConfig() {
59
47
throw new ConfigurationException ("The durable type of record parameters does not exist" );
60
48
} else {
61
49
if (DurableType .DURABLE == getDurableTypes ()[0 ]
62
- && getEntityFactoryProxies ().length < 1 ) { /* T.B.D. BUFFER & CHUNK */
50
+ && getEntityFactoryProxies ().length < 1 ) {
63
51
throw new ConfigurationException ("The durable entity proxy of record parameters does not exist" );
64
52
}
65
53
}
66
54
}
67
55
68
- @ Override
69
56
public void readConfig (String prefix ) {
70
57
Configuration conf = getConfiguration ();
71
58
if (conf == null ) {
@@ -79,28 +66,12 @@ public void readConfig(String prefix) {
79
66
validateConfig ();
80
67
}
81
68
82
- @ Override
83
69
public void initialize (Path path ) {
84
70
m_act = new NonVolatileMemAllocator (Utils .getNonVolatileMemoryAllocatorService (getServiceName ()), 1024000L ,
85
71
path .toString (), true );
86
72
m_handler = m_act .getHandler (getSlotKeyId ());
87
73
}
88
74
89
- @ Override
90
- public Iterator <V > iterator () {
91
- Iterator <V > iter ;
92
- DurableSinglyLinkedList <V > dsllist ;
93
- dsllist = DurableSinglyLinkedListFactory .restore (m_act , getEntityFactoryProxies (), getDurableTypes (), m_handler ,
94
- false );
95
- iter = dsllist .iterator ();
96
- return iter ;
97
- }
98
-
99
- @ Override
100
- public void close () {
101
- m_act .close ();
102
- }
103
-
104
75
public TaskAttemptContext getTaskAttemptContext () {
105
76
return taskAttemptContext ;
106
77
}
@@ -109,48 +80,6 @@ public void setTaskAttemptContext(TaskAttemptContext taskAttemptContext) {
109
80
this .taskAttemptContext = taskAttemptContext ;
110
81
}
111
82
112
- public String getServiceName () {
113
- return serviceName ;
114
- }
115
-
116
- public void setServiceName (String serviceName ) {
117
- this .serviceName = serviceName ;
118
- }
119
-
120
- public DurableType [] getDurableTypes () {
121
- return durableTypes ;
122
- }
123
-
124
- public void setDurableTypes (DurableType [] durableTypes ) {
125
- this .durableTypes = durableTypes ;
126
- }
127
-
128
- public EntityFactoryProxy [] getEntityFactoryProxies () {
129
- return entityFactoryProxies ;
130
- }
131
-
132
- public void setEntityFactoryProxies (EntityFactoryProxy [] entityFactoryProxies ) {
133
- this .entityFactoryProxies = entityFactoryProxies ;
134
- }
135
-
136
- public long getSlotKeyId () {
137
- return slotKeyId ;
138
- }
139
-
140
- public void setSlotKeyId (long slotKeyId ) {
141
- this .slotKeyId = slotKeyId ;
142
- }
143
-
144
- @ Override
145
- public NonVolatileMemAllocator getAllocator () {
146
- return m_act ;
147
- }
148
-
149
- @ Override
150
- public long getHandler () {
151
- return m_handler ;
152
- }
153
-
154
83
public Configuration getConfiguration () {
155
84
return configuration ;
156
85
}
0 commit comments