Redis source code study - Cluster
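For Redis high availability there are three common approaches: master-replica (primary/backup), Sentinel, and Cluster (master-replica plus sharding).
This post focuses on Cluster mode and explains in detail how it shards data and keeps the cluster consistent.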
Sharding in Cluster mode
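Redis Cluster shards data by slot. Redis is a key-value database: the slot of a key is CRC16(key) mod 16384 (there are 16384 slots, numbered 0 to 16383). The slot in turn identifies the master node that owns it, and that master receives the write.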
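The slot computation can be sketched in C roughly as follows. It follows the logic of keyHashSlot() in cluster.c, including hash tags: if a key contains a non-empty {...} section, only that part is hashed, so related keys can be forced into the same slot. The CRC16 here is the bitwise XMODEM variant, while Redis itself uses a table-driven crc16() in crc16.c. The names crc16_xmodem and key_hash_slot are hypothetical. Because 16384 is a power of two, `& 16383` is equivalent to `% 16384`.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* CRC16 (XMODEM variant): polynomial 0x1021, initial value 0, computed
 * bit by bit. */
static uint16_t crc16_xmodem(const char *buf, size_t len) {
    uint16_t crc = 0;
    for (size_t i = 0; i < len; i++) {
        crc ^= (uint16_t)((unsigned char)buf[i] << 8);
        for (int bit = 0; bit < 8; bit++) {
            if (crc & 0x8000)
                crc = (uint16_t)((crc << 1) ^ 0x1021);
            else
                crc = (uint16_t)(crc << 1);
        }
    }
    return crc;
}

/* Map a key to one of the 16384 slots. If the key contains a non-empty
 * {...} section, only the bytes between the first '{' and the next '}'
 * are hashed (hash tag); otherwise the whole key is hashed. */
static unsigned int key_hash_slot(const char *key, size_t keylen) {
    size_t s, e;
    assert(key != NULL);
    for (s = 0; s < keylen; s++)
        if (key[s] == '{') break;
    if (s == keylen)                               /* no '{' at all */
        return crc16_xmodem(key, keylen) & 16383;
    for (e = s + 1; e < keylen; e++)
        if (key[e] == '}') break;
    if (e == keylen || e == s + 1)                 /* no '}' or empty "{}" */
        return crc16_xmodem(key, keylen) & 16383;
    return crc16_xmodem(key + s + 1, e - s - 1) & 16383;
}
```

For example, key_hash_slot("{user1000}.following", 20) and key_hash_slot("{user1000}.followers", 20) return the same slot, because only "user1000" is hashed in both cases.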
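On a cluster that has already been set up, the CLUSTER NODES command shows output similar to the following:
The column right after the node's own ip:port holds its flags, including its role (master or slave). Only master lines end with fields of the form a-b, which are the slot ranges that master is responsible for. A slave owns no slots, so its line ends with the link state, connected.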
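The slot fields described above can be parsed with a few lines of C. This is a sketch: parse_slot_field is a hypothetical helper. It handles only the a-b range form and a single slot number, and it rejects the bracketed forms that CLUSTER NODES uses for slots being imported or migrated.

```c
#include <assert.h>
#include <stdlib.h>

#define NUM_SLOTS 16384

/* Parse one slot field of a CLUSTER NODES line: either "a-b" (a range)
 * or "a" (a single slot). On success, store the bounds and return the
 * number of slots covered; return -1 for anything else. */
static int parse_slot_field(const char *field, int *start, int *end) {
    char *rest;
    long a, b;
    assert(field != NULL && start != NULL && end != NULL);
    a = strtol(field, &rest, 10);
    if (rest == field) return -1;              /* no leading number */
    b = a;
    if (*rest == '-') {                        /* range form "a-b" */
        const char *second = rest + 1;
        b = strtol(second, &rest, 10);
        if (rest == second) return -1;         /* "a-" without an end */
    }
    if (*rest != '\0') return -1;              /* trailing garbage */
    if (a < 0 || b >= NUM_SLOTS || a > b) return -1;
    *start = (int)a;
    *end = (int)b;
    return (int)(b - a + 1);
}
```

A master line may carry several such fields, so a caller would split the line on spaces and add up the counts of all fields after the link-state column.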
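Cluster consistency mechanism
Common consistency protocols include Raft, Paxos, and Gossip. Redis Cluster uses the gossip protocol.
Redis Cluster's sharding is decentralized by design. Sentinel uses a Raft-style leader election, but Cluster mode switches to gossip, a classic protocol that suits peer-to-peer systems better. Each shard communicates with the rest of the cluster mainly to confirm and update cluster membership and to detect failures, which is why gossip fits the Cluster design.
By default, the cluster bus (node-to-node communication) uses the node's client port plus 10000: a node that serves clients on 6379 talks to the other nodes on 16379. Since Redis 7.0 the bus port can also be set explicitly with cluster-port. Because cluster traffic has its own port and connections, it stays separate from normal client reads and writes, and errors on client connections do not disrupt gossip between nodes.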
Redis Cluster source code analysis
All cluster-related functions are in src/cluster.c and src/cluster.h.
The clusterNode struct
typedef struct clusterNode {
mstime_t ctime; /* Node object creation time. */
char name[CLUSTER_NAMELEN]; /* Node name, hex string, sha1-size */
int flags; /* CLUSTER_NODE_... */
uint64_t configEpoch; /* Last configEpoch observed for this node */
unsigned char slots[CLUSTER_SLOTS/8]; /* slots handled by this node */
uint16_t *slot_info_pairs; /* Slots info represented as (start/end) pair (consecutive index). */
int slot_info_pairs_count; /* Used number of slots in slot_info_pairs */
int numslots; /* Number of slots handled by this node */
int numslaves; /* Number of slave nodes, if this is a master */
struct clusterNode **slaves; /* pointers to slave nodes */
struct clusterNode *slaveof; /* pointer to the master node. Note that it
may be NULL even if the node is a slave
if we don't have the master node in our
tables. */
mstime_t ping_sent; /* Unix time we sent latest ping */
mstime_t pong_received; /* Unix time we received the pong */
mstime_t data_received; /* Unix time we received any data */
mstime_t fail_time; /* Unix time when FAIL flag was set */
mstime_t voted_time; /* Last time we voted for a slave of this master */
mstime_t repl_offset_time; /* Unix time we received offset for this node */
mstime_t orphaned_time; /* Starting time of orphaned master condition */
long long repl_offset; /* Last known repl offset for this node. */
char ip[NET_IP_STR_LEN]; /* Latest known IP address of this node */
sds hostname; /* The known hostname for this node */
int port; /* Latest known clients port (TLS or plain). */
int pport; /* Latest known clients plaintext port. Only used
if the main clients port is for TLS. */
int cport; /* Latest known cluster port of this node. */
clusterLink *link; /* TCP/IP link established toward this node */
clusterLink *inbound_link; /* TCP/IP link accepted from this node */
list *fail_reports; /* List of nodes signaling this as failing */
} clusterNode;
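Most of the mstime_t fields are timestamps used for gossip timeouts and failover voting. In Cluster mode a node is uniquely identified by char name[CLUSTER_NAMELEN], a 40-character hexadecimal string. It is the first column of the CLUSTER NODES output shown above. The name is persisted in nodes.conf, so if a configuration is copied without clearing nodes.conf, two nodes end up with the same name and the error log keeps reporting the conflict.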
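For illustration, here is a sketch of how such a 40-character hex name can be generated and validated. Redis generates the name randomly when a node starts without a nodes.conf and then persists it there. That is why a copied nodes.conf clones the node's identity. The helpers random_node_name and is_valid_node_name are hypothetical. rand() is used only to keep the sketch short and is not a suitable source of unique IDs.

```c
#include <assert.h>
#include <ctype.h>
#include <stdlib.h>
#include <string.h>

#define NODE_NAME_LEN 40   /* same length as CLUSTER_NAMELEN */

/* Fill 'name' with NODE_NAME_LEN random lowercase hex characters.
 * Like clusterNode.name, the buffer is NOT NUL-terminated. */
static void random_node_name(char name[NODE_NAME_LEN]) {
    static const char hex[] = "0123456789abcdef";
    assert(name != NULL);
    for (int i = 0; i < NODE_NAME_LEN; i++)
        name[i] = hex[rand() & 15];
}

/* Return 1 if the first NODE_NAME_LEN bytes are all hex digits. */
static int is_valid_node_name(const char *name) {
    for (int i = 0; i < NODE_NAME_LEN; i++)
        if (!isxdigit((unsigned char)name[i])) return 0;
    return 1;
}
```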
- Cluster (node) functions
Adding a node
/* Add a node to the nodes hash table */
void clusterAddNode(clusterNode *node) {
int retval;
retval = dictAdd(server.cluster->nodes,
sdsnewlen(node->name,CLUSTER_NAMELEN), node);
serverAssert(retval == DICT_OK);
}
So node information is kept in a dict whose key is the node name and whose value points to the clusterNode. This is why char name[CLUSTER_NAMELEN] is what uniquely identifies a node.
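To show why the name works as the key, below is a toy name-keyed table in C that stands in for server.cluster->nodes. It is a hypothetical sketch, not Redis's dict. It uses a fixed-size open-addressing table with an FNV-1a hash, which is a swapped-in hash function. As with dictAdd, inserting a second node with an existing name fails. Redis then trips serverAssert(retval == DICT_OK), whereas this sketch just returns -1.

```c
#include <assert.h>
#include <stdint.h>
#include <string.h>

#define NODE_NAME_LEN 40
#define TABLE_SIZE 64              /* hypothetical capacity, power of two */

typedef struct node {
    char name[NODE_NAME_LEN];      /* not NUL-terminated, like clusterNode.name */
    int port;
} node;

typedef struct {
    node *slots[TABLE_SIZE];       /* NULL = empty bucket */
} node_table;

/* FNV-1a over the fixed-length 40-byte name. */
static uint32_t name_hash(const char *name) {
    uint32_t h = 2166136261u;
    for (int i = 0; i < NODE_NAME_LEN; i++) {
        h ^= (unsigned char)name[i];
        h *= 16777619u;
    }
    return h;
}

/* Insert n; return 0 on success, -1 if the name already exists or the
 * table is full. Collisions are resolved with linear probing. */
static int table_add(node_table *t, node *n) {
    assert(t != NULL && n != NULL);
    uint32_t i = name_hash(n->name) & (TABLE_SIZE - 1);
    for (int probes = 0; probes < TABLE_SIZE; probes++) {
        node *cur = t->slots[i];
        if (cur == NULL) { t->slots[i] = n; return 0; }
        if (memcmp(cur->name, n->name, NODE_NAME_LEN) == 0) return -1;
        i = (i + 1) & (TABLE_SIZE - 1);
    }
    return -1;
}

/* Return the node with this name, or NULL if it is unknown. */
static node *table_lookup(const node_table *t, const char *name) {
    uint32_t i = name_hash(name) & (TABLE_SIZE - 1);
    for (int probes = 0; probes < TABLE_SIZE; probes++) {
        node *cur = t->slots[i];
        if (cur == NULL) return NULL;
        if (memcmp(cur->name, name, NODE_NAME_LEN) == 0) return cur;
        i = (i + 1) & (TABLE_SIZE - 1);
    }
    return NULL;
}
```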
Setting this node's master (turning it into a slave)
/* Set the specified node 'n' as master for this node.
* If this node is currently a master, it is turned into a slave. */
void clusterSetMaster(clusterNode *n) {
serverAssert(n != myself);
serverAssert(myself->numslots == 0);
if (nodeIsMaster(myself)) {
myself->flags &= ~(CLUSTER_NODE_MASTER|CLUSTER_NODE_MIGRATE_TO);
myself->flags |= CLUSTER_NODE_SLAVE;
clusterCloseAllSlots();
} else {
if (myself->slaveof)
clusterNodeRemoveSlave(myself->slaveof,myself);
}
myself->slaveof = n;
// Register myself in the new master's slave list
clusterNodeAddSlave(n,myself);
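// Start replicating from the new master. No explicit broadcast is needed: the role change reaches the other nodes through the regular gossip (ping/pong) messages.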
replicationSetMaster(n->ip, n->port);
resetManualFailover();
}
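The effect of clusterSetMaster on the master/slave pointers can be modelled in a small self-contained sketch. The tnode struct, the F_MASTER/F_SLAVE flags and the helper functions are hypothetical simplifications. The model has no slots to close, no replication and no MIGRATE_TO flag. It only tracks the bookkeeping of flags, slaveof and the slave arrays.

```c
#include <assert.h>
#include <stdlib.h>

#define F_MASTER 1   /* hypothetical stand-in for CLUSTER_NODE_MASTER */
#define F_SLAVE  2   /* hypothetical stand-in for CLUSTER_NODE_SLAVE */

typedef struct tnode {
    int flags;
    int numslots;
    int numslaves;
    struct tnode **slaves;
    struct tnode *slaveof;
} tnode;

/* Append s to m's slave list unless it is already there. */
static int add_slave(tnode *m, tnode *s) {
    for (int j = 0; j < m->numslaves; j++)
        if (m->slaves[j] == s) return -1;
    tnode **grown = realloc(m->slaves, sizeof(*grown) * (size_t)(m->numslaves + 1));
    if (grown == NULL) return -1;
    m->slaves = grown;
    m->slaves[m->numslaves++] = s;
    return 0;
}

/* Remove s from m's slave list by shifting the tail left. */
static int remove_slave(tnode *m, tnode *s) {
    for (int j = 0; j < m->numslaves; j++) {
        if (m->slaves[j] != s) continue;
        for (int k = j; k < m->numslaves - 1; k++)
            m->slaves[k] = m->slaves[k + 1];
        m->numslaves--;
        return 0;
    }
    return -1;
}

/* Simplified clusterSetMaster(): make 'me' a slave of 'n'. */
static void set_master(tnode *me, tnode *n) {
    assert(n != me);
    assert(me->numslots == 0);            /* a node owning slots can't switch */
    if (me->flags & F_MASTER) {
        me->flags &= ~F_MASTER;
        me->flags |= F_SLAVE;
    } else if (me->slaveof) {
        remove_slave(me->slaveof, me);    /* detach from the old master */
    }
    me->slaveof = n;
    add_slave(n, me);
}
```

Because add_slave grows the array with realloc, there is no fixed upper bound on the number of slaves per master. This matches the zrealloc-based clusterNodeAddSlave below.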
Adding a slave
int clusterNodeAddSlave(clusterNode *master, clusterNode *slave) {
int j;
/* If it's already a slave, don't add it again. */
for (j = 0; j < master->numslaves; j++)
if (master->slaves[j] == slave) return C_ERR;
master->slaves = zrealloc(master->slaves,
sizeof(clusterNode*)*(master->numslaves+1));
master->slaves[master->numslaves] = slave;
master->numslaves++;
master->flags |= CLUSTER_NODE_MIGRATE_TO;
return C_OK;
}
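Adding a slave requires specifying its master. Cluster mode does not cap the number of slaves per master (the array simply grows with zrealloc), although one or two slaves per master are generally enough. The function also flags the master with CLUSTER_NODE_MIGRATE_TO, marking it as a valid target for replica migration.
- Slot-related functions
Assigning a slot to a master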
/* Add the specified slot to the list of slots that node 'n' will
* serve. Return C_OK if the operation ended with success.
* If the slot is already assigned to another instance this is considered
* an error and C_ERR is returned. */
int clusterAddSlot(clusterNode *n, int slot) {
// Fail if the slot is already assigned to some node
if (server.cluster->slots[slot]) return C_ERR;
clusterNodeSetSlotBit(n,slot);
server.cluster->slots[slot] = n;
return C_OK;
}
/* Set the slot bit and return the old value. */
int clusterNodeSetSlotBit(clusterNode *n, int slot) {
int old = bitmapTestBit(n->slots,slot);
// Set the bit; checking a slot later is a single O(1) bit test
bitmapSetBit(n->slots,slot);
if (!old) {
n->numslots++;
/* When a master gets its first slot, even if it has no slaves,
* it gets flagged with MIGRATE_TO, that is, the master is a valid
* target for replicas migration, if and only if at least one of
* the other masters has slaves right now.
*
* Normally masters are valid targets of replica migration if:
* 1. The used to have slaves (but no longer have).
* 2. They are slaves failing over a master that used to have slaves.
*
* However new masters with slots assigned are considered valid
* migration targets if the rest of the cluster is not a slave-less.
*
* See https://github.com/redis/redis/issues/3043 for more info. */
if (n->numslots == 1 && clusterMastersHaveSlaves())
n->flags |= CLUSTER_NODE_MIGRATE_TO;
}
return old;
}
/* Set the bit at position 'pos' in a bitmap. */
void bitmapSetBit(unsigned char *bitmap, int pos) {
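// Slots are numbered 0..16383, so one bit per slot needs 16384/8 = 2048 bytes per node.
// The bitmap mainly saves memory; setting or testing a single slot is an O(1) bit operation.
// The same bit-array idea underlies structures such as Bloom filters.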
off_t byte = pos/8;
int bit = pos&7;
bitmap[byte] |= 1<<bit;
}
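Here is a standalone version of the slot bitmap, as a sketch. The bit arithmetic matches bitmapSetBit above (byte = pos/8, bit = pos&7). The slot_bitmap struct and the helper names are hypothetical. The sketch also includes the clear operation, which clusterNodeClearSlotBit needs for removing a slot.

```c
#include <assert.h>
#include <string.h>

#define CLUSTER_SLOTS 16384

/* One bit per slot: 16384 bits = 2048 bytes per node. */
typedef struct {
    unsigned char bits[CLUSTER_SLOTS / 8];
    int numslots;
} slot_bitmap;

static int bitmap_test(const unsigned char *bitmap, int pos) {
    return (bitmap[pos / 8] & (1 << (pos & 7))) != 0;
}

static void bitmap_set(unsigned char *bitmap, int pos) {
    bitmap[pos / 8] |= (unsigned char)(1 << (pos & 7));
}

static void bitmap_clear(unsigned char *bitmap, int pos) {
    bitmap[pos / 8] &= (unsigned char)~(1 << (pos & 7));
}

/* Like clusterNodeSetSlotBit(): set the bit, keep the counter in sync,
 * and return the previous value. */
static int set_slot(slot_bitmap *m, int slot) {
    assert(slot >= 0 && slot < CLUSTER_SLOTS);
    int old = bitmap_test(m->bits, slot);
    bitmap_set(m->bits, slot);
    if (!old) m->numslots++;
    return old;
}

/* The inverse: clear the bit and return the previous value. */
static int clear_slot(slot_bitmap *m, int slot) {
    assert(slot >= 0 && slot < CLUSTER_SLOTS);
    int old = bitmap_test(m->bits, slot);
    bitmap_clear(m->bits, slot);
    if (old) m->numslots--;
    return old;
}
```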
Removing a slot
/* Delete the specified slot marking it as unassigned.
* Returns C_OK if the slot was assigned, otherwise if the slot was
* already unassigned C_ERR is returned. */
int clusterDelSlot(int slot) {
clusterNode *n = server.cluster->slots[slot];
if (!n) return C_ERR;
/* Cleanup the channels in master/replica as part of slot deletion. */
list *nodes_for_slot = clusterGetNodesServingMySlots(n);
listNode *ln = listSearchKey(nodes_for_slot, myself);
if (ln != NULL) {
removeChannelsInSlot(slot);
}
listRelease(nodes_for_slot);
serverAssert(clusterNodeClearSlotBit(n,slot) == 1);
server.cluster->slots[slot] = NULL;
return C_OK;
}
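This small model checks the invariants of clusterAddSlot and clusterDelSlot. A slot has at most one owner, and adding an owned slot or deleting an unowned one is an error. The owner struct and the global slot_owner array are hypothetical stand-ins for clusterNode and server.cluster->slots[]. The channel cleanup in clusterDelSlot is left out.

```c
#include <assert.h>
#include <stddef.h>

#define CLUSTER_SLOTS 16384

typedef struct owner {
    const char *name;
    int numslots;
} owner;

/* Cluster-wide view: which node owns each slot (NULL = unassigned). */
static owner *slot_owner[CLUSTER_SLOTS];

/* Mirrors clusterAddSlot(): refuse to take a slot that is already owned. */
static int add_slot(owner *n, int slot) {
    assert(slot >= 0 && slot < CLUSTER_SLOTS);
    if (slot_owner[slot]) return -1;
    slot_owner[slot] = n;
    n->numslots++;
    return 0;
}

/* Mirrors clusterDelSlot(): fail if the slot is already unassigned. */
static int del_slot(int slot) {
    assert(slot >= 0 && slot < CLUSTER_SLOTS);
    owner *n = slot_owner[slot];
    if (!n) return -1;
    n->numslots--;
    slot_owner[slot] = NULL;
    return 0;
}
```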
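- Periodic gossip (cron) functions
Each node periodically synchronizes cluster state with the other nodes through gossip.
The cron function (the core of the gossip logic)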
/* This is executed 10 times every second */
void clusterCron(void) {
dictIterator *di;
dictEntry *de;
int update_state = 0;
int orphaned_masters; /* How many masters there are without ok slaves. */
int max_slaves; /* Max number of ok slaves for a single master. */
int this_slaves; /* Number of ok slaves for our master (if we are slave). */
mstime_t min_pong = 0, now = mstime();
clusterNode *min_pong_node = NULL;
static unsigned long long iteration = 0;
mstime_t handshake_timeout;
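// Iteration counter, incremented on every call (every 100 ms). The random ping below runs once every 10 iterations, i.e. about once per second; other pings are sent by the timeout/2 rule further down.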
iteration++; /* Number of times this function was called so far. */
// Refresh myself->hostname from the cluster-announce-hostname setting
clusterUpdateMyselfHostname();
/* The handshake timeout is the time after which a handshake node that was
* not turned into a normal node is removed from the nodes. Usually it is
* just the NODE_TIMEOUT value, but when NODE_TIMEOUT is too small we use
* the value of 1 second. */
// Handshake timeout: cluster-node-timeout, with a floor of 1000 ms
handshake_timeout = server.cluster_node_timeout;
if (handshake_timeout < 1000) handshake_timeout = 1000;
/* Clear so clusterNodeCronHandleReconnect can count the number of nodes in PFAIL. */
server.cluster->stats_pfail_nodes = 0;
/* Clear so clusterNodeCronUpdateClusterLinksMemUsage can count the current memory usage of all cluster links. */
server.stat_cluster_links_memory = 0;
/* Run through some of the operations we want to do on each cluster node. */
// Walk every node known to this node
di = dictGetSafeIterator(server.cluster->nodes);
while((de = dictNext(di)) != NULL) {
clusterNode *node = dictGetVal(de);
/* The sequence goes:
* 1. We try to shrink link buffers if possible.
* 2. We free the links whose buffers are still oversized after possible shrinking.
* 3. We update the latest memory usage of cluster links.
* 4. We immediately attempt reconnecting after freeing links.
*/
clusterNodeCronResizeBuffers(node);
clusterNodeCronFreeLinkOnBufferLimitReached(node);
clusterNodeCronUpdateClusterLinksMemUsage(node);
/* The protocol is that function(s) below return non-zero if the node was
* terminated.
*/
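// Skips myself and nodes without an address, deletes handshake nodes that timed out, and creates a link (sending PING or MEET on it) for nodes that currently have none. A non-zero return means there is nothing more to do for this node.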
if(clusterNodeCronHandleReconnect(node, handshake_timeout, now)) continue;
}
dictReleaseIterator(di);
/* Ping some random node 1 time every 10 iterations, so that we usually ping
* one random node every second. */
if (!(iteration % 10)) {
int j;
/* Check a few random nodes and ping the one with the oldest
* pong_received time. */
// Instead of pinging every node, draw 5 random nodes (duplicates possible) and ping only the one whose last PONG is the oldest
for (j = 0; j < 5; j++) {
de = dictGetRandomKey(server.cluster->nodes);
clusterNode *this = dictGetVal(de);
/* Don't ping nodes disconnected or with a ping currently active. */
// Skip nodes with no link or with a ping still awaiting its PONG; a skipped draw still counts toward the 5
if (this->link == NULL || this->ping_sent != 0) continue;
if (this->flags & (CLUSTER_NODE_MYSELF|CLUSTER_NODE_HANDSHAKE))
continue;
// Keep the candidate with the oldest pong_received
if (min_pong_node == NULL || min_pong > this->pong_received) {
min_pong_node = this;
min_pong = this->pong_received;
}
}
if (min_pong_node) {
serverLog(LL_DEBUG,"Pinging node %.40s", min_pong_node->name);
clusterSendPing(min_pong_node->link, CLUSTERMSG_TYPE_PING);
}
}
/* Iterate nodes to check if we need to flag something as failing.
* This loop is also responsible to:
* 1) Check if there are orphaned masters (masters without non failing
* slaves).
* 2) Count the max number of non failing slaves for a single master.
* 3) Count the number of slaves for our master, if we are a slave. */
orphaned_masters = 0;
max_slaves = 0;
this_slaves = 0;
di = dictGetSafeIterator(server.cluster->nodes);
// Walk all nodes again to check whether any of them should be flagged as failing
while((de = dictNext(di)) != NULL) {
clusterNode *node = dictGetVal(de);
now = mstime(); /* Use an updated time at every iteration. */
if (node->flags &
(CLUSTER_NODE_MYSELF|CLUSTER_NODE_NOADDR|CLUSTER_NODE_HANDSHAKE))
continue;
/* Orphaned master check, useful only if the current instance
* is a slave that may migrate to another master. */
if (nodeIsSlave(myself) && nodeIsMaster(node) && !nodeFailed(node)) {
int okslaves = clusterCountNonFailingSlaves(node);
/* A master is orphaned if it is serving a non-zero number of
* slots, have no working slaves, but used to have at least one
* slave, or failed over a master that used to have slaves. */
if (okslaves == 0 && node->numslots > 0 &&
node->flags & CLUSTER_NODE_MIGRATE_TO)
{
orphaned_masters++;
}
if (okslaves > max_slaves) max_slaves = okslaves;
if (nodeIsSlave(myself) && myself->slaveof == node)
this_slaves = okslaves;
}
/* If we are not receiving any data for more than half the cluster
* timeout, reconnect the link: maybe there is a connection
* issue even if the node is alive. */
mstime_t ping_delay = now - node->ping_sent;
mstime_t data_delay = now - node->data_received;
// If the link looks stuck, free it; the next cron run reconnects it
if (node->link && /* is connected */
now - node->link->ctime >
server.cluster_node_timeout && /* was not already reconnected */
node->ping_sent && /* we already sent a ping */
/* and we are waiting for the pong more than timeout/2 */
ping_delay > server.cluster_node_timeout/2 &&
/* and in such interval we are not seeing any traffic at all. */
data_delay > server.cluster_node_timeout/2)
{
/* Disconnect the link, it will be reconnected automatically. */
freeClusterLink(node->link);
}
/* If we have currently no active ping in this instance, and the
* received PONG is older than half the cluster timeout, send
* a new ping now, to ensure all the nodes are pinged without
* a too big delay. */
if (node->link &&
node->ping_sent == 0 &&
(now - node->pong_received) > server.cluster_node_timeout/2)
{
clusterSendPing(node->link, CLUSTERMSG_TYPE_PING);
continue;
}
/* If we are a master and one of the slaves requested a manual
* failover, ping it continuously. */
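// If we are a master and one of our slaves requested a manual failover, ping that slave on every cron run until the failover completes or times out (mf_end)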
if (server.cluster->mf_end &&
nodeIsMaster(myself) &&
server.cluster->mf_slave == node &&
node->link)
{
clusterSendPing(node->link, CLUSTERMSG_TYPE_PING);
continue;
}
/* Check only if we have an active ping for this instance. */
if (node->ping_sent == 0) continue;
/* Check if this node looks unreachable.
* Note that if we already received the PONG, then node->ping_sent
* is zero, so can't reach this code at all, so we don't risk of
* checking for a PONG delay if we didn't send the PING.
*
* We also consider every incoming data as proof of liveness, since
* our cluster bus link is also used for data: under heavy data
* load pong delays are possible. */
mstime_t node_delay = (ping_delay < data_delay) ? ping_delay :
data_delay;
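// node_delay is the smaller of the two delays, since any incoming data proves liveness. Beyond cluster-node-timeout the node (master or slave) is flagged PFAIL, which is only a local suspicion; it becomes FAIL once enough masters report it through gossip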
if (node_delay > server.cluster_node_timeout) {
/* Timeout reached. Set the node as possibly failing if it is
* not already in this state. */
if (!(node->flags & (CLUSTER_NODE_PFAIL|CLUSTER_NODE_FAIL))) {
serverLog(LL_DEBUG,"*** NODE %.40s possibly failing",
node->name);
node->flags |= CLUSTER_NODE_PFAIL;
update_state = 1;
}
}
}
dictReleaseIterator(di);
/* If we are a slave node but the replication is still turned off,
* enable it if we know the address of our master and it appears to
* be up. */
// If we are a slave whose replication is not configured yet but we know our master's address, start replicating from it now
if (nodeIsSlave(myself) &&
server.masterhost == NULL &&
myself->slaveof &&
nodeHasAddr(myself->slaveof))
{
replicationSetMaster(myself->slaveof->ip, myself->slaveof->port);
}
/* Abort a manual failover if the timeout is reached. */
manualFailoverCheckTimeout();
if (nodeIsSlave(myself)) {
clusterHandleManualFailover();
if (!(server.cluster_module_flags & CLUSTER_MODULE_FLAG_NO_FAILOVER))
clusterHandleSlaveFailover();
/* If there are orphaned slaves, and we are a slave among the masters
* with the max number of non-failing slaves, consider migrating to
* the orphaned masters. Note that it does not make sense to try
* a migration if there is no master with at least *two* working
* slaves. */
if (orphaned_masters && max_slaves >= 2 && this_slaves == max_slaves &&
server.cluster_allow_replica_migration)
clusterHandleSlaveMigration(max_slaves);
}
if (update_state || server.cluster->state == CLUSTER_FAIL)
clusterUpdateState();
}
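Two decisions sit at the heart of clusterCron: which node to ping, and when to flag PFAIL. Both can be isolated in a small sketch. The peer struct and the function names are hypothetical simplifications of clusterNode. Real Redis draws the 5 candidates with dictGetRandomKey. Here the caller passes the sample in, which keeps the sketch deterministic.

```c
#include <assert.h>
#include <stddef.h>

typedef long long mstime_t;

typedef struct peer {
    int has_link;             /* stand-in for node->link != NULL */
    mstime_t ping_sent;       /* 0 = no ping in flight */
    mstime_t pong_received;
    mstime_t data_received;
    int pfail;
} peer;

/* Among the sampled candidates, pick the one whose last PONG is the
 * oldest, skipping peers with no link or with a ping already in flight.
 * Returns NULL if no candidate qualifies. */
static peer *pick_ping_target(peer **sample, int n) {
    peer *best = NULL;
    for (int i = 0; i < n; i++) {
        peer *p = sample[i];
        if (!p->has_link || p->ping_sent != 0) continue;
        if (best == NULL || p->pong_received < best->pong_received)
            best = p;
    }
    return best;
}

/* PFAIL rule: only while a ping is outstanding, take the smaller of the
 * ping delay and the time since any data arrived; if it exceeds the node
 * timeout, flag the peer. Returns 1 if the flag was newly set. */
static int check_pfail(peer *p, mstime_t now, mstime_t node_timeout) {
    assert(p != NULL);
    if (p->ping_sent == 0 || p->pfail) return 0;
    mstime_t ping_delay = now - p->ping_sent;
    mstime_t data_delay = now - p->data_received;
    mstime_t node_delay = ping_delay < data_delay ? ping_delay : data_delay;
    if (node_delay > node_timeout) {
        p->pfail = 1;
        return 1;
    }
    return 0;
}
```

Taking the minimum of the two delays is why a busy node is not flagged just because its PONG is late. Any traffic on the cluster bus counts as proof that the node is alive.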
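As shown, Cluster mode combines sharding with master-replica replication and is not hard to understand. Once you understand how gossip works, failure detection and master-replica failover in Cluster mode follow naturally.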
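The step from PFAIL to FAIL can be made concrete. A PFAIL flag is only a local suspicion. Redis promotes it to FAIL (in markNodeAsFailingIfNeeded() in cluster.c) when a majority of the masters serving slots report the node as failing through gossip, and then broadcasts a FAIL message. The sketch below isolates only the quorum arithmetic. The function name and parameters are hypothetical. Report expiry is left out: Redis discards failure reports older than about twice the node timeout.

```c
#include <assert.h>

/* Decide whether a node should be promoted from PFAIL to FAIL.
 * cluster_size: number of masters serving at least one slot.
 * reports: how many other masters currently report the node as failing. */
static int should_mark_fail(int locally_pfail, int i_am_master,
                            int reports, int cluster_size) {
    assert(cluster_size > 0);
    int needed_quorum = cluster_size / 2 + 1;   /* strict majority */
    int failures = reports;
    if (!locally_pfail) return 0;  /* we must observe the timeout ourselves */
    if (i_am_master) failures++;   /* our own view counts if we are a master */
    return failures >= needed_quorum;
}
```

With 3 slot-serving masters the quorum is 2. A master that sees the timeout itself and receives one matching report can therefore mark the node FAIL, while a slave in the same situation needs two reports.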
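There are also some small tools that are handy when working with Cluster mode:
- Computing a key's slot (bitwise approach): ghroth, "Pinning Redis Cluster keys to a fixed slot" (includes a table-driven CRC16 for slots)
- Given a slot, finding keys that map to it (dictionary): redis/Table at master · 260721735/redis
- Creating a new Redis Cluster: before Redis 5, use redis-trib.rb; from Redis 5 onward, initialize the cluster with redis-cli --cluster create 127.0.0.1:10000 127.0.0.1:10001 127.0.0.1:10002 127.0.0.1:10003 127.0.0.1:10004 127.0.0.1:10005 --cluster-replicas 1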
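The first two tools can be approximated in C as follows. The first part is a table-driven CRC16, the same technique Redis uses in crc16.c, where the table is a precomputed constant array. The second part is a brute-force search for a hash tag that maps to a given slot, which is one way to build a "slot to key" dictionary. The helper names and the decimal-tag search strategy are assumptions of this sketch and are not taken from the linked articles.

```c
#include <assert.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>

static uint16_t crc_table[256];
static int crc_table_ready = 0;

/* Build the 256-entry lookup table for CRC16-XMODEM (poly 0x1021). */
static void crc16_init_table(void) {
    for (int i = 0; i < 256; i++) {
        uint16_t crc = (uint16_t)(i << 8);
        for (int bit = 0; bit < 8; bit++)
            crc = (crc & 0x8000) ? (uint16_t)((crc << 1) ^ 0x1021)
                                 : (uint16_t)(crc << 1);
        crc_table[i] = crc;
    }
    crc_table_ready = 1;
}

/* Table-driven CRC16: one table lookup per input byte instead of 8 shifts. */
static uint16_t crc16_table(const char *buf, size_t len) {
    if (!crc_table_ready) crc16_init_table();
    uint16_t crc = 0;
    for (size_t i = 0; i < len; i++)
        crc = (uint16_t)((crc << 8) ^
                         crc_table[((crc >> 8) ^ (unsigned char)buf[i]) & 0xFF]);
    return crc;
}

/* Brute-force a short decimal tag whose slot is 'slot'. A key written as
 * "{tag}rest" is then pinned to that slot. Returns 0 on success. */
static int find_tag_for_slot(int slot, char *out, size_t outlen) {
    assert(slot >= 0 && slot < 16384);
    for (long n = 0; n < 10000000L; n++) {
        int len = snprintf(out, outlen, "%ld", n);
        if (len < 0 || (size_t)len >= outlen) return -1;
        if ((crc16_table(out, (size_t)len) & 16383) == slot) return 0;
    }
    return -1;
}
```

Running the search once for every slot from 0 to 16383 and storing the results yields a complete slot-to-tag table.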