Skip to content

Commit 160aa5f

Browse files
jolex007Kamil Khamitov
authored andcommitted
Fix cancellation in unifetcher
commit_hash:909fa7aadbf673448dbc709b19d2088963b40404
1 parent 548dd7e commit 160aa5f

File tree

7 files changed

+306
-23
lines changed

7 files changed

+306
-23
lines changed

library/cpp/http/simple/http_client.cpp

Lines changed: 20 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -25,42 +25,48 @@ TKeepAliveHttpClient::TKeepAliveHttpClient(const TString& host,
2525
TKeepAliveHttpClient::THttpCode TKeepAliveHttpClient::DoGet(const TStringBuf relativeUrl,
2626
IOutputStream* output,
2727
const THeaders& headers,
28-
THttpHeaders* outHeaders) {
28+
THttpHeaders* outHeaders,
29+
NThreading::TCancellationToken cancellation) {
2930
return DoRequest(TStringBuf("GET"),
3031
relativeUrl,
3132
{},
3233
output,
3334
headers,
34-
outHeaders);
35+
outHeaders,
36+
std::move(cancellation));
3537
}
3638

3739
TKeepAliveHttpClient::THttpCode TKeepAliveHttpClient::DoPost(const TStringBuf relativeUrl,
3840
const TStringBuf body,
3941
IOutputStream* output,
4042
const THeaders& headers,
41-
THttpHeaders* outHeaders) {
43+
THttpHeaders* outHeaders,
44+
NThreading::TCancellationToken cancellation) {
4245
return DoRequest(TStringBuf("POST"),
4346
relativeUrl,
4447
body,
4548
output,
4649
headers,
47-
outHeaders);
50+
outHeaders,
51+
std::move(cancellation));
4852
}
4953

5054
TKeepAliveHttpClient::THttpCode TKeepAliveHttpClient::DoRequest(const TStringBuf method,
5155
const TStringBuf relativeUrl,
5256
const TStringBuf body,
5357
IOutputStream* output,
5458
const THeaders& inHeaders,
55-
THttpHeaders* outHeaders) {
59+
THttpHeaders* outHeaders,
60+
NThreading::TCancellationToken cancellation) {
5661
const TString contentLength = IntToString<10, size_t>(body.size());
57-
return DoRequestReliable(FormRequest(method, relativeUrl, body, inHeaders, contentLength), output, outHeaders);
62+
return DoRequestReliable(FormRequest(method, relativeUrl, body, inHeaders, contentLength), output, outHeaders, std::move(cancellation));
5863
}
5964

6065
TKeepAliveHttpClient::THttpCode TKeepAliveHttpClient::DoRequestRaw(const TStringBuf raw,
6166
IOutputStream* output,
62-
THttpHeaders* outHeaders) {
63-
return DoRequestReliable(raw, output, outHeaders);
67+
THttpHeaders* outHeaders,
68+
NThreading::TCancellationToken cancellation) {
69+
return DoRequestReliable(raw, output, outHeaders, std::move(cancellation));
6470
}
6571

