Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
248 changes: 110 additions & 138 deletions src/cluster.c
Original file line number Diff line number Diff line change
Expand Up @@ -468,21 +468,53 @@ static void cluster_nodes_swap_ctx(dict *nodes_f, dict *nodes_t) {
}
}

/**
* Parse a node entry from a "cluster slots" sub-reply.
* Returns VALKEY_OK with host/port set, or VALKEY_ERR with error set on cc.
* The returned host pointer is either from the reply or from c->tcp.host
* since an unknown endpoint (NIL or empty string) means the same address
* as we sent this command to.
*/
static int parseClusterSlotsNodeEntry(valkeyClusterContext *cc, valkeyContext *c,
valkeyReply *elem_nodes, char **host_out,
int *port_out) {
if (elem_nodes->type != VALKEY_REPLY_ARRAY || elem_nodes->elements < 2) {
valkeyClusterSetError(cc, VALKEY_ERR_OTHER,
"Invalid node array in slot response");
return VALKEY_ERR;
}

valkeyReply *elem_ip = elem_nodes->element[0];
valkeyReply *elem_port = elem_nodes->element[1];

/* Validate ip element. Accept a NIL type since we handle unknown
* endpoints by using the address we sent the command to. */
if (elem_ip == NULL || (elem_ip->type != VALKEY_REPLY_STRING &&
elem_ip->type != VALKEY_REPLY_NIL)) {
valkeyClusterSetError(cc, VALKEY_ERR_OTHER, "Invalid node address");
return VALKEY_ERR;
}

if (elem_port == NULL || elem_port->type != VALKEY_REPLY_INTEGER ||
elem_port->integer < 1 || elem_port->integer > UINT16_MAX) {
valkeyClusterSetError(cc, VALKEY_ERR_OTHER, "Invalid port");
return VALKEY_ERR;
}

/* An unknown endpoint (NIL or empty string) means the same address
* as we sent this command to. */
*host_out = (elem_ip->len > 0) ? elem_ip->str : c->tcp.host;
*port_out = (int)elem_port->integer;
return VALKEY_OK;
}

