diff --git a/src/cluster.c b/src/cluster.c index 48419d74..45983a53 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -468,21 +468,53 @@ static void cluster_nodes_swap_ctx(dict *nodes_f, dict *nodes_t) { } } +/** + * Parse a node entry from a "cluster slots" sub-reply. + * Returns VALKEY_OK with host/port set, or VALKEY_ERR with error set on cc. + * The returned host pointer is either from the reply or from c->tcp.host + * since an unknown endpoint (NIL or empty string) means the same address + * as we sent this command to. + */ +static int parseClusterSlotsNodeEntry(valkeyClusterContext *cc, valkeyContext *c, + valkeyReply *elem_nodes, char **host_out, + int *port_out) { + if (elem_nodes->type != VALKEY_REPLY_ARRAY || elem_nodes->elements < 2) { + valkeyClusterSetError(cc, VALKEY_ERR_OTHER, + "Invalid node array in slot response"); + return VALKEY_ERR; + } + + valkeyReply *elem_ip = elem_nodes->element[0]; + valkeyReply *elem_port = elem_nodes->element[1]; + + /* Validate ip element. Accept a NIL type since we handle unknown + * endpoints by using the address we sent the command to. */ + if (elem_ip == NULL || (elem_ip->type != VALKEY_REPLY_STRING && + elem_ip->type != VALKEY_REPLY_NIL)) { + valkeyClusterSetError(cc, VALKEY_ERR_OTHER, "Invalid node address"); + return VALKEY_ERR; + } + + if (elem_port == NULL || elem_port->type != VALKEY_REPLY_INTEGER || + elem_port->integer < 1 || elem_port->integer > UINT16_MAX) { + valkeyClusterSetError(cc, VALKEY_ERR_OTHER, "Invalid port"); + return VALKEY_ERR; + } + + /* An unknown endpoint (NIL or empty string) means the same address + * as we sent this command to. */ + *host_out = (elem_ip->len > 0) ? elem_ip->str : c->tcp.host; + *port_out = (int)elem_port->integer; + return VALKEY_OK; +} + /** * Parse the "cluster slots" command reply to nodes dict. */ static dict *parse_cluster_slots(valkeyClusterContext *cc, valkeyContext *c, valkeyReply *reply) { - int ret; cluster_slot *slot = NULL; dict *nodes = NULL; - dictEntry *den; - valkeyReply *elem_slots; - valkeyReply *elem_slots_begin, *elem_slots_end; - valkeyReply *elem_nodes; - valkeyReply *elem_ip, *elem_port; - valkeyClusterNode *primary = NULL, *replica; - uint32_t i, idx; if (reply->type != VALKEY_REPLY_ARRAY) { valkeyClusterSetError(cc, VALKEY_ERR_OTHER, "Unexpected reply type"); @@ -494,149 +526,81 @@ static dict *parse_cluster_slots(valkeyClusterContext *cc, valkeyContext *c, } nodes = dictCreate(&clusterNodesDictType); - if (nodes == NULL) { + if (nodes == NULL) goto oom; - } - for (i = 0; i < reply->elements; i++) { - elem_slots = reply->element[i]; - if (elem_slots->type != VALKEY_REPLY_ARRAY || - elem_slots->elements < 3) { + for (uint32_t i = 0; i < reply->elements; i++) { + valkeyReply *elem_slots = reply->element[i]; + if (elem_slots->type != VALKEY_REPLY_ARRAY || elem_slots->elements < 3) { valkeyClusterSetError(cc, VALKEY_ERR_OTHER, - "Command(cluster slots) reply error: " - "first sub_reply is not an array."); + "Invalid array in cluster slots response"); goto error; } - slot = cluster_slot_create(NULL); - if (slot == NULL) { - goto oom; + /* Parse slot range (elements 0 and 1). */ + if (elem_slots->element[0]->type != VALKEY_REPLY_INTEGER) { + valkeyClusterSetError(cc, VALKEY_ERR_OTHER, + "Start slot range is not an integer"); + goto error; + } + if (elem_slots->element[1]->type != VALKEY_REPLY_INTEGER) { + valkeyClusterSetError(cc, VALKEY_ERR_OTHER, + "End slot range is not an integer"); + goto error; + } + uint32_t slot_start = (uint32_t)elem_slots->element[0]->integer; + uint32_t slot_end = (uint32_t)elem_slots->element[1]->integer; + if (slot_start > slot_end || slot_end >= VALKEYCLUSTER_SLOTS) { + valkeyClusterSetError(cc, VALKEY_ERR_OTHER, + "Invalid slot range"); + goto error; } - // one slots region - for (idx = 0; idx < elem_slots->elements; idx++) { - if (idx == 0) { - elem_slots_begin = elem_slots->element[idx]; - if (elem_slots_begin->type != VALKEY_REPLY_INTEGER) { - valkeyClusterSetError(cc, VALKEY_ERR_OTHER, - "Command(cluster slots) reply error: " - "slot begin is not an integer."); - goto error; - } - slot->start = (int)(elem_slots_begin->integer); - } else if (idx == 1) { - elem_slots_end = elem_slots->element[idx]; - if (elem_slots_end->type != VALKEY_REPLY_INTEGER) { - valkeyClusterSetError(cc, VALKEY_ERR_OTHER, - "Command(cluster slots) reply error: " - "slot end is not an integer."); - goto error; - } - - slot->end = (int)(elem_slots_end->integer); - - if (slot->start > slot->end) { - valkeyClusterSetError( - cc, VALKEY_ERR_OTHER, - "Command(cluster slots) reply error: " - "slot begin is bigger than slot end."); - goto error; - } - } else { - elem_nodes = elem_slots->element[idx]; - if (elem_nodes->type != VALKEY_REPLY_ARRAY || - elem_nodes->elements < 2) { - valkeyClusterSetError( - cc, VALKEY_ERR_OTHER, - "Command(cluster slots) reply error: " - "nodes sub_reply is not a correct array."); - goto error; - } - - elem_ip = elem_nodes->element[0]; - elem_port = elem_nodes->element[1]; - - /* Validate ip element. Accept a NULL value ip (NIL type) since - * we will handle the unknown endpoint special. */ - if (elem_ip == NULL || (elem_ip->type != VALKEY_REPLY_STRING && - elem_ip->type != VALKEY_REPLY_NIL)) { - valkeyClusterSetError(cc, VALKEY_ERR_OTHER, "Invalid node address"); - goto error; - } - - /* Validate port element. */ - if (elem_port == NULL || elem_port->type != VALKEY_REPLY_INTEGER || - (elem_port->integer < 1 || elem_port->integer > UINT16_MAX)) { - valkeyClusterSetError(cc, VALKEY_ERR_OTHER, "Invalid port"); - goto error; - } - - /* Get the received ip/host. According to the docs an unknown - * endpoint or an empty string can be treated as it means - * the same address as we sent this command to. - * An unknown endpoint has the type VALKEY_REPLY_NIL and its - * length is initiated to zero. */ - char *host = (elem_ip->len > 0) ? elem_ip->str : c->tcp.host; - if (host == NULL) { - goto oom; - } - int port = elem_port->integer; + /* Parse primary node (element 2). */ + char *host; + int port; + if (parseClusterSlotsNodeEntry(cc, c, elem_slots->element[2], &host, &port) != VALKEY_OK) + goto error; - if (idx == 2) { - /* Parse a primary node. */ - sds address = sdsnew(host); - if (address == NULL) { - goto oom; - } - address = sdscatfmt(address, ":%i", port); - if (address == NULL) { - goto oom; - } + /* Find or create the primary node. */ + valkeyClusterNode *primary; + sds address = sdsnew(host); + if (address == NULL) + goto oom; + address = sdscatfmt(address, ":%i", port); + if (address == NULL) + goto oom; - den = dictFind(nodes, address); - sdsfree(address); - if (den != NULL) { - /* Skip parsing this primary node since it's already known. */ - primary = dictGetVal(den); - ret = cluster_slot_ref_node(slot, primary); - if (ret != VALKEY_OK) { - goto oom; - } + dictEntry *den = dictFind(nodes, address); + if (den != NULL) { + sdsfree(address); + primary = dictGetVal(den); + } else { + sdsfree(address); + primary = node_get_with_slots(cc, host, port, VALKEY_ROLE_PRIMARY); + if (primary == NULL) + goto error; - slot = NULL; - break; - } + sds key = sdsnewlen(primary->addr, sdslen(primary->addr)); + if (key == NULL) { + freeValkeyClusterNode(primary); + goto oom; + } + if (dictAdd(nodes, key, primary) != DICT_OK) { + sdsfree(key); + freeValkeyClusterNode(primary); + goto oom; + } - primary = node_get_with_slots(cc, host, port, VALKEY_ROLE_PRIMARY); - if (primary == NULL) { + /* Parse replica nodes (elements 3+) only for new primaries. */ + if (cc->flags & VALKEY_FLAG_PARSE_REPLICAS) { + for (uint32_t idx = 3; idx < elem_slots->elements; idx++) { + if (parseClusterSlotsNodeEntry(cc, c, elem_slots->element[idx], &host, &port) != VALKEY_OK) goto error; - } - - sds key = sdsnewlen(primary->addr, sdslen(primary->addr)); - if (key == NULL) { - freeValkeyClusterNode(primary); - goto oom; - } - - ret = dictAdd(nodes, key, primary); - if (ret != DICT_OK) { - sdsfree(key); - freeValkeyClusterNode(primary); - goto oom; - } - ret = cluster_slot_ref_node(slot, primary); - if (ret != VALKEY_OK) { - goto oom; - } - - slot = NULL; - } else if (cc->flags & VALKEY_FLAG_PARSE_REPLICAS) { - replica = node_get_with_slots(cc, host, port, - VALKEY_ROLE_REPLICA); - if (replica == NULL) { + valkeyClusterNode *replica = node_get_with_slots(cc, host, port, VALKEY_ROLE_REPLICA); + if (replica == NULL) goto error; - } if (primary->replicas == NULL) { primary->replicas = listCreate(); @@ -644,10 +608,8 @@ static dict *parse_cluster_slots(valkeyClusterContext *cc, valkeyContext *c, freeValkeyClusterNode(replica); goto oom; } - primary->replicas->free = listClusterNodeDestructor; } - if (listAddNodeTail(primary->replicas, replica) == NULL) { freeValkeyClusterNode(replica); goto oom; @@ -655,6 +617,16 @@ static dict *parse_cluster_slots(valkeyClusterContext *cc, valkeyContext *c, } } } + + /* Create slot entry and link to primary. */ + slot = cluster_slot_create(NULL); + if (slot == NULL) + goto oom; + slot->start = slot_start; + slot->end = slot_end; + if (cluster_slot_ref_node(slot, primary) != VALKEY_OK) + goto oom; + slot = NULL; } return nodes; diff --git a/tests/ut_slotmap_update.c b/tests/ut_slotmap_update.c index 7e73b959..46ac8b97 100644 --- a/tests/ut_slotmap_update.c +++ b/tests/ut_slotmap_update.c @@ -791,6 +791,32 @@ void test_parse_cluster_slots_with_multiple_replicas(void) { valkeyClusterFree(cc); } +void test_parse_cluster_slots_with_invalid_slot_range(void) { + valkeyClusterOptions options = {0}; + valkeyClusterContext *cc = createClusterContext(&options); + valkeyContext *c = valkeyContextInit(); + dict *nodes; + + /* Slot end larger than max slot. */ + valkeyReply *reply = create_cluster_slots_reply( + "[[0, 16384, ['127.0.0.1', 30001, 'nodeid']]]"); + nodes = parse_cluster_slots(cc, c, reply); + freeReplyObject(reply); + assert(nodes == NULL); + assert(cc->err == VALKEY_ERR_OTHER); + + /* Slot start bigger than slot end. */ + reply = create_cluster_slots_reply( + "[[100, 50, ['127.0.0.1', 30001, 'nodeid']]]"); + nodes = parse_cluster_slots(cc, c, reply); + freeReplyObject(reply); + assert(nodes == NULL); + assert(cc->err == VALKEY_ERR_OTHER); + + valkeyFree(c); + valkeyClusterFree(cc); +} + void test_parse_cluster_slots_with_noncontiguous_slots(void) { valkeyClusterOptions options = {0}; options.options |= VALKEY_OPT_USE_REPLICAS; @@ -863,6 +889,7 @@ int main(void) { test_parse_cluster_slots_with_empty_ip(); test_parse_cluster_slots_with_null_ip(); test_parse_cluster_slots_with_multiple_replicas(); + test_parse_cluster_slots_with_invalid_slot_range(); test_parse_cluster_slots_with_noncontiguous_slots(); return 0; }