Skip to content

Commit 942fccb

Browse files
authored
Merge pull request #353 from timpikelmg/feature/209-add-sliding-invisibility-timeouts
#209 - Add Support for Sliding Invisibility Timeouts
2 parents fbb8e2a + b8817ff commit 942fccb

File tree

9 files changed

+415
-48
lines changed

9 files changed

+415
-48
lines changed
Lines changed: 119 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// This file is part of Hangfire.PostgreSql.
1+
// This file is part of Hangfire.PostgreSql.
22
// Copyright © 2014 Frank Hommers <http://hmm.rs/Hangfire.PostgreSql>.
33
//
44
// Hangfire.PostgreSql is free software: you can redistribute it and/or modify
@@ -20,55 +20,91 @@
2020
// Special thanks goes to him.
2121

2222
using System;
23+
using System.Threading;
2324
using Dapper;
25+
using Hangfire.Logging;
26+
using Hangfire.PostgreSql.Utils;
2427
using Hangfire.Storage;
2528

2629
namespace Hangfire.PostgreSql
2730
{
2831
public class PostgreSqlFetchedJob : IFetchedJob
2932
{
33+
private readonly ILog _logger = LogProvider.GetLogger(typeof(PostgreSqlFetchedJob));
34+
3035
private readonly PostgreSqlStorage _storage;
3136
private bool _disposed;
3237
private bool _removedFromQueue;
3338
private bool _requeued;
3439

40+
private readonly object _syncRoot = new object();
41+
private long _lastHeartbeat;
42+
private readonly TimeSpan _interval;
43+
3544
public PostgreSqlFetchedJob(
3645
PostgreSqlStorage storage,
3746
long id,
3847
string jobId,
39-
string queue)
48+
string queue,
49+
DateTime? fetchedAt)
4050
{
4151
_storage = storage ?? throw new ArgumentNullException(nameof(storage));
4252

4353
Id = id;
4454
JobId = jobId ?? throw new ArgumentNullException(nameof(jobId));
4555
Queue = queue ?? throw new ArgumentNullException(nameof(queue));
56+
FetchedAt = fetchedAt ?? throw new ArgumentNullException(nameof(fetchedAt));
57+
58+
if (storage.Options.UseSlidingInvisibilityTimeout)
59+
{
60+
_lastHeartbeat = TimestampHelper.GetTimestamp();
61+
_interval = TimeSpan.FromSeconds(storage.Options.InvisibilityTimeout.TotalSeconds / 5);
62+
storage.HeartbeatProcess.Track(this);
63+
}
4664
}
4765

4866
public long Id { get; }
4967
public string Queue { get; }
5068
public string JobId { get; }
69+
internal DateTime? FetchedAt { get; private set; }
5170

5271
public void RemoveFromQueue()
5372
{
54-
_storage.UseConnection(null, connection => connection.Execute($@"
55-
DELETE FROM ""{_storage.Options.SchemaName}"".""jobqueue"" WHERE ""id"" = @Id;
56-
",
57-
new { Id }));
73+
lock (_syncRoot)
74+
{
75+
if (!FetchedAt.HasValue)
76+
{
77+
return;
78+
}
79+
80+
_storage.UseConnection(null, connection => connection.Execute($@"
81+
DELETE FROM ""{_storage.Options.SchemaName}"".""jobqueue"" WHERE ""id"" = @Id AND ""fetchedat"" = @FetchedAt;
82+
",
83+
new { Id, FetchedAt }));
5884

59-
_removedFromQueue = true;
85+
_removedFromQueue = true;
86+
}
6087
}
6188

6289
public void Requeue()
6390
{
64-
_storage.UseConnection(null, connection => connection.Execute($@"
65-
UPDATE ""{_storage.Options.SchemaName}"".""jobqueue""
66-
SET ""fetchedat"" = NULL
67-
WHERE ""id"" = @Id;
68-
",
69-
new { Id }));
70-
71-
_requeued = true;
91+
lock (_syncRoot)
92+
{
93+
if (!FetchedAt.HasValue)
94+
{
95+
return;
96+
}
97+
98+
_storage.UseConnection(null, connection => connection.Execute($@"
99+
UPDATE ""{_storage.Options.SchemaName}"".""jobqueue""
100+
SET ""fetchedat"" = NULL
101+
WHERE ""id"" = @Id AND ""fetchedat"" = @FetchedAt;
102+
",
103+
new { Id, FetchedAt }));
104+
105+
FetchedAt = null;
106+
_requeued = true;
107+
}
72108
}
73109

74110
public void Dispose()
@@ -78,12 +114,77 @@ public void Dispose()
78114
return;
79115
}
80116