/**
* Parse the "cluster slots" command reply to nodes dict.
*/
static dict *parse_cluster_slots(valkeyClusterContext *cc, valkeyContext *c,
valkeyReply *reply) {
int ret;
cluster_slot *slot = NULL;
dict *nodes = NULL;
dictEntry *den;
valkeyReply *elem_slots;
valkeyReply *elem_slots_begin, *elem_slots_end;
valkeyReply *elem_nodes;
valkeyReply *elem_ip, *elem_port;
valkeyClusterNode *primary = NULL, *replica;
uint32_t i, idx;

if (reply->type != VALKEY_REPLY_ARRAY) {
valkeyClusterSetError(cc, VALKEY_ERR_OTHER, "Unexpected reply type");
Expand All @@ -494,167 +526,107 @@ static dict *parse_cluster_slots(valkeyClusterContext *cc, valkeyContext *c,
}

nodes = dictCreate(&clusterNodesDictType);
if (nodes == NULL) {
if (nodes == NULL)
goto oom;
}

for (i = 0; i < reply->elements; i++) {
elem_slots = reply->element[i];
if (elem_slots->type != VALKEY_REPLY_ARRAY ||
elem_slots->elements < 3) {
for (uint32_t i = 0; i < reply->elements; i++) {
valkeyReply *elem_slots = reply->element[i];
if (elem_slots->type != VALKEY_REPLY_ARRAY || elem_slots->elements < 3) {
valkeyClusterSetError(cc, VALKEY_ERR_OTHER,
"Command(cluster slots) reply error: "
"first sub_reply is not an array.");
"Invalid array in cluster slots response");
goto error;
}

slot = cluster_slot_create(NULL);
if (slot == NULL) {
goto oom;
/* Parse slot range (elements 0 and 1). */
if (elem_slots->element[0]->type != VALKEY_REPLY_INTEGER) {
valkeyClusterSetError(cc, VALKEY_ERR_OTHER,
"Start slot range is not an integer");
goto error;
}
if (elem_slots->element[1]->type != VALKEY_REPLY_INTEGER) {
valkeyClusterSetError(cc, VALKEY_ERR_OTHER,
"End slot range is not an integer");
goto error;
}
uint32_t slot_start = (uint32_t)elem_slots->element[0]->integer;
uint32_t slot_end = (uint32_t)elem_slots->element[1]->integer;
if (slot_start > slot_end || slot_end >= VALKEYCLUSTER_SLOTS) {
valkeyClusterSetError(cc, VALKEY_ERR_OTHER,
"Invalid slot range");
goto error;
}

// one slots region
for (idx = 0; idx < elem_slots->elements; idx++) {
if (idx == 0) {
elem_slots_begin = elem_slots->element[idx];
if (elem_slots_begin->type != VALKEY_REPLY_INTEGER) {
valkeyClusterSetError(cc, VALKEY_ERR_OTHER,
"Command(cluster slots) reply error: "
"slot begin is not an integer.");
goto error;
}
slot->start = (int)(elem_slots_begin->integer);
} else if (idx == 1) {
elem_slots_end = elem_slots->element[idx];
if (elem_slots_end->type != VALKEY_REPLY_INTEGER) {
valkeyClusterSetError(cc, VALKEY_ERR_OTHER,
"Command(cluster slots) reply error: "
"slot end is not an integer.");
goto error;
}

slot->end = (int)(elem_slots_end->integer);

if (slot->start > slot->end) {
valkeyClusterSetError(
cc, VALKEY_ERR_OTHER,
"Command(cluster slots) reply error: "
"slot begin is bigger than slot end.");
goto error;
}
} else {
elem_nodes = elem_slots->element[idx];
if (elem_nodes->type != VALKEY_REPLY_ARRAY ||
elem_nodes->elements < 2) {
valkeyClusterSetError(
cc, VALKEY_ERR_OTHER,
"Command(cluster slots) reply error: "
"nodes sub_reply is not a correct array.");
goto error;
}

elem_ip = elem_nodes->element[0];
elem_port = elem_nodes->element[1];

/* Validate ip element. Accept a NULL value ip (NIL type) since
* we will handle the unknown endpoint special. */
if (elem_ip == NULL || (elem_ip->type != VALKEY_REPLY_STRING &&
elem_ip->type != VALKEY_REPLY_NIL)) {
valkeyClusterSetError(cc, VALKEY_ERR_OTHER, "Invalid node address");
goto error;
}

/* Validate port element. */
if (elem_port == NULL || elem_port->type != VALKEY_REPLY_INTEGER ||
(elem_port->integer < 1 || elem_port->integer > UINT16_MAX)) {
valkeyClusterSetError(cc, VALKEY_ERR_OTHER, "Invalid port");
goto error;
}

/* Get the received ip/host. According to the docs an unknown
* endpoint or an empty string can be treated as it means
* the same address as we sent this command to.
* An unknown endpoint has the type VALKEY_REPLY_NIL and its
* length is initiated to zero. */
char *host = (elem_ip->len > 0) ? elem_ip->str : c->tcp.host;
if (host == NULL) {
goto oom;
}
int port = elem_port->integer;
/* Parse primary node (element 2). */
char *host;
int port;
if (parseClusterSlotsNodeEntry(cc, c, elem_slots->element[2], &host, &port) != VALKEY_OK)
goto error;

if (idx == 2) {
/* Parse a primary node. */
sds address = sdsnew(host);
if (address == NULL) {
goto oom;
}
address = sdscatfmt(address, ":%i", port);
if (address == NULL) {
goto oom;
}
/* Find or create the primary node. */
valkeyClusterNode *primary;
sds address = sdsnew(host);
if (address == NULL)
goto oom;
address = sdscatfmt(address, ":%i", port);
if (address == NULL)
goto oom;

den = dictFind(nodes, address);
sdsfree(address);
if (den != NULL) {
/* Skip parsing this primary node since it's already known. */
primary = dictGetVal(den);
ret = cluster_slot_ref_node(slot, primary);
if (ret != VALKEY_OK) {
goto oom;
}
dictEntry *den = dictFind(nodes, address);
if (den != NULL) {
sdsfree(address);
primary = dictGetVal(den);
} else {
sdsfree(address);
primary = node_get_with_slots(cc, host, port, VALKEY_ROLE_PRIMARY);
if (primary == NULL)
goto error;

slot = NULL;
break;
}
sds key = sdsnewlen(primary->addr, sdslen(primary->addr));
if (key == NULL) {
freeValkeyClusterNode(primary);
goto oom;
}
if (dictAdd(nodes, key, primary) != DICT_OK) {
sdsfree(key);
freeValkeyClusterNode(primary);
goto oom;
}

primary = node_get_with_slots(cc, host, port, VALKEY_ROLE_PRIMARY);
if (primary == NULL) {
/* Parse replica nodes (elements 3+) only for new primaries. */
if (cc->flags & VALKEY_FLAG_PARSE_REPLICAS) {
for (uint32_t idx = 3; idx < elem_slots->elements; idx++) {
if (parseClusterSlotsNodeEntry(cc, c, elem_slots->element[idx], &host, &port) != VALKEY_OK)
goto error;
}

sds key = sdsnewlen(primary->addr, sdslen(primary->addr));
if (key == NULL) {
freeValkeyClusterNode(primary);
goto oom;
}

ret = dictAdd(nodes, key, primary);
if (ret != DICT_OK) {
sdsfree(key);
freeValkeyClusterNode(primary);
goto oom;
}

ret = cluster_slot_ref_node(slot, primary);
if (ret != VALKEY_OK) {
goto oom;
}

slot = NULL;
} else if (cc->flags & VALKEY_FLAG_PARSE_REPLICAS) {
replica = node_get_with_slots(cc, host, port,
VALKEY_ROLE_REPLICA);
if (replica == NULL) {
valkeyClusterNode *replica = node_get_with_slots(cc, host, port, VALKEY_ROLE_REPLICA);
if (replica == NULL)
goto error;
}

if (primary->replicas == NULL) {
primary->replicas = listCreate();
if (primary->replicas == NULL) {
freeValkeyClusterNode(replica);
goto oom;
}

primary->replicas->free = listClusterNodeDestructor;
}

if (listAddNodeTail(primary->replicas, replica) == NULL) {
freeValkeyClusterNode(replica);
goto oom;
}
}
}
}

/* Create slot entry and link to primary. */
slot = cluster_slot_create(NULL);
if (slot == NULL)
goto oom;
slot->start = slot_start;
slot->end = slot_end;
if (cluster_slot_ref_node(slot, primary) != VALKEY_OK)
goto oom;
slot = NULL;
}

return nodes;
Expand Down
27 changes: 27 additions & 0 deletions tests/ut_slotmap_update.c
Original file line number Diff line number Diff line change
Expand Up @@ -791,6 +791,32 @@ void test_parse_cluster_slots_with_multiple_replicas(void) {
valkeyClusterFree(cc);
}

void test_parse_cluster_slots_with_invalid_slot_range(void) {
valkeyClusterOptions options = {0};
valkeyClusterContext *cc = createClusterContext(&options);
valkeyContext *c = valkeyContextInit();
dict *nodes;

/* Slot end larger than max slot. */
valkeyReply *reply = create_cluster_slots_reply(
"[[0, 16384, ['127.0.0.1', 30001, 'nodeid']]]");
nodes = parse_cluster_slots(cc, c, reply);
freeReplyObject(reply);
assert(nodes == NULL);
assert(cc->err == VALKEY_ERR_OTHER);

/* Slot start bigger than slot end. */
reply = create_cluster_slots_reply(
"[[100, 50, ['127.0.0.1', 30001, 'nodeid']]]");
nodes = parse_cluster_slots(cc, c, reply);
freeReplyObject(reply);
assert(nodes == NULL);
assert(cc->err == VALKEY_ERR_OTHER);

valkeyFree(c);
valkeyClusterFree(cc);
}

void test_parse_cluster_slots_with_noncontiguous_slots(void) {
valkeyClusterOptions options = {0};
options.options |= VALKEY_OPT_USE_REPLICAS;
Expand Down Expand Up @@ -863,6 +889,7 @@ int main(void) {
test_parse_cluster_slots_with_empty_ip();
test_parse_cluster_slots_with_null_ip();
test_parse_cluster_slots_with_multiple_replicas();
test_parse_cluster_slots_with_invalid_slot_range();
test_parse_cluster_slots_with_noncontiguous_slots();
return 0;
}
Loading