From 7622109395e0ea63fda369964c23c559c2d8338d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Svensson?= Date: Mon, 20 Apr 2026 16:05:50 +0200 Subject: [PATCH] Validate slot range in parse_cluster_slots MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Reject invalid slot ranges (negative, out of bounds, start > end) immediately when parsing the CLUSTER SLOTS reply, rather than letting invalid values propagate through slot creation and node linking until caught later in updateNodesAndSlotmap. Refactored the function to make the validation straightforward: extracted node entry parsing into a helper and flattened the deeply nested inner loop. Added unit test for invalid slot ranges. Signed-off-by: Björn Svensson --- src/cluster.c | 248 +++++++++++++++++--------------------- tests/ut_slotmap_update.c | 27 +++++ 2 files changed, 137 insertions(+), 138 deletions(-) diff --git a/src/cluster.c b/src/cluster.c index 48419d74..45983a53 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -468,21 +468,53 @@ static void cluster_nodes_swap_ctx(dict *nodes_f, dict *nodes_t) { } } +/** + * Parse a node entry from a "cluster slots" sub-reply. + * Returns VALKEY_OK with host/port set, or VALKEY_ERR with error set on cc. + * The returned host pointer is either from the reply or from c->tcp.host + * since an unknown endpoint (NIL or empty string) means the same address + * as we sent this command to. + */ +static int parseClusterSlotsNodeEntry(valkeyClusterContext *cc, valkeyContext *c, + valkeyReply *elem_nodes, char **host_out, + int *port_out) { + if (elem_nodes->type != VALKEY_REPLY_ARRAY || elem_nodes->elements < 2) { + valkeyClusterSetError(cc, VALKEY_ERR_OTHER, + "Invalid node array in slot response"); + return VALKEY_ERR; + } + + valkeyReply *elem_ip = elem_nodes->element[0]; + valkeyReply *elem_port = elem_nodes->element[1]; + + /* Validate ip element. Accept a NIL type since we handle unknown + * endpoints by using the address we sent the command to. */ + if (elem_ip == NULL || (elem_ip->type != VALKEY_REPLY_STRING && + elem_ip->type != VALKEY_REPLY_NIL)) { + valkeyClusterSetError(cc, VALKEY_ERR_OTHER, "Invalid node address"); + return VALKEY_ERR; + } + + if (elem_port == NULL || elem_port->type != VALKEY_REPLY_INTEGER || + elem_port->integer < 1 || elem_port->integer > UINT16_MAX) { + valkeyClusterSetError(cc, VALKEY_ERR_OTHER, "Invalid port"); + return VALKEY_ERR; + } + + /* An unknown endpoint (NIL or empty string) means the same address + * as we sent this command to. */ + *host_out = (elem_ip->len > 0) ? elem_ip->str : c->tcp.host; + *port_out = (int)elem_port->integer; + return VALKEY_OK; +} + /** * Parse the "cluster slots" command reply to nodes dict. */ static dict *parse_cluster_slots(valkeyClusterContext *cc, valkeyContext *c, valkeyReply *reply) { - int ret; cluster_slot *slot = NULL; dict *nodes = NULL; - dictEntry *den; - valkeyReply *elem_slots; - valkeyReply *elem_slots_begin, *elem_slots_end; - valkeyReply *elem_nodes; - valkeyReply *elem_ip, *elem_port; - valkeyClusterNode *primary = NULL, *replica; - uint32_t i, idx; if (reply->type != VALKEY_REPLY_ARRAY) { valkeyClusterSetError(cc, VALKEY_ERR_OTHER, "Unexpected reply type"); @@ -494,149 +526,81 @@ static dict *parse_cluster_slots(valkeyClusterContext *cc, valkeyContext *c, } nodes = dictCreate(&clusterNodesDictType); - if (nodes == NULL) { + if (nodes == NULL) goto oom; - } - for (i = 0; i < reply->elements; i++) { - elem_slots = reply->element[i]; - if (elem_slots->type != VALKEY_REPLY_ARRAY || - elem_slots->elements < 3) { + for (uint32_t i = 0; i < reply->elements; i++) { + valkeyReply *elem_slots = reply->element[i]; + if (elem_slots->type != VALKEY_REPLY_ARRAY || elem_slots->elements < 3) { valkeyClusterSetError(cc, VALKEY_ERR_OTHER, - "Command(cluster slots) reply error: " - "first sub_reply is not an array."); + "Invalid array in cluster slots response"); goto error; } - slot = cluster_slot_create(NULL); - if (slot == NULL) { - goto oom; + /* Parse slot range (elements 0 and 1). */ + if (elem_slots->element[0]->type != VALKEY_REPLY_INTEGER) { + valkeyClusterSetError(cc, VALKEY_ERR_OTHER, + "Start slot range is not an integer"); + goto error; + } + if (elem_slots->element[1]->type != VALKEY_REPLY_INTEGER) { + valkeyClusterSetError(cc, VALKEY_ERR_OTHER, + "End slot range is not an integer"); + goto error; + } + uint32_t slot_start = (uint32_t)elem_slots->element[0]->integer; + uint32_t slot_end = (uint32_t)elem_slots->element[1]->integer; + if (slot_start > slot_end || slot_end >= VALKEYCLUSTER_SLOTS) { + valkeyClusterSetError(cc, VALKEY_ERR_OTHER, + "Invalid slot range"); + goto error; } - // one slots region - for (idx = 0; idx < elem_slots->elements; idx++) { - if (idx == 0) { - elem_slots_begin = elem_slots->element[idx]; - if (elem_slots_begin->type != VALKEY_REPLY_INTEGER) { - valkeyClusterSetError(cc, VALKEY_ERR_OTHER, - "Command(cluster slots) reply error: " - "slot begin is not an integer."); - goto error; - } - slot->start = (int)(elem_slots_begin->integer); - } else if (idx == 1) { - elem_slots_end = elem_slots->element[idx]; - if (elem_slots_end->type != VALKEY_REPLY_INTEGER) { - valkeyClusterSetError(cc, VALKEY_ERR_OTHER, - "Command(cluster slots) reply error: " - "slot end is not an integer."); - goto error; - } - - slot->end = (int)(elem_slots_end->integer); - - if (slot->start > slot->end) { - valkeyClusterSetError( - cc, VALKEY_ERR_OTHER, - "Command(cluster slots) reply error: " - "slot begin is bigger than slot end."); - goto error; - } - } else { - elem_nodes = elem_slots->element[idx]; - if (elem_nodes->type != VALKEY_REPLY_ARRAY || - elem_nodes->elements < 2) { - valkeyClusterSetError( - cc, VALKEY_ERR_OTHER, - "Command(cluster slots) reply error: " - "nodes sub_reply is not a correct array."); - goto error; - } - - elem_ip = elem_nodes->element[0]; - elem_port = elem_nodes->element[1]; - - /* Validate ip element. Accept a NULL value ip (NIL type) since - * we will handle the unknown endpoint special. */ - if (elem_ip == NULL || (elem_ip->type != VALKEY_REPLY_STRING && - elem_ip->type != VALKEY_REPLY_NIL)) { - valkeyClusterSetError(cc, VALKEY_ERR_OTHER, "Invalid node address"); - goto error; - } - - /* Validate port element. */ - if (elem_port == NULL || elem_port->type != VALKEY_REPLY_INTEGER || - (elem_port->integer < 1 || elem_port->integer > UINT16_MAX)) { - valkeyClusterSetError(cc, VALKEY_ERR_OTHER, "Invalid port"); - goto error; - } - - /* Get the received ip/host. According to the docs an unknown - * endpoint or an empty string can be treated as it means - * the same address as we sent this command to. - * An unknown endpoint has the type VALKEY_REPLY_NIL and its - * length is initiated to zero. */ - char *host = (elem_ip->len > 0) ? elem_ip->str : c->tcp.host; - if (host == NULL) { - goto oom; - } - int port = elem_port->integer; + /* Parse primary node (element 2). */ + char *host; + int port; + if (parseClusterSlotsNodeEntry(cc, c, elem_slots->element[2], &host, &port) != VALKEY_OK) + goto error; - if (idx == 2) { - /* Parse a primary node. */ - sds address = sdsnew(host); - if (address == NULL) { - goto oom; - } - address = sdscatfmt(address, ":%i", port); - if (address == NULL) { - goto oom; - } + /* Find or create the primary node. */ + valkeyClusterNode *primary; + sds address = sdsnew(host); + if (address == NULL) + goto oom; + address = sdscatfmt(address, ":%i", port); + if (address == NULL) + goto oom; - den = dictFind(nodes, address); - sdsfree(address); - if (den != NULL) { - /* Skip parsing this primary node since it's already known. */ - primary = dictGetVal(den); - ret = cluster_slot_ref_node(slot, primary); - if (ret != VALKEY_OK) { - goto oom; - } + dictEntry *den = dictFind(nodes, address); + if (den != NULL) { + sdsfree(address); + primary = dictGetVal(den); + } else { + sdsfree(address); + primary = node_get_with_slots(cc, host, port, VALKEY_ROLE_PRIMARY); + if (primary == NULL) + goto error; - slot = NULL; - break; - } + sds key = sdsnewlen(primary->addr, sdslen(primary->addr)); + if (key == NULL) { + freeValkeyClusterNode(primary); + goto oom; + } + if (dictAdd(nodes, key, primary) != DICT_OK) { + sdsfree(key); + freeValkeyClusterNode(primary); + goto oom; + } - primary = node_get_with_slots(cc, host, port, VALKEY_ROLE_PRIMARY); - if (primary == NULL) { + /* Parse replica nodes (elements 3+) only for new primaries. */ + if (cc->flags & VALKEY_FLAG_PARSE_REPLICAS) { + for (uint32_t idx = 3; idx < elem_slots->elements; idx++) { + if (parseClusterSlotsNodeEntry(cc, c, elem_slots->element[idx], &host, &port) != VALKEY_OK) goto error; - } - - sds key = sdsnewlen(primary->addr, sdslen(primary->addr)); - if (key == NULL) { - freeValkeyClusterNode(primary); - goto oom; - } - - ret = dictAdd(nodes, key, primary); - if (ret != DICT_OK) { - sdsfree(key); - freeValkeyClusterNode(primary); - goto oom; - } - ret = cluster_slot_ref_node(slot, primary); - if (ret != VALKEY_OK) { - goto oom; - } - - slot = NULL; - } else if (cc->flags & VALKEY_FLAG_PARSE_REPLICAS) { - replica = node_get_with_slots(cc, host, port, - VALKEY_ROLE_REPLICA); - if (replica == NULL) { + valkeyClusterNode *replica = node_get_with_slots(cc, host, port, VALKEY_ROLE_REPLICA); + if (replica == NULL) goto error; - } if (primary->replicas == NULL) { primary->replicas = listCreate(); @@ -644,10 +608,8 @@ static dict *parse_cluster_slots(valkeyClusterContext *cc, valkeyContext *c, freeValkeyClusterNode(replica); goto oom; } - primary->replicas->free = listClusterNodeDestructor; } - if (listAddNodeTail(primary->replicas, replica) == NULL) { freeValkeyClusterNode(replica); goto oom; @@ -655,6 +617,16 @@ static dict *parse_cluster_slots(valkeyClusterContext *cc, valkeyContext *c, } } } + + /* Create slot entry and link to primary. */ + slot = cluster_slot_create(NULL); + if (slot == NULL) + goto oom; + slot->start = slot_start; + slot->end = slot_end; + if (cluster_slot_ref_node(slot, primary) != VALKEY_OK) + goto oom; + slot = NULL; } return nodes; diff --git a/tests/ut_slotmap_update.c b/tests/ut_slotmap_update.c index 7e73b959..46ac8b97 100644 --- a/tests/ut_slotmap_update.c +++ b/tests/ut_slotmap_update.c @@ -791,6 +791,32 @@ void test_parse_cluster_slots_with_multiple_replicas(void) { valkeyClusterFree(cc); } +void test_parse_cluster_slots_with_invalid_slot_range(void) { + valkeyClusterOptions options = {0}; + valkeyClusterContext *cc = createClusterContext(&options); + valkeyContext *c = valkeyContextInit(); + dict *nodes; + + /* Slot end larger than max slot. */ + valkeyReply *reply = create_cluster_slots_reply( + "[[0, 16384, ['127.0.0.1', 30001, 'nodeid']]]"); + nodes = parse_cluster_slots(cc, c, reply); + freeReplyObject(reply); + assert(nodes == NULL); + assert(cc->err == VALKEY_ERR_OTHER); + + /* Slot start bigger than slot end. */ + reply = create_cluster_slots_reply( + "[[100, 50, ['127.0.0.1', 30001, 'nodeid']]]"); + nodes = parse_cluster_slots(cc, c, reply); + freeReplyObject(reply); + assert(nodes == NULL); + assert(cc->err == VALKEY_ERR_OTHER); + + valkeyFree(c); + valkeyClusterFree(cc); +} + void test_parse_cluster_slots_with_noncontiguous_slots(void) { valkeyClusterOptions options = {0}; options.options |= VALKEY_OPT_USE_REPLICAS; @@ -863,6 +889,7 @@ int main(void) { test_parse_cluster_slots_with_empty_ip(); test_parse_cluster_slots_with_null_ip(); test_parse_cluster_slots_with_multiple_replicas(); + test_parse_cluster_slots_with_invalid_slot_range(); test_parse_cluster_slots_with_noncontiguous_slots(); return 0; }