@@ -14,10 +14,10 @@ using TEvGet = TEvBlobStorage::TEvGet;
14
14
15
15
struct TBlockIO ::TLoaded : public TEvBlobStorage::TEvGetResult::TResponse{ };
16
16
17
- TBlockIO::TBlockIO (TActorId service , ui64 cookie )
17
+ TBlockIO::TBlockIO (TActorId statActorId , ui64 eventCookie )
18
18
: ::NActors::IActorCallback(static_cast <TReceiveFunc>(&TBlockIO::Inbox), NKikimrServices::TActivity::SAUSAGE_BIO_A)
19
- , Service(service )
20
- , Cookie(cookie )
19
+ , StatActorId(statActorId )
20
+ , EventCookie(eventCookie )
21
21
{
22
22
}
23
23
@@ -42,10 +42,24 @@ void TBlockIO::Inbox(TEventHandlePtr &eh)
42
42
Handle (eh->Cookie , { ptr, size_t (ev->ResponseSz ) });
43
43
}
44
44
} else if (auto *ev = eh->CastAsLocal <NBlockIO::TEvFetch>()) {
45
- Y_ENSURE (!Owner, " TBlockIO actor now can hanle only one request" );
45
+ Y_ENSURE (!Sender, " TBlockIO actor now can handle only one request" );
46
+ Sender = eh->Sender ;
47
+
48
+ Priority = ev->Priority ;
49
+ TraceId = std::move (ev->TraceId );
50
+ RequestCookie = ev->Cookie ;
51
+
52
+ PageCollection = std::move (ev->PageCollection );
53
+ Pages = std::move (ev->Pages );
54
+ Y_ENSURE (Pages, " Got TFetch request without pages list" );
55
+ PagesToBlobsConverter = new TPagesToBlobsConverter (*PageCollection, Pages);
56
+ BlockStates.reserve (Pages.size ());
57
+ for (auto page: Pages) {
58
+ ui64 size = PageCollection->Page (page).Size ;
59
+ BlockStates.emplace_back (size);
60
+ }
46
61
47
- Owner = eh->Sender ;
48
- Bootstrap (ev->Priority , ev->Fetch );
62
+ Dispatch ();
49
63
} else if (eh->CastAsLocal <TEvents::TEvUndelivered>()) {
50
64
Terminate (NKikimrProto::UNKNOWN);
51
65
} else if (eh->CastAsLocal <TEvents::TEvPoison>()) {
@@ -55,25 +69,6 @@ void TBlockIO::Inbox(TEventHandlePtr &eh)
55
69
}
56
70
}
57
71
58
- void TBlockIO::Bootstrap (EPriority priority, TAutoPtr<NPageCollection::TFetch> origin)
59
- {
60
- Origin = origin;
61
- Priority = priority;
62
-
63
- Y_ENSURE (Origin->Pages , " Got TFetch request without pages list" );
64
-
65
- PagesToBlobsConverter = new TPagesToBlobsConverter (*Origin->PageCollection , Origin->Pages );
66
-
67
- BlockStates.reserve (Origin->Pages .size ());
68
-
69
- for (auto page: Origin->Pages ) {
70
- ui64 size = Origin->PageCollection ->Page (page).Size ;
71
- BlockStates.emplace_back (size);
72
- }
73
-
74
- Dispatch ();
75
- }
76
-
77
72
void TBlockIO::Dispatch ()
78
73
{
79
74
const auto ctx = ActorContext ();
@@ -101,7 +96,7 @@ void TBlockIO::Dispatch()
101
96
ui32 lastBlob = Max<ui32>();
102
97
for (const auto on : xrange (+more)) {
103
98
auto &brick = PagesToBlobsConverter->Queue [more.From + on];
104
- auto glob = Origin-> PageCollection ->Glob (brick.Blob );
99
+ auto glob = PageCollection->Glob (brick.Blob );
105
100
106
101
if ((group = (on ? group : glob.Group )) != glob.Group ) {
107
102
Y_TABLET_ERROR (" Cannot handle different groups in one request" );
@@ -127,12 +122,12 @@ void TBlockIO::Dispatch()
127
122
128
123
auto *ev = new TEvGet (query, +more, TInstant::Max (), klass, false );
129
124
130
- SendToBSProxy (ctx, group, ev, more.From /* cookie, request offset */ , std::move (Origin-> TraceId ));
125
+ SendToBSProxy (ctx, group, ev, more.From /* cookie, request offset */ , std::move (TraceId));
131
126
}
132
127
133
128
if (auto logl = Logger->Log (ELnLev::Debug)) {
134
129
logl
135
- << " NBlockIO pageCollection " << Origin-> PageCollection ->Label () << " cooked flow "
130
+ << " NBlockIO pageCollection " << PageCollection->Label () << " cooked flow "
136
131
<< PagesToBlobsConverter->OnHold << " b " << PagesToBlobsConverter->Tail << " p" << " " << PagesToBlobsConverter->Queue .size ()
137
132
<< " bricks in " << Pending << " reads, " << BlockStates.size () << " p req" ;
138
133
}
@@ -144,15 +139,15 @@ void TBlockIO::Handle(ui32 base, TArrayRef<TLoaded> items)
144
139
{
145
140
if (auto logl = Logger->Log (ELnLev::Debug)) {
146
141
logl
147
- << " NBlockIO pageCollection " << Origin-> PageCollection ->Label () << " got base"
142
+ << " NBlockIO pageCollection " << PageCollection->Label () << " got base"
148
143
<< " " << items.size () << " bricks, left " << Pending;
149
144
}
150
145
151
146
for (auto &piece: items) {
152
147
if (piece.Status != NKikimrProto::OK) {
153
148
if (auto logl = Logger->Log (ELnLev::Warn)) {
154
149
logl
155
- << " NBlockIO pageCollection " << Origin-> PageCollection ->Label () << " get failed"
150
+ << " NBlockIO pageCollection " << PageCollection->Label () << " get failed"
156
151
<< " , " << piece.Id << " status " << piece.Status ;
157
152
}
158
153
@@ -171,21 +166,21 @@ void TBlockIO::Handle(ui32 base, TArrayRef<TLoaded> items)
171
166
return ;
172
167
173
168
size_t index = 0 ;
174
- for (ui32 pageId : Origin-> Pages ) {
169
+ for (auto pageId : Pages) {
175
170
auto & state = BlockStates.at (index++);
176
171
Y_ENSURE (state.Offset == state.Data .size ());
177
- if (Origin-> PageCollection ->Verify (pageId, state.Data )) {
172
+ if (PageCollection->Verify (pageId, state.Data )) {
178
173
continue ;
179
174
} else if (auto logl = Logger->Log (ELnLev::Crit)) {
180
- const auto bnd = Origin-> PageCollection ->Bounds (pageId);
175
+ const auto bnd = PageCollection->Bounds (pageId);
181
176
182
177
logl
183
- << " NBlockIO pageCollection " << Origin-> PageCollection ->Label () << " verify failed"
178
+ << " NBlockIO pageCollection " << PageCollection->Label () << " verify failed"
184
179
<< " , page " << pageId << " " << state.Data .size () << " b"
185
180
<< " spans over {" ;
186
181
187
182
for (auto one: xrange (bnd.Lo .Blob , bnd.Up .Blob + 1 )) {
188
- const auto glob = Origin-> PageCollection ->Glob (one);
183
+ const auto glob = PageCollection->Glob (one);
189
184
190
185
logl << " " << glob.Group << " " << glob.Logo ;
191
186
}
@@ -203,26 +198,26 @@ void TBlockIO::Terminate(EStatus code)
203
198
{
204
199
if (auto logl = Logger->Log (code ? ELnLev::Warn : ELnLev::Debug)) {
205
200
logl
206
- << " NBlockIO pageCollection " << Origin-> PageCollection ->Label () << " end, status " << code
207
- << " , cookie {req " << Origin-> Cookie << " ev " << Cookie << " }"
201
+ << " NBlockIO pageCollection " << PageCollection->Label () << " end, status " << code
202
+ << " , cookie {req " << RequestCookie << " ev " << EventCookie << " }"
208
203
<< " , " << BlockStates.size () << " pages" ;
209
204
}
210
205
211
- auto *ev = new TEvData (Origin, code );
206
+ auto *ev = new TEvData (code, PageCollection, RequestCookie );
212
207
213
- if (code == NKikimrProto::OK) {
214
- size_t index = 0 ;
215
- ev->Blocks . reserve (ev-> Fetch -> Pages . size ()) ;
216
- for (ui32 pageId : ev-> Fetch -> Pages ) {
217
- auto & state = BlockStates. at (index++);
218
- ev-> Blocks . emplace_back (pageId, std::move (state. Data ) );
208
+ ev-> Pages . resize (Pages. size ());
209
+ for ( auto index : xrange (Pages. size ())) {
210
+ auto & page = ev->Pages [index] ;
211
+ page. PageId = Pages[index];
212
+ if (code == NKikimrProto::OK) {
213
+ page. Data = std::move (BlockStates. at (index++). Data );
219
214
}
220
215
}
221
216
222
- if (Service )
223
- Send (Service , new TEvStat (EDir::Read, Priority, PagesToBlobsConverter->OnHold , TotalOps, std::move (GroupBytes), std::move (GroupOps)));
217
+ if (StatActorId )
218
+ Send (StatActorId , new TEvStat (EDir::Read, Priority, PagesToBlobsConverter->OnHold , TotalOps, std::move (GroupBytes), std::move (GroupOps)));
224
219
225
- Send (Owner , ev, 0 , Cookie );
220
+ Send (Sender , ev, 0 , EventCookie );
226
221
227
222
return PassAway ();
228
223
}
0 commit comments