81-
if (!_removedFromQueue && !_requeued)
117+
_disposed = true;
118+
119+
DisposeTimer();
120+
121+
lock (_syncRoot)
82122
{
83-
Requeue();
123+
if (!_removedFromQueue && !_requeued)
124+
{
125+
Requeue();
126+
}
84127
}
128+
}
129+
130+
internal void DisposeTimer()
131+
{
132+
if (_storage.Options.UseSlidingInvisibilityTimeout)
133+
{
134+
_storage.HeartbeatProcess.Untrack(this);
135+
}
136+
}
137+
138+
internal void ExecuteKeepAliveQueryIfRequired()
139+
{
140+
var now = TimestampHelper.GetTimestamp();
85141

86-
_disposed = true;
142+
if (TimestampHelper.Elapsed(now, Interlocked.Read(ref _lastHeartbeat)) < _interval)
143+
{
144+
return;
145+
}
146+
147+
lock (_syncRoot)
148+
{
149+
if (!FetchedAt.HasValue)
150+
{
151+
return;
152+
}
153+
154+
if (_requeued || _removedFromQueue)
155+
{
156+
return;
157+
}
158+
159+
string updateFetchAtSql = $@"
160+
UPDATE ""{_storage.Options.SchemaName}"".""jobqueue""
161+
SET ""fetchedat"" = NOW()
162+
WHERE ""id"" = @id AND ""fetchedat"" = @fetchedAt
163+
RETURNING ""fetchedat"" AS ""FetchedAt"";
164+
";
165+
166+
try
167+
{
168+
_storage.UseConnection(null, connection =>
169+
{
170+
FetchedAt = connection.ExecuteScalar<DateTime?>(updateFetchAtSql,
171+
new { queue = Queue, id = Id, fetchedAt = FetchedAt });
172+
});
173+
174+
if (!FetchedAt.HasValue)
175+
{
176+
_logger.Warn(
177+
$"Background job identifier '{JobId}' was fetched by another worker, will not execute keep alive.");
178+
}
179+
180+
_logger.Trace($"Keep-alive query for message {Id} sent");
181+
Interlocked.Exchange(ref _lastHeartbeat, now);
182+
}
183+
catch (Exception ex) when (ex.IsCatchableExceptionType())
184+
{
185+
_logger.DebugException($"Unable to execute keep-alive query for message {Id}", ex);
186+
}
187+
}
87188
}
88189
}
89190
}
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
// This file is part of Hangfire.PostgreSql.
2+
// Copyright © 2014 Frank Hommers <http://hmm.rs/Hangfire.PostgreSql>.
3+
//
4+
// Hangfire.PostgreSql is free software: you can redistribute it and/or modify
5+
// it under the terms of the GNU Lesser General Public License as
6+
// published by the Free Software Foundation, either version 3
7+
// of the License, or any later version.
8+
//
9+
// Hangfire.PostgreSql is distributed in the hope that it will be useful,
10+
// but WITHOUT ANY WARRANTY; without even the implied warranty of
11+
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12+
// GNU Lesser General Public License for more details.
13+
//
14+
// You should have received a copy of the GNU Lesser General Public
15+
// License along with Hangfire.PostgreSql. If not, see <http://www.gnu.org/licenses/>.
16+
//
17+
// This work is based on the work of Sergey Odinokov, author of
18+
// Hangfire. <http://hangfire.io/>
19+
//
20+
// Special thanks goes to him.
21+
22+
using System;
23+
using System.Collections.Concurrent;
24+
using System.Threading;
25+
using Hangfire.Common;
26+
using Hangfire.Server;
27+
28+
namespace Hangfire.PostgreSql
29+
{
30+
#pragma warning disable CS0618
31+
internal sealed class PostgreSqlHeartbeatProcess : IServerComponent, IBackgroundProcess
32+
#pragma warning restore CS0618
33+
{
34+
private readonly ConcurrentDictionary<PostgreSqlFetchedJob, object> _items = new();
35+
36+
public void Track(PostgreSqlFetchedJob item)
37+
{
38+
_items.TryAdd(item, null);
39+
}
40+
41+
public void Untrack(PostgreSqlFetchedJob item)
42+
{
43+
_items.TryRemove(item, out var _);
44+
}
45+
46+
public void Execute(CancellationToken cancellationToken)
47+
{
48+
foreach (var item in _items)
49+
{
50+
item.Key.ExecuteKeepAliveQueryIfRequired();
51+
}
52+
53+
cancellationToken.Wait(TimeSpan.FromSeconds(1));
54+
}
55+
56+
public void Execute(BackgroundProcessContext context)
57+
{
58+
Execute(context.StoppingToken);
59+
}
60+
}
61+
}

