File tree Expand file tree Collapse file tree 1 file changed +16
-30
lines changed Expand file tree Collapse file tree 1 file changed +16
-30
lines changed Original file line number Diff line number Diff line change @@ -151,40 +151,26 @@ impl Consumer {
151
151
}
152
152
153
153
pub async fn store_offset ( & self , offset : u64 ) -> Result < ( ) , ConsumerStoreOffsetError > {
154
- if self . name . is_none ( ) {
155
- return Err ( ConsumerStoreOffsetError :: NameMissing ) ;
156
- }
157
-
158
- let result = self
159
- . internal
160
- . client
161
- . store_offset (
162
- self . name . clone ( ) . unwrap ( ) . as_str ( ) ,
163
- self . internal . stream . as_str ( ) ,
164
- offset,
165
- )
166
- . await ;
167
-
168
- match result {
169
- Ok ( ( ) ) => Ok ( ( ) ) ,
170
- Err ( e) => Err ( ConsumerStoreOffsetError :: Client ( e) ) ,
154
+ if let Some ( name) = & self . name {
155
+ self . internal
156
+ . client
157
+ . store_offset ( name. as_str ( ) , self . internal . stream . as_str ( ) , offset)
158
+ . await
159
+ . map ( Ok ) ?
160
+ } else {
161
+ Err ( ConsumerStoreOffsetError :: NameMissing )
171
162
}
172
163
}
173
164
174
165
pub async fn query_offset ( & self ) -> Result < u64 , ConsumerStoreOffsetError > {
175
- if self . name . is_none ( ) {
176
- return Err ( ConsumerStoreOffsetError :: NameMissing ) ;
177
- }
178
-
179
- let result = self
180
- . internal
181
- . client
182
- . query_offset ( self . name . clone ( ) . unwrap ( ) , self . internal . stream . as_str ( ) )
183
- . await ;
184
-
185
- match result {
186
- Ok ( r) => Ok ( r) ,
187
- Err ( e) => Err ( ConsumerStoreOffsetError :: Client ( e) ) ,
166
+ if let Some ( name) = & self . name {
167
+ self . internal
168
+ . client
169
+ . query_offset ( name. clone ( ) , self . internal . stream . as_str ( ) )
170
+ . await
171
+ . map ( Ok ) ?
172
+ } else {
173
+ Err ( ConsumerStoreOffsetError :: NameMissing )
188
174
}
189
175
}
190
176
}
You can’t perform that action at this time.
0 commit comments