6672
void TKeepAliveHttpClient::DisableVerificationForHttps() {
@@ -189,28 +195,28 @@ void TSimpleHttpClient::EnableVerificationForHttps() {
189195
HttpsVerification = true;
190196
}
191197

192-
void TSimpleHttpClient::DoGet(const TStringBuf relativeUrl, IOutputStream* output, const THeaders& headers) const {
198+
void TSimpleHttpClient::DoGet(const TStringBuf relativeUrl, IOutputStream* output, const THeaders& headers, NThreading::TCancellationToken cancellation) const {
193199
TKeepAliveHttpClient cl = CreateClient();
194200

195-
TKeepAliveHttpClient::THttpCode code = cl.DoGet(relativeUrl, output, headers);
201+
TKeepAliveHttpClient::THttpCode code = cl.DoGet(relativeUrl, output, headers, nullptr, std::move(cancellation));
196202

197203
Y_ENSURE(cl.GetHttpInput());
198204
ProcessResponse(relativeUrl, *cl.GetHttpInput(), output, code);
199205
}
200206

201-
void TSimpleHttpClient::DoPost(const TStringBuf relativeUrl, TStringBuf body, IOutputStream* output, const THashMap<TString, TString>& headers) const {
207+
void TSimpleHttpClient::DoPost(const TStringBuf relativeUrl, TStringBuf body, IOutputStream* output, const THashMap<TString, TString>& headers, NThreading::TCancellationToken cancellation) const {
202208
TKeepAliveHttpClient cl = CreateClient();
203209

204-
TKeepAliveHttpClient::THttpCode code = cl.DoPost(relativeUrl, body, output, headers);
210+
TKeepAliveHttpClient::THttpCode code = cl.DoPost(relativeUrl, body, output, headers, nullptr, std::move(cancellation));
205211

206212
Y_ENSURE(cl.GetHttpInput());
207213
ProcessResponse(relativeUrl, *cl.GetHttpInput(), output, code);
208214
}
209215

210-
void TSimpleHttpClient::DoPostRaw(const TStringBuf relativeUrl, const TStringBuf rawRequest, IOutputStream* output) const {
216+
void TSimpleHttpClient::DoPostRaw(const TStringBuf relativeUrl, const TStringBuf rawRequest, IOutputStream* output, NThreading::TCancellationToken cancellation) const {
211217
TKeepAliveHttpClient cl = CreateClient();
212218

213-
TKeepAliveHttpClient::THttpCode code = cl.DoRequestRaw(rawRequest, output);
219+
TKeepAliveHttpClient::THttpCode code = cl.DoRequestRaw(rawRequest, output, nullptr, std::move(cancellation));
214220

215221
Y_ENSURE(cl.GetHttpInput());
216222
ProcessResponse(relativeUrl, *cl.GetHttpInput(), output, code);

library/cpp/http/simple/http_client.h

Lines changed: 28 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
#include <library/cpp/http/io/stream.h>
1313
#include <library/cpp/http/misc/httpcodes.h>
1414
#include <library/cpp/openssl/io/stream.h>
15+
#include <library/cpp/threading/cancellation/cancellation_token.h>
1516

1617
class TNetworkAddress;
1718
class IOutputStream;
@@ -54,27 +55,31 @@ class TKeepAliveHttpClient {
5455
THttpCode DoGet(const TStringBuf relativeUrl,
5556
IOutputStream* output = nullptr,
5657
const THeaders& headers = THeaders(),
57-
THttpHeaders* outHeaders = nullptr);
58+
THttpHeaders* outHeaders = nullptr,
59+
NThreading::TCancellationToken cancellation = NThreading::TCancellationToken::Default());
5860

5961
// builds post request from headers and body
6062
THttpCode DoPost(const TStringBuf relativeUrl,
6163
const TStringBuf body,
6264
IOutputStream* output = nullptr,
6365
const THeaders& headers = THeaders(),
64-
THttpHeaders* outHeaders = nullptr);
66+
THttpHeaders* outHeaders = nullptr,
67+
NThreading::TCancellationToken cancellation = NThreading::TCancellationToken::Default());
6568

6669
// builds request with any HTTP method from headers and body
6770
THttpCode DoRequest(const TStringBuf method,
6871
const TStringBuf relativeUrl,
6972
const TStringBuf body,
7073
IOutputStream* output = nullptr,
7174
const THeaders& inHeaders = THeaders(),
72-
THttpHeaders* outHeaders = nullptr);
75+
THttpHeaders* outHeaders = nullptr,
76+
NThreading::TCancellationToken cancellation = NThreading::TCancellationToken::Default());
7377

7478
// requires already well-formed request
7579
THttpCode DoRequestRaw(const TStringBuf raw,
7680
IOutputStream* output = nullptr,
77-
THttpHeaders* outHeaders = nullptr);
81+
THttpHeaders* outHeaders = nullptr,
82+
NThreading::TCancellationToken cancellation = NThreading::TCancellationToken::Default());
7883

7984
void DisableVerificationForHttps();
8085
void SetClientCertificate(const TOpenSslClientIO::TOptions::TClientCert& options);
@@ -93,7 +98,8 @@ class TKeepAliveHttpClient {
9398
template <class T>
9499
THttpCode DoRequestReliable(const T& raw,
95100
IOutputStream* output,
96-
THttpHeaders* outHeaders);
101+
THttpHeaders* outHeaders,
102+
NThreading::TCancellationToken cancellation);
97103

98104
TVector<IOutputStream::TPart> FormRequest(TStringBuf method, const TStringBuf relativeUrl,
99105
TStringBuf body,
@@ -166,13 +172,13 @@ class TSimpleHttpClient {
166172

167173
void EnableVerificationForHttps();
168174

169-
void DoGet(const TStringBuf relativeUrl, IOutputStream* output, const THeaders& headers = THeaders()) const;
175+
void DoGet(const TStringBuf relativeUrl, IOutputStream* output, const THeaders& headers = THeaders(), NThreading::TCancellationToken cancellation = NThreading::TCancellationToken::Default()) const;
170176

171177
// builds post request from headers and body
172-
void DoPost(const TStringBuf relativeUrl, TStringBuf body, IOutputStream* output, const THeaders& headers = THeaders()) const;
178+
void DoPost(const TStringBuf relativeUrl, TStringBuf body, IOutputStream* output, const THeaders& headers = THeaders(), NThreading::TCancellationToken cancellation = NThreading::TCancellationToken::Default()) const;
173179

174180
// requires already well-formed post request
175-
void DoPostRaw(const TStringBuf relativeUrl, TStringBuf rawRequest, IOutputStream* output) const;
181+
void DoPostRaw(const TStringBuf relativeUrl, TStringBuf rawRequest, IOutputStream* output, NThreading::TCancellationToken cancellation = NThreading::TCancellationToken::Default()) const;
176182

177183
virtual ~TSimpleHttpClient();
178184

@@ -227,6 +233,10 @@ namespace NPrivate {
227233
return HttpIn.Get();
228234
}
229235

236+
void Shutdown() {
237+
Socket.ShutDown(SHUT_RDWR);
238+
}
239+
230240
private:
231241
static TNetworkAddress Resolve(const TString& host, ui32 port);
232242

@@ -250,12 +260,18 @@ namespace NPrivate {
250260
template <class T>
251261
TKeepAliveHttpClient::THttpCode TKeepAliveHttpClient::DoRequestReliable(const T& raw,
252262
IOutputStream* output,
253-
THttpHeaders* outHeaders) {
263+
THttpHeaders* outHeaders,
264+
NThreading::TCancellationToken cancellation) {
265+
254266
for (int i = 0; i < 2; ++i) {
255267
const bool haveNewConnection = CreateNewConnectionIfNeeded();
256268
const bool couldRetry = !haveNewConnection && i == 0; // Actually old connection could be already closed by server,
257269
// so we should try one more time in this case.
258270
try {
271+
cancellation.Future().Subscribe([&](auto&) {
272+
Connection->Shutdown();
273+
});
274+
259275
Connection->Write(raw);
260276

261277
THttpCode code = ReadAndTransferHttp(*Connection->GetHttpInput(), output, outHeaders);
@@ -265,16 +281,19 @@ TKeepAliveHttpClient::THttpCode TKeepAliveHttpClient::DoRequestReliable(const T&
265281
return code;
266282
} catch (const TSystemError& e) {
267283
Connection.Reset();
284+
cancellation.ThrowIfCancellationRequested();
268285
if (!couldRetry || e.Status() != EPIPE) {
269286
throw;
270287
}
271288
} catch (const THttpReadException&) { // Actually old connection is already closed by server
272289
Connection.Reset();
290+
cancellation.ThrowIfCancellationRequested();
273291
if (!couldRetry) {
274292
throw;
275293
}
276294
} catch (const std::exception&) {
277295
Connection.Reset();
296+
cancellation.ThrowIfCancellationRequested();
278297
throw;
279298
}
280299
}

library/cpp/http/simple/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ PEERDIR(
44
library/cpp/http/io
55
library/cpp/openssl/io
66
library/cpp/string_utils/url
7+
library/cpp/threading/cancellation
78
)
89

910
SRCS(
Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
The Cancellation library
2+
========================
3+
4+
Intro
5+
-----
6+
7+
This small library provides primitives for implementation of a cooperative cancellation of long running or asynchronous operations.
8+
The design has been copied from the well-known CancellationTokenSource/CancellationToken classes of the .NET Framework
9+
10+
To use the library include `cancellation_token.h`.
11+
12+
Examples
13+
--------
14+
15+
1. Simple check for cancellation
16+
17+
```c++
18+
void LongRunningOperation(TCancellationToken token) {
19+
...
20+
if (token.IsCancellationRequested()) {
21+
return;
22+
}
23+
...
24+
}
25+
26+
TCancellationTokenSource source;
27+
TThread thread([token = source.Token()]() { LongRunningOperation(std::move(token)); });
28+
thread.Start();
29+
...
30+
source.Cancel();
31+
thread.Join();
32+
```
33+
34+
2. Exit via an exception
35+
36+
```c++
37+
void LongRunningOperation(TCancellationToken token) {
38+
try {
39+
for (;;) {
40+
...
41+
token.ThrowIfCancellationRequested();
42+
...
43+
}
44+
} catch (TOperationCancelledException const&) {
45+
return;
46+
} catch (...) {
47+
Y_ABORT("Never should be there")
48+
}
49+
}
50+
51+
TCancellationTokenSource source;
52+
TThread thread([token = source.Token()]() { LongRunningOperation(std::move(token)); });
53+
thread.Start();
54+
...
55+
source.Cancel();
56+
thread.Join();
57+
```
58+
59+
3. Periodic poll with cancellation
60+
61+
```c++
62+
void LongRunningOperation(TCancellationToken token) {
63+
while (!token.Wait(PollInterval)) {
64+
...
65+
}
66+
}
67+
68+
TCancellationTokenSource source;
69+
TThread thread([token = source.Token()]() { LongRunningOperation(std::move(token)); });
70+
thread.Start();
71+
...
72+
source.Cancel();
73+
thread.Join();
74+
```
75+
76+
4. Waiting on the future
77+
78+
```c++
79+
TFuture<void> InnerOperation();
80+
TFuture<void> OuterOperation(TCancellationToken token) {
81+
return WaitAny(FirstOperation(), token.Future())
82+
.Apply([token = std::move(token)](auto&&) {
83+
token.ThrowIfCancellationRequested();
84+
});
85+
}
86+
87+
TCancellationTokenSource source;
88+
auto future = OuterOperation();
89+
...
90+
source.Cancel()
91+
...
92+
try {
93+
auto value = future.ExtractValueSync();
94+
} catch (TOperationCancelledException const&) {
95+
// cancelled
96+
}
97+
```
98+
99+
5. Using default token when no cancellation needed
100+
101+
```c++
102+
void LongRunningOperation(TCancellationToken token) {
103+
...
104+
if (token.IsCancellationRequested()) {
105+
return;
106+
}
107+
...
108+
}
109+
110+
// We do not want to cancel the operation. So, there is no need to create a cancellation token source
111+
LongRunningOperation(TCancellationToken::Default);
112+
```
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
#include "cancellation_token.h"

0 commit comments

Comments
 (0)