src/Hangfire.PostgreSql/PostgreSqlJobQueue.cs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -195,7 +195,8 @@ LIMIT 1
195195
return new PostgreSqlFetchedJob(_storage,
196196
fetchedJob.Id,
197197
fetchedJob.JobId.ToString(CultureInfo.InvariantCulture),
198-
fetchedJob.Queue);
198+
fetchedJob.Queue,
199+
fetchedJob.FetchedAt);
199200
}
200201

201202
[NotNull]
@@ -214,7 +215,6 @@ internal IFetchedJob Dequeue_UpdateCount(string[] queues, CancellationToken canc
214215
long timeoutSeconds = (long)_storage.Options.InvisibilityTimeout.Negate().TotalSeconds;
215216
FetchedJob markJobAsFetched = null;
216217

217-
218218
string jobToFetchSql = $@"
219219
SELECT ""id"" AS ""Id"", ""jobid"" AS ""JobId"", ""queue"" AS ""Queue"", ""fetchedat"" AS ""FetchedAt"", ""updatecount"" AS ""UpdateCount""
220220
FROM ""{_storage.Options.SchemaName}"".""jobqueue""
@@ -264,7 +264,8 @@ internal IFetchedJob Dequeue_UpdateCount(string[] queues, CancellationToken canc
264264
return new PostgreSqlFetchedJob(_storage,
265265
markJobAsFetched.Id,
266266
markJobAsFetched.JobId.ToString(CultureInfo.InvariantCulture),
267-
markJobAsFetched.Queue);
267+
markJobAsFetched.Queue,
268+
markJobAsFetched.FetchedAt);
268269
}
269270

270271
private Task ListenForNotificationsAsync(CancellationToken cancellationToken)

src/Hangfire.PostgreSql/PostgreSqlStorage.cs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,12 +102,18 @@ public PostgreSqlStorage(IConnectionFactory connectionFactory, PostgreSqlStorage
102102
}
103103

104104
InitializeQueueProviders();
105+
if (Options.UseSlidingInvisibilityTimeout)
106+
{
107+
HeartbeatProcess = new PostgreSqlHeartbeatProcess();
108+
}
105109
}
106110

107111
public PersistentJobQueueProviderCollection QueueProviders { get; internal set; }
108112

109113
internal PostgreSqlStorageOptions Options { get; }
110114

