Skip to content

Commit 2b17c1d

Browse files
authored
Merge pull request #12 from tibber/toni/resubscribe
Fix resubscription for graphql-ws
2 parents 7bc46c5 + 501d964 commit 2b17c1d

File tree

1 file changed

+42
-16
lines changed

1 file changed

+42
-16
lines changed

src/Tibber.Sdk/RealTimeMeasurementListener.cs

Lines changed: 42 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -172,11 +172,18 @@ private void UnsubscribeObserver(HomeStreamObserverCollection collection, IObser
172172
collection.Observers.Remove(observer);
173173
}
174174

175+
private async Task ResubscribeStream(Guid homeId, int subscriptionId, CancellationToken cancellationToken)
176+
{
177+
await UnsubscribeStream(subscriptionId, cancellationToken);
178+
await SubscribeStream(homeId, subscriptionId, cancellationToken);
179+
}
180+
175181
private async Task SubscribeStream(Guid homeId, int subscriptionId, CancellationToken cancellationToken)
176182
{
177-
Trace.WriteLine($"subscribe to {homeId}");
183+
Trace.WriteLine($"subscribe to home id {homeId} with subscription id {subscriptionId}");
178184

179185
await ExecuteStreamRequest(
186+
//$@"{{""payload"":{{""query"":""subscription{{testMeasurement(count:2, complete:false){{timestamp,power,powerReactive,powerProduction,powerProductionReactive,accumulatedConsumption,accumulatedConsumptionLastHour,accumulatedProduction,accumulatedProductionLastHour,accumulatedCost,accumulatedReward,currency,minPower,averagePower,maxPower,minPowerProduction,maxPowerProduction,voltagePhase1,voltagePhase2,voltagePhase3,currentL1,currentL2,currentL3,lastMeterConsumption,lastMeterProduction,powerFactor,signalStrength}}}}"",""variables"":{{}},""extensions"":{{}}}},""type"":""subscribe"",""id"":""{subscriptionId}""}}",
180187
$@"{{""payload"":{{""query"":""subscription{{liveMeasurement(homeId:\""{homeId}\""){{timestamp,power,powerReactive,powerProduction,powerProductionReactive,accumulatedConsumption,accumulatedConsumptionLastHour,accumulatedProduction,accumulatedProductionLastHour,accumulatedCost,accumulatedReward,currency,minPower,averagePower,maxPower,minPowerProduction,maxPowerProduction,voltagePhase1,voltagePhase2,voltagePhase3,currentL1,currentL2,currentL3,lastMeterConsumption,lastMeterProduction,powerFactor,signalStrength}}}}"",""variables"":{{}},""extensions"":{{}}}},""type"":""subscribe"",""id"":""{subscriptionId}""}}",
181188
cancellationToken);
182189

