Skip to content

Commit 249495a

Browse files
Convert pubsub dicts to hashtables (#2007)
Two dicts are converted to hashtables: 1. On each client, the set of channels/patterns/shard-channels the client is subscribed to 2. On each channel or pattern, the set of clients subscribed to it. --------- Signed-off-by: Rain Valentine <rsg000@gmail.com> Signed-off-by: Viktor Söderqvist <viktor.soderqvist@est.tech> Co-authored-by: Rain Valentine <rsg000@gmail.com>
1 parent 5065fcf commit 249495a

File tree

6 files changed

+176
-200
lines changed

6 files changed

+176
-200
lines changed

src/acl.c

Lines changed: 20 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1899,41 +1899,44 @@ static list *getUpcomingChannelList(user *new, user *original) {
18991899
/* Check if the client should be killed because it is subscribed to channels that were
19001900
* permitted in the past, are not in the `upcoming` channel list. */
19011901
static int ACLShouldKillPubsubClient(client *c, list *upcoming) {
1902-
robj *o;
1903-
int kill = 0;
1904-
19051902
if (getClientType(c) == CLIENT_TYPE_PUBSUB) {
1903+
int kill = 0;
1904+
19061905
/* Check for pattern violations. */
1907-
dictIterator *di = dictGetIterator(c->pubsub_data->pubsub_patterns);
1908-
dictEntry *de;
1909-
while (!kill && ((de = dictNext(di)) != NULL)) {
1910-
o = dictGetKey(de);
1906+
hashtableIterator iter;
1907+
hashtableInitIterator(&iter, c->pubsub_data->pubsub_patterns, 0);
1908+
void *next;
1909+
while (!kill && hashtableNext(&iter, &next)) {
1910+
robj *o = next;
19111911
int res = ACLCheckChannelAgainstList(upcoming, o->ptr, sdslen(o->ptr), 1);
19121912
kill = (res == ACL_DENIED_CHANNEL);
19131913
}
1914-
dictReleaseIterator(di);
1914+
hashtableResetIterator(&iter);
19151915

19161916
/* Check for channel violations. */
19171917
if (!kill) {
19181918
/* Check for global channels violation. */
1919-
di = dictGetIterator(c->pubsub_data->pubsub_channels);
1920-
1921-
while (!kill && ((de = dictNext(di)) != NULL)) {
1922-
o = dictGetKey(de);
1919+
hashtableIterator iter;
1920+
hashtableInitIterator(&iter, c->pubsub_data->pubsub_channels, 0);
1921+
void *next;
1922+
while (!kill && hashtableNext(&iter, &next)) {
1923+
robj *o = next;
19231924
int res = ACLCheckChannelAgainstList(upcoming, o->ptr, sdslen(o->ptr), 0);
19241925
kill = (res == ACL_DENIED_CHANNEL);
19251926
}
1926-
dictReleaseIterator(di);
1927+
hashtableResetIterator(&iter);
19271928
}
19281929
if (!kill) {
19291930
/* Check for shard channels violation. */
1930-
di = dictGetIterator(c->pubsub_data->pubsubshard_channels);
1931-
while (!kill && ((de = dictNext(di)) != NULL)) {
1932-
o = dictGetKey(de);
1931+
hashtableIterator iter;
1932+
hashtableInitIterator(&iter, c->pubsub_data->pubsubshard_channels, 0);
1933+
void *next;
1934+
while (!kill && hashtableNext(&iter, &next)) {
1935+
robj *o = next;
19331936
int res = ACLCheckChannelAgainstList(upcoming, o->ptr, sdslen(o->ptr), 0);
19341937
kill = (res == ACL_DENIED_CHANNEL);
19351938
}
1936-
dictReleaseIterator(di);
1939+
hashtableResetIterator(&iter);
19371940
}
19381941

19391942
if (kill) {

src/defrag.c

Lines changed: 17 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,6 @@
5151
typedef enum { DEFRAG_NOT_DONE = 0,
5252
DEFRAG_DONE = 1 } doneStatus;
5353

54-
5554
/*
5655
* Defragmentation is performed in stages. Each stage is serviced by a stage function
5756
* (defragStageFn). The stage function is passed a target (void*) to defrag. The contents of that
@@ -135,7 +134,7 @@ typedef struct {
135134
static_assert(offsetof(defragKeysCtx, kvstate) == 0, "defragStageKvstoreHelper requires this");
136135

137136
// Private data for pubsub kvstores
138-
typedef dict *(*getClientChannelsFn)(client *);
137+
typedef hashtable *(*getClientChannelsFn)(client *);
139138
typedef struct {
140139
getClientChannelsFn fn;
141140
} getClientChannelsFnWrapper;
@@ -243,30 +242,6 @@ robj *activeDefragStringOb(robj *ob) {
243242
return new_robj;
244243
}
245244

246-
/* Defrag helper for dict main allocations (dict struct, and hash tables).
247-
* Receives a pointer to the dict* and return a new dict* when the dict
248-
* struct itself was moved.
249-
*
250-
* Returns NULL in case the allocation wasn't moved.
251-
* When it returns a non-null value, the old pointer was already released
252-
* and should NOT be accessed. */
253-
static dict *dictDefragTables(dict *d) {
254-
dict *ret = NULL;
255-
dictEntry **newtable;
256-
/* handle the dict struct */
257-
if ((ret = activeDefragAlloc(d))) d = ret;
258-
/* handle the first hash table */
259-
if (!d->ht_table[0]) return ret; /* created but unused */
260-
newtable = activeDefragAlloc(d->ht_table[0]);
261-
if (newtable) d->ht_table[0] = newtable;
262-
/* handle the second hash table */
263-
if (d->ht_table[1]) {
264-
newtable = activeDefragAlloc(d->ht_table[1]);
265-
if (newtable) d->ht_table[1] = newtable;
266-
}
267-
return ret;
268-
}
269-
270245
/* Internal function used by zslDefrag */
271246
static void zslUpdateNode(zskiplist *zsl, zskiplistNode *oldnode, zskiplistNode *newnode, zskiplistNode **update) {
272247
int i;
@@ -786,37 +761,33 @@ static void dbKeysScanCallback(void *privdata, void *elemref) {
786761
/* Defrag scan callback for a pubsub channels hashtable. */
787762
static void defragPubsubScanCallback(void *privdata, void *elemref) {
788763
defragPubSubCtx *ctx = privdata;
789-
void **channel_dict_ref = (void **)elemref;
790-
dict *newclients, *clients = *channel_dict_ref;
791-
robj *newchannel, *channel = *(robj **)dictMetadata(clients);
792-
size_t allocation_size;
764+
void **clients_ref = (void **)elemref;
765+
hashtable *newclients, *clients = *clients_ref;
766+
robj *newchannel, *channel = *(robj **)hashtableMetadata(clients);
793767

794768
/* Try to defrag the channel name. */
795-
serverAssert(channel->refcount == (int)dictSize(clients) + 1);
796-
newchannel = activeDefragStringObWithoutFree(channel, &allocation_size);
769+
serverAssert(channel->refcount == (int)hashtableSize(clients) + 1);
770+
newchannel = activeDefragStringOb(channel);
797771
if (newchannel) {
798-
*(robj **)dictMetadata(clients) = newchannel;
772+
*(robj **)hashtableMetadata(clients) = newchannel;
799773

800774
/* The channel name is shared by the client's pubsub(shard) and server's
801775
* pubsub(shard), after defraging the channel name, we need to update
802776
* the reference in the clients' dictionary. */
803-
dictIterator *di = dictGetIterator(clients);
804-
dictEntry *clientde;
805-
while ((clientde = dictNext(di)) != NULL) {
806-
client *c = dictGetKey(clientde);
807-
dict *client_channels = ctx->getPubSubChannels(c);
808-
dictEntry *pubsub_channel = dictFind(client_channels, newchannel);
809-
serverAssert(pubsub_channel);
810-
dictSetKey(ctx->getPubSubChannels(c), pubsub_channel, newchannel);
777+
hashtableIterator iter;
778+
hashtableInitIterator(&iter, clients, 0);
779+
void *c;
780+
while (hashtableNext(&iter, &c)) {
781+
hashtable *client_channels = ctx->getPubSubChannels(c);
782+
int replaced = hashtableReplaceReallocatedEntry(client_channels, channel, newchannel);
783+
serverAssert(replaced);
811784
}
812-
dictReleaseIterator(di);
813-
// Now that we're done correcting the references, we can safely free the old channel robj
814-
allocatorDefragFree(channel, allocation_size);
785+
hashtableResetIterator(&iter);
815786
}
816787

817788
/* Try to defrag the dictionary of clients that is stored as the value part. */
818-
if ((newclients = dictDefragTables(clients)))
819-
*channel_dict_ref = newclients;
789+
if ((newclients = hashtableDefragTables(clients, activeDefragAlloc)))
790+
*clients_ref = newclients;
820791

821792
server.stat_active_defrag_scanned++;
822793
}

src/networking.c

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3504,9 +3504,9 @@ sds catClientInfoString(sds s, client *client, int hide_user_data) {
35043504
" flags=%s", flags,
35053505
" capa=%s", capa,
35063506
" db=%i", client->db->id,
3507-
" sub=%i", client->pubsub_data ? (int)dictSize(client->pubsub_data->pubsub_channels) : 0,
3508-
" psub=%i", client->pubsub_data ? (int)dictSize(client->pubsub_data->pubsub_patterns) : 0,
3509-
" ssub=%i", client->pubsub_data ? (int)dictSize(client->pubsub_data->pubsubshard_channels) : 0,
3507+
" sub=%i", client->pubsub_data ? (int)hashtableSize(client->pubsub_data->pubsub_channels) : 0,
3508+
" psub=%i", client->pubsub_data ? (int)hashtableSize(client->pubsub_data->pubsub_patterns) : 0,
3509+
" ssub=%i", client->pubsub_data ? (int)hashtableSize(client->pubsub_data->pubsubshard_channels) : 0,
35103510
" multi=%i", client->mstate ? client->mstate->count : -1,
35113511
" watch=%i", client->mstate ? (int)listLength(&client->mstate->watched_keys) : 0,
35123512
" qbuf=%U", client->querybuf ? (unsigned long long)sdslen(client->querybuf) : 0,

0 commit comments

Comments
 (0)