115+
internal PostgreSqlHeartbeatProcess HeartbeatProcess { get; }
116+
111117
public override IMonitoringApi GetMonitoringApi()
112118
{
113119
return new PostgreSqlMonitoringApi(this, QueueProviders);
@@ -124,13 +130,19 @@ public override IEnumerable<IServerComponent> GetComponents()
124130
{
125131
yield return new ExpirationManager(this);
126132
yield return new CountersAggregator(this, Options.CountersAggregateInterval);
133+
if (Options.UseSlidingInvisibilityTimeout)
134+
{
135+
// This is only used to update the sliding invisibility timeouts, so if not enabled then do not use it
136+
yield return HeartbeatProcess;
137+
}
127138
}
128139

129140
public override void WriteOptionsToLog(ILog logger)
130141
{
131-
logger.Info("Using the following options for SQL Server job storage:");
142+
logger.Info("Using the following options for PostgreSQL job storage:");
132143
logger.InfoFormat(" Queue poll interval: {0}.", Options.QueuePollInterval);
133144
logger.InfoFormat(" Invisibility timeout: {0}.", Options.InvisibilityTimeout);
145+
logger.InfoFormat(" Use sliding invisibility timeout: {0}.", Options.UseSlidingInvisibilityTimeout);
134146
}
135147

136148
public override string ToString()

src/Hangfire.PostgreSql/PostgreSqlStorageOptions.cs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ public PostgreSqlStorageOptions()
4949
PrepareSchemaIfNecessary = true;
5050
EnableTransactionScopeEnlistment = true;
5151
DeleteExpiredBatchSize = 1000;
52+
UseSlidingInvisibilityTimeout = false;
5253
}
5354

5455
public TimeSpan QueuePollInterval
@@ -68,7 +69,7 @@ public TimeSpan InvisibilityTimeout
6869
_invisibilityTimeout = value;
6970
}
7071
}
71-
72+
7273
public TimeSpan DistributedLockTimeout
7374
{
7475
get => _distributedLockTimeout;
@@ -125,6 +126,14 @@ public int DeleteExpiredBatchSize
125126
public bool EnableTransactionScopeEnlistment { get; set; }
126127
public bool EnableLongPolling { get; set; }
127128

129+
/// <summary>
130+
/// Apply a sliding invisibility timeout where the last fetched time is continually updated in the background.
131+
/// This allows a lower invisibility timeout to be used with longer running jobs
132+
/// IMPORTANT: If <see cref="BackgroundJobServerOptions.IsLightweightServer" /> option is used, then sliding invisiblity timeouts will not work
133+
/// since the background storage processes are not run (which is used to update the invisibility timeouts)
134+
/// </summary>
135+
public bool UseSlidingInvisibilityTimeout { get; set; }
136+
128137
private static void ThrowIfValueIsNotPositive(TimeSpan value, string fieldName)
129138
{
130139
string message = $"The {fieldName} property value should be positive. Given: {value}.";
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
// This file is part of Hangfire. Copyright © 2022 Hangfire OÜ.
2+
//
3+
// Hangfire is free software: you can redistribute it and/or modify
4+
// it under the terms of the GNU Lesser General Public License as
5+
// published by the Free Software Foundation, either version 3
6+
// of the License, or any later version.
7+
//
8+
// Hangfire is distributed in the hope that it will be useful,
9+
// but WITHOUT ANY WARRANTY; without even the implied warranty of
10+
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11+
// GNU Lesser General Public License for more details.
12+
//
13+
// You should have received a copy of the GNU Lesser General Public
14+
// License along with Hangfire. If not, see <http://www.gnu.org/licenses/>.
15+
16+
// Borrowed from Hangfire
17+
18+
using System;
19+
20+
namespace Hangfire.PostgreSql.Utils
21+
{
22+
internal static class ExceptionTypeHelper
23+
{
24+
#if !NETSTANDARD1_3
25+
private static readonly Type StackOverflowType = typeof(StackOverflowException);
26+
#endif
27+
private static readonly Type OutOfMemoryType = typeof(OutOfMemoryException);
28+
29+
internal static bool IsCatchableExceptionType(this Exception e)
30+
{
31+
var type = e.GetType();
32+
return
33+
#if !NETSTANDARD1_3
34+
type != StackOverflowType &&
35+
#endif
36+
type != OutOfMemoryType;
37+
}
38+
}
39+
}

0 commit comments

Comments
 (0)