Skip to content

Commit 7123b87

Browse files
committed
Handle any pipelined unsubscribe in async
Redis responds to an unsubscribe with one or many replies, depending on the current subscribe state. When channels/patterns names are provided in a command each given name will trigger a reply even if duplicated or not subscribed to. To know when we can return from the subscribed state we need to do bookkeeping on pending additional unsubscribe replies, and make sure we receive them all before switching state.
1 parent b6fb548 commit 7123b87

File tree

3 files changed

+91
-42
lines changed

3 files changed

+91
-42
lines changed

async.c

Lines changed: 85 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,7 @@ static redisAsyncContext *redisAsyncInitialize(redisContext *c) {
148148
ac->sub.replies.tail = NULL;
149149
ac->sub.channels = channels;
150150
ac->sub.patterns = patterns;
151+
ac->sub.pending_unsubs = 0;
151152

152153
return ac;
153154
oom:
@@ -411,11 +412,11 @@ void redisAsyncDisconnect(redisAsyncContext *ac) {
411412
static int __redisGetSubscribeCallback(redisAsyncContext *ac, redisReply *reply, redisCallback *dstcb) {
412413
redisContext *c = &(ac->c);
413414
dict *callbacks;
414-
redisCallback *cb;
415+
redisCallback *cb = NULL;
415416
dictEntry *de;
416417
int pvariant;
417418
char *stype;
418-
sds sname;
419+
sds sname = NULL;
419420

420421
/* Match reply with the expected format of a pushed message.
421422
* The type and number of elements (3 to 4) are specified at:
@@ -431,45 +432,44 @@ static int __redisGetSubscribeCallback(redisAsyncContext *ac, redisReply *reply,
431432
else
432433
callbacks = ac->sub.channels;
433434

434-
/* Ignore replies without a channel/pattern string */
435-
if (reply->element[1]->type != REDIS_REPLY_STRING) return REDIS_OK;
436-
437435
/* Locate the right callback */
438-
sname = sdsnewlen(reply->element[1]->str,reply->element[1]->len);
439-
if (sname == NULL)
440-
goto oom;
441-
442-
de = dictFind(callbacks,sname);
443-
if (de != NULL) {
444-
cb = dictGetEntryVal(de);
436+
if (reply->element[1]->type == REDIS_REPLY_STRING) {
437+
sname = sdsnewlen(reply->element[1]->str,reply->element[1]->len);
438+
if (sname == NULL) goto oom;
445439

446-
/* If this is an subscribe reply decrease pending counter. */
447-
if (strcasecmp(stype+pvariant,"subscribe") == 0) {
448-
cb->pending_subs -= 1;
440+
if ((de = dictFind(callbacks,sname)) != NULL) {
441+
cb = dictGetEntryVal(de);
442+
memcpy(dstcb,cb,sizeof(*dstcb));
449443
}
444+
}
450445

451-
memcpy(dstcb,cb,sizeof(*dstcb));
452-
453-
/* If this is an unsubscribe message, remove it. */
454-
if (strcasecmp(stype+pvariant,"unsubscribe") == 0) {
455-
if (cb->pending_subs == 0)
456-
dictDelete(callbacks,sname);
457-
458-
/* If this was the last unsubscribe message, revert to
459-
* non-subscribe mode. */
460-
assert(reply->element[2]->type == REDIS_REPLY_INTEGER);
461-
462-
/* Unset subscribed flag only when no pipelined pending subscribe. */
463-
if (reply->element[2]->integer == 0
464-
&& dictSize(ac->sub.channels) == 0
465-
&& dictSize(ac->sub.patterns) == 0) {
466-
c->flags &= ~REDIS_SUBSCRIBED;
467-
468-
/* Move ongoing regular command callbacks. */
469-
redisCallback cb;
470-
while (__redisShiftCallback(&ac->sub.replies,&cb) == REDIS_OK) {
471-
__redisPushCallback(&ac->replies,&cb);
472-
}
446+
/* If this is an subscribe reply decrease pending counter. */
447+
if (strcasecmp(stype+pvariant,"subscribe") == 0) {
448+
assert(cb != NULL);
449+
cb->pending_subs -= 1;
450+
451+
} else if (strcasecmp(stype+pvariant,"unsubscribe") == 0) {
452+
if (cb == NULL)
453+
ac->sub.pending_unsubs -= 1;
454+
else if (cb->pending_subs == 0)
455+
dictDelete(callbacks,sname);
456+
457+
/* If this was the last unsubscribe message, revert to
458+
* non-subscribe mode. */
459+
assert(reply->element[2]->type == REDIS_REPLY_INTEGER);
460+
461+
/* Unset subscribed flag only when no pipelined pending subscribe
462+
* or pending unsubscribe replies. */
463+
if (reply->element[2]->integer == 0
464+
&& dictSize(ac->sub.channels) == 0
465+
&& dictSize(ac->sub.patterns) == 0
466+
&& ac->sub.pending_unsubs == 0) {
467+
c->flags &= ~REDIS_SUBSCRIBED;
468+
469+
/* Move ongoing regular command callbacks. */
470+
redisCallback cb;
471+
while (__redisShiftCallback(&ac->sub.replies,&cb) == REDIS_OK) {
472+
__redisPushCallback(&ac->replies,&cb);
473473
}
474474
}
475475
}
@@ -542,7 +542,7 @@ void redisProcessCallbacks(redisAsyncContext *ac) {
542542

543543
/* Even if the context is subscribed, pending regular
544544
* callbacks will get a reply before pub/sub messages arrive. */
545-
redisCallback cb = {NULL, NULL, 0, NULL};
545+
redisCallback cb = {NULL, NULL, 0, 0, NULL};
546546
if (__redisShiftCallback(&ac->replies,&cb) != REDIS_OK) {
547547
/*
548548
* A spontaneous reply in a not-subscribed context can be the error
@@ -759,6 +759,7 @@ static int __redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void
759759
redisContext *c = &(ac->c);
760760
redisCallback cb;
761761
struct dict *cbdict;
762+
dictIterator it;
762763
dictEntry *de;
763764
redisCallback *existcb;
764765
int pvariant, hasnext;
@@ -775,6 +776,7 @@ static int __redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void
775776
cb.fn = fn;
776777
cb.privdata = privdata;
777778
cb.pending_subs = 1;
779+
cb.unsubscribe_sent = 0;
778780

779781
/* Find out which command will be appended. */
780782
p = nextArgument(cmd,&cstr,&clen);
@@ -814,6 +816,51 @@ static int __redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void
814816
* subscribed to one or more channels or patterns. */
815817
if (!(c->flags & REDIS_SUBSCRIBED)) return REDIS_ERR;
816818

819+
if (pvariant)
820+
cbdict = ac->sub.patterns;
821+
else
822+
cbdict = ac->sub.channels;
823+
824+
if (hasnext) {
825+
/* Send an unsubscribe with specific channels/patterns.
826+
* Bookkeeping the number of expected replies */
827+
while ((p = nextArgument(p,&astr,&alen)) != NULL) {
828+
sname = sdsnewlen(astr,alen);
829+
if (sname == NULL)
830+
goto oom;
831+
832+
de = dictFind(cbdict,sname);
833+
if (de != NULL) {
834+
existcb = dictGetEntryVal(de);
835+
if (existcb->unsubscribe_sent == 0)
836+
existcb->unsubscribe_sent = 1;
837+
else
838+
/* Already sent, reply to be ignored */
839+
ac->sub.pending_unsubs += 1;
840+
} else {
841+
/* Not subscribed to, reply to be ignored */
842+
ac->sub.pending_unsubs += 1;
843+
}
844+
sdsfree(sname);
845+
}
846+
} else {
847+
/* Send an unsubscribe without specific channels/patterns.
848+
* Bookkeeping the number of expected replies */
849+
int no_subs = 1;
850+
dictInitIterator(&it,cbdict);
851+
while ((de = dictNext(&it)) != NULL) {
852+
existcb = dictGetEntryVal(de);
853+
if (existcb->unsubscribe_sent == 0) {
854+
existcb->unsubscribe_sent = 1;
855+
no_subs = 0;
856+
}
857+
}
858+
/* Unsubscribing to all channels/patterns, where none is
859+
* subscribed to, results in a single reply to be ignored. */
860+
if (no_subs == 1)
861+
ac->sub.pending_unsubs += 1;
862+
}
863+
817864
/* (P)UNSUBSCRIBE does not have its own response: every channel or
818865
* pattern that is unsubscribed will receive a message. This means we
819866
* should not append a callback function for this command. */

async.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ typedef struct redisCallback {
4646
struct redisCallback *next; /* simple singly linked list */
4747
redisCallbackFn *fn;
4848
int pending_subs;
49+
int unsubscribe_sent;
4950
void *privdata;
5051
} redisCallback;
5152

@@ -105,6 +106,7 @@ typedef struct redisAsyncContext {
105106
redisCallbackList replies;
106107
struct dict *channels;
107108
struct dict *patterns;
109+
int pending_unsubs;
108110
} sub;
109111

110112
/* Any configured RESP3 PUSH handler */

test.c

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1729,10 +1729,10 @@ void subscribe_channel_a_cb(redisAsyncContext *ac, void *r, void *privdata) {
17291729
strcmp(reply->element[2]->str,"Hello!") == 0);
17301730
state->checkpoint++;
17311731

1732-
/* Unsubscribe to channels, including a channel X which we don't subscribe to */
1732+
/* Unsubscribe to channels, including channel X & Z which we don't subscribe to */
17331733
redisAsyncCommand(ac,unexpected_cb,
17341734
(void*)"unsubscribe should not call unexpected_cb()",
1735-
"unsubscribe B X A");
1735+
"unsubscribe B X A A Z");
17361736
/* Unsubscribe to patterns, none which we subscribe to */
17371737
redisAsyncCommand(ac,unexpected_cb,
17381738
(void*)"punsubscribe should not call unexpected_cb()",
@@ -1771,8 +1771,8 @@ void subscribe_channel_b_cb(redisAsyncContext *ac, void *r, void *privdata) {
17711771

17721772
/* Test handling of multiple channels
17731773
* - subscribe to channel A and B
1774-
* - a published message on A triggers an unsubscribe of channel B, X and A
1775-
* where channel X is not subscribed to.
1774+
* - a published message on A triggers an unsubscribe of channel B, X, A and Z
1775+
* where channel X and Z are not subscribed to.
17761776
* - the published message also triggers an unsubscribe to patterns. Since no
17771777
* pattern is subscribed to the responded pattern element type is NIL.
17781778
* - a command sent after unsubscribe triggers a disconnect */

0 commit comments

Comments
 (0)