@@ -189,13 +196,14 @@ await ExecuteStreamRequest(
189196
private async Task UnsubscribeStream(int subscriptionId, CancellationToken cancellationToken)
190197
{
191198
Trace.WriteLine($"unsubscribe subscription with id {subscriptionId}");
192-
await ExecuteStreamRequest($@"{{""type"":""complete"",""id"":{subscriptionId}}}", cancellationToken);
199+
await ExecuteStreamRequest($@"{{""type"":""complete"",""id"":""{subscriptionId}""}}", cancellationToken);
193200
}
194201

195202
private Task ExecuteStreamRequest(string request, CancellationToken cancellationToken)
196203
{
197-
var stopSubscriptionRequest = new ArraySegment<byte>(Encoding.ASCII.GetBytes(request));
198-
return _wssClient.SendAsync(stopSubscriptionRequest, WebSocketMessageType.Text, true, cancellationToken);
204+
Trace.WriteLine($"send message; client state {_wssClient.State} {_wssClient.CloseStatus} {_wssClient.CloseStatusDescription} {request}");
205+
var requestBytes = new ArraySegment<byte>(Encoding.ASCII.GetBytes(request));
206+
return _wssClient.SendAsync(requestBytes, WebSocketMessageType.Text, true, cancellationToken);
199207
}
200208

201209
private async Task Initialize(Uri websocketSubscriptionUrl, CancellationToken cancellationToken)
@@ -216,7 +224,7 @@ private async Task Initialize(Uri websocketSubscriptionUrl, CancellationToken ca
216224

217225
Trace.WriteLine("web socket connected");
218226

219-
var connectionInitMessage = new WebSocketConnectionInitMessage{ Payload = connectionInitPayload };
227+
var connectionInitMessage = new WebSocketConnectionInitMessage { Payload = connectionInitPayload };
220228
var json = JsonConvert.SerializeObject(connectionInitMessage, TibberApiClient.JsonSerializerSettings);
221229
var init = new ArraySegment<byte>(Encoding.UTF8.GetBytes(json));
222230

@@ -253,6 +261,7 @@ private async void StartListening()
253261

254262
do
255263
{
264+
Trace.WriteLine($"receive message; client state {_wssClient.State} {_wssClient.CloseStatus} {_wssClient.CloseStatusDescription}");
256265
result = await _wssClient.ReceiveAsync(_receiveBuffer, _cancellationTokenSource.Token);
257266
var json = Encoding.ASCII.GetString(_receiveBuffer.Array, 0, result.Count);
258267
stringBuilder.Append(json);
@@ -285,7 +294,7 @@ private async void StartListening()
285294
if (!_cancellationTokenSource.IsCancellationRequested)
286295
{
287296
Trace.WriteLine("connection re-established; re-initialize data streams");
288-
SubscribeStreams(c => true);
297+
ResubscribeStreams(c => true);
289298
continue;
290299
}
291300
}
@@ -315,6 +324,7 @@ private async void StartListening()
315324
continue;
316325

317326
homeStreamObserverCollection.LastMessageReceivedAt = DateTimeOffset.UtcNow;
327+
homeStreamObserverCollection.ReconnectionAttempts = 0;
318328

319329
foreach (var message in measurementGroup)
320330
{
@@ -364,13 +374,13 @@ private async void StartListening()
364374
} while (!_cancellationTokenSource.IsCancellationRequested);
365375
}
366376

367-
private void SubscribeStreams(Func<HomeStreamObserverCollection, bool> predicate)
377+
private void ResubscribeStreams(Func<HomeStreamObserverCollection, bool> predicate)
368378
{
369379
lock (_homeObservables)
370380
{
371381
var subscriptionTask = (Task)Task.FromResult(0);
372382
foreach (var collection in _homeObservables.Values.Where(predicate))
373-
subscriptionTask = subscriptionTask.ContinueWith(_ => SubscribeStream(collection.Observable.HomeId, collection.Observable.SubscriptionId, _cancellationTokenSource.Token));
383+
subscriptionTask = subscriptionTask.ContinueWith(_ => ResubscribeStream(collection.Observable.HomeId, collection.Observable.SubscriptionId, _cancellationTokenSource.Token));
374384
}
375385
}
376386

@@ -441,9 +451,9 @@ private async Task TryReconnect()
441451
{
442452
try
443453
{
444-
var delay = GetDelaySeconds(failures);
445-
Trace.WriteLine($"retrying to connect in {delay} seconds");
446-
await Task.Delay(TimeSpan.FromSeconds(delay), _cancellationTokenSource.Token);
454+
var delay = GetDelay(failures);
455+
Trace.WriteLine($"retrying to connect in {delay.TotalSeconds} seconds");
456+
await Task.Delay(delay, _cancellationTokenSource.Token);
447457

448458
Trace.WriteLine("check there is a valid real time device");
449459
var homes = await _tibberApiClient.ValidateRealtimeDevice();
@@ -464,20 +474,31 @@ private void CheckDataStreamAlive(object state)
464474
{
465475
var now = DateTimeOffset.UtcNow;
466476

467-
SubscribeStreams(
477+
ResubscribeStreams(
468478
c =>
469479
{
470480
var sinceLastMessageMs = (now - c.LastMessageReceivedAt).TotalMilliseconds;
471481
if (sinceLastMessageMs <= StreamReSubscriptionCheckPeriodMs)
472482
return false;
473483

474-
Trace.WriteLine($"home {c.Observable.HomeId} subscription {c.Observable.SubscriptionId}: no data received during last {sinceLastMessageMs:N0} ms; re-initialize data stream");
475-
c.LastMessageReceivedAt = now;
484+
// Data not received during past minute; delay exponentially and then resubscribe
485+
var sinceLastReconnectionMs = (now - c.LastReconnectionAttemptAt).TotalMilliseconds;
486+
var delay = GetDelay(c.ReconnectionAttempts);
487+
if (sinceLastReconnectionMs <= delay.TotalMilliseconds)
488+
{
489+
Trace.WriteLine($"{now:yyyy-MM-dd HH:mm:ss.fff zzz} home {c.Observable.HomeId} subscription {c.Observable.SubscriptionId}: no data received during last {sinceLastMessageMs:N0} ms; reconnection attempts {c.ReconnectionAttempts}; resubscription delay {delay.TotalSeconds}s not passed yet");
490+
return false;
491+
}
492+
493+
Trace.WriteLine($"{now:yyyy-MM-dd HH:mm:ss.fff zzz} home {c.Observable.HomeId} subscription {c.Observable.SubscriptionId}: no data received during last {sinceLastMessageMs:N0} ms; reconnection attempts {c.ReconnectionAttempts}; re-initialize data stream");
494+
c.ReconnectionAttempts++;
495+
c.LastReconnectionAttemptAt = now;
496+
476497
return true;
477498
});
478499
}
479500

480-
private static int GetDelaySeconds(int failures)
501+
private static TimeSpan GetDelay(int failures)
481502
{
482503
// Jitter of 5 to 60 seconds
483504
var jitter = Random.Next(5, 60);
@@ -487,7 +508,7 @@ private static int GetDelaySeconds(int failures)
487508

488509
// Max one day 60 * 60 * 24
489510
const double oneDayInSeconds = (double)60 * 60 * 24;
490-
return jitter + (int)Math.Min(delay, oneDayInSeconds);
511+
return TimeSpan.FromSeconds(jitter + (int)Math.Min(delay, oneDayInSeconds));
491512
}
492513

493514
private class WebSocketConnectionInitMessage
@@ -517,13 +538,18 @@ private class WebSocketData
517538
{
518539
[JsonProperty("liveMeasurement")]
519540
public RealTimeMeasurement RealTimeMeasurement { get; set; }
541+
542+
[JsonProperty("testMeasurement")]
543+
public RealTimeMeasurement TestMeasurement { set { RealTimeMeasurement = value; } }
520544
}
521545

522546
private class HomeStreamObserverCollection
523547
{
524548
public readonly List<IObserver<RealTimeMeasurement>> Observers = new();
525549
public HomeRealTimeMeasurementObservable Observable;
526550
public DateTimeOffset LastMessageReceivedAt = DateTimeOffset.MaxValue;
551+
public DateTimeOffset LastReconnectionAttemptAt = DateTimeOffset.MinValue;
552+
public int ReconnectionAttempts = 0;
527553
}
528554

529555
private class Unsubscriber : IDisposable

0 commit comments

Comments
 (0)