redis源码学习-cluster篇

对于redis高可用来说,常有三种方式,一是主备,二是sentinel,三是cluster(主备加分片)
本篇主要讲述cluster的方式,详解cluster模式的集群分片与一致性机制。

cluster的分片模式

redis-cluster的分片主要通过槽点(slot)进行划分的,redis自身为k-v数据库,那么对于key进行crc16%max(slot),即可得到这个key的槽点值,通过这个槽点值即可找到对应的master节点,进行写入。
对于已经建好cluster的集群来说,输入 cluster nodes命令可以看到类似如下结果

可以看到,在自身ip:pord之后的那一列为主从情况,只有master的那个节点的最后一列为a-b的格式,slave为connected。a-b指的就是这个master所负责的slot槽点范围。

cluster的集群一致性机制

常用的一致性协议有Raft,Paxos,Gossip等。redis-cluster模式使用等为gossip协议。
redis-cluster本身的分片设计就为去中心化的思想,因此在sentinel模式下采用raft协议的情况下,在cluster模式换用更适合p2p的经典协议gossip,每一对片与外界交互的目的也只是用来确认与更新集群成员身份、故障探测,因此gossip更适合cluster的设计思路。
redis-cluster集群通信所使用的端口为当前节点入口端口+10000,即若某个节点入口的端口号为6379,则此节点用来集群通信的端口为16379,所以集群间的通讯不会影响到节点的正常读写,而节点读写error时也不会影响集群间的gossip传播。

redis-cluster的源码分析

cluster相关的函数统一放在/src/cluster.c与/src/cluster.h下

clusterNode结构体

typedef struct clusterNode {
    mstime_t ctime; /* Node object creation time. */
    char name[CLUSTER_NAMELEN]; /* Node name, hex string, sha1-size */
    int flags;      /* CLUSTER_NODE_... */
    uint64_t configEpoch; /* Last configEpoch observed for this node */
    unsigned char slots[CLUSTER_SLOTS/8]; /* slots handled by this node */
    uint16_t *slot_info_pairs; /* Slots info represented as (start/end) pair (consecutive index). */
    int slot_info_pairs_count; /* Used number of slots in slot_info_pairs */
    int numslots;   /* Number of slots handled by this node */
    int numslaves;  /* Number of slave nodes, if this is a master */
    struct clusterNode **slaves; /* pointers to slave nodes */
    struct clusterNode *slaveof; /* pointer to the master node. Note that it
                                    may be NULL even if the node is a slave
                                    if we don't have the master node in our
                                    tables. */
    mstime_t ping_sent;      /* Unix time we sent latest ping */
    mstime_t pong_received;  /* Unix time we received the pong */
    mstime_t data_received;  /* Unix time we received any data */
    mstime_t fail_time;      /* Unix time when FAIL flag was set */
    mstime_t voted_time;     /* Last time we voted for a slave of this master */
    mstime_t repl_offset_time;  /* Unix time we received offset for this node */
    mstime_t orphaned_time;     /* Starting time of orphaned master condition */
    long long repl_offset;      /* Last known repl offset for this node. */
    char ip[NET_IP_STR_LEN];    /* Latest known IP address of this node */
    sds hostname;               /* The known hostname for this node */
    int port;                   /* Latest known clients port (TLS or plain). */
    int pport;                  /* Latest known clients plaintext port. Only used
                                   if the main clients port is for TLS. */
    int cport;                  /* Latest known cluster port of this node. */
    clusterLink *link;          /* TCP/IP link established toward this node */
    clusterLink *inbound_link;  /* TCP/IP link accepted from this node */
    list *fail_reports;         /* List of nodes signaling this as failing */
} clusterNode;

mstime_t的属性大多都是为了gossip超时或进行投票所使用的标志。在cluster模式下,标志一个节点的唯一属性为char name[CLUSTER_NAMELEN],统一为十六进制字符串,如图

中的第一列即为node-name,若拷贝配置时未清空nodes.conf,则会出现node冲突的情况,errorlog将会持续刷新此错误。

  1. 集群相关的函数

新增节点

/* Add a node to the nodes hash table */
void clusterAddNode(clusterNode *node) {
    int retval;

    retval = dictAdd(server.cluster->nodes,
            sdsnewlen(node->name,CLUSTER_NAMELEN), node);
    serverAssert(retval == DICT_OK);
}

可以看出,节点的信息通过dict进行保存,key为nodeName,value指向clusterNode,这也是为何标志一个节点的唯一属性为char name[CLUSTER_NAMELEN]的原因了

新增master

/* Set the specified node 'n' as master for this node.
 * If this node is currently a master, it is turned into a slave. */
void clusterSetMaster(clusterNode *n) {
    serverAssert(n != myself);
    serverAssert(myself->numslots == 0);

    if (nodeIsMaster(myself)) {
        myself->flags &= ~(CLUSTER_NODE_MASTER|CLUSTER_NODE_MIGRATE_TO);
        myself->flags |= CLUSTER_NODE_SLAVE;
        clusterCloseAllSlots();
    } else {
        if (myself->slaveof)
            clusterNodeRemoveSlave(myself->slaveof,myself);
    }
    myself->slaveof = n;
    //默认自己为自己的slave
    clusterNodeAddSlave(n,myself);
    //通知集群,由于gossip协议为八卦的性质,则通知自己后自己即通知其余节点,无需再多做其余操作
    replicationSetMaster(n->ip, n->port);
    resetManualFailover();
}

新增slave

int clusterNodeAddSlave(clusterNode *master, clusterNode *slave) {
    int j;

    /* If it's already a slave, don't add it again. */
    for (j = 0; j < master->numslaves; j++)
        if (master->slaves[j] == slave) return C_ERR;
    master->slaves = zrealloc(master->slaves,
        sizeof(clusterNode*)*(master->numslaves+1));
    master->slaves[master->numslaves] = slave;
    master->numslaves++;
    master->flags |= CLUSTER_NODE_MIGRATE_TO;
    return C_OK;
}

新增slave时需要指定slave的master,可以看出,对于一个master,cluster模式并未限制一个master对应的slave个数,理论上一到两个slave就够了

  1. slot相关的函数

为master赋予slot

/* Add the specified slot to the list of slots that node 'n' will
 * serve. Return C_OK if the operation ended with success.
 * If the slot is already assigned to another instance this is considered
 * an error and C_ERR is returned. */
int clusterAddSlot(clusterNode *n, int slot) {
//若此slot已经被分配,则直接报错
    if (server.cluster->slots[slot]) return C_ERR;
    clusterNodeSetSlotBit(n,slot);
    server.cluster->slots[slot] = n;
    return C_OK;
}
/* Set the slot bit and return the old value. */
int clusterNodeSetSlotBit(clusterNode *n, int slot) {
    int old = bitmapTestBit(n->slots,slot);
//通过设置位的值来极快的确定slot是否存在
    bitmapSetBit(n->slots,slot);
    if (!old) {
        n->numslots++;
        /* When a master gets its first slot, even if it has no slaves,
         * it gets flagged with MIGRATE_TO, that is, the master is a valid
         * target for replicas migration, if and only if at least one of
         * the other masters has slaves right now.
         *
         * Normally masters are valid targets of replica migration if:
         * 1. The used to have slaves (but no longer have).
         * 2. They are slaves failing over a master that used to have slaves.
         *
         * However new masters with slots assigned are considered valid
         * migration targets if the rest of the cluster is not a slave-less.
         *
         * See https://github.com/redis/redis/issues/3043 for more info. */
        if (n->numslots == 1 && clusterMastersHaveSlaves())
            n->flags |= CLUSTER_NODE_MIGRATE_TO;
    }
    return old;
}
/* Set the bit at position 'pos' in a bitmap. */
void bitmapSetBit(unsigned char *bitmap, int pos) {
//slot的max值为16383,则bitmap数组的最大长度为2048,即通过位的方式,将16384的长度缩小到了2048,极大的提升了寻找效率
//bitmap的方式也在布隆过滤器等大量使用,通过位运算极大的减小了时间复杂度与空间复杂度
    off_t byte = pos/8;
    int bit = pos&7;
    bitmap[byte] |= 1<<bit;
}

移除slot

/* Delete the specified slot marking it as unassigned.
 * Returns C_OK if the slot was assigned, otherwise if the slot was
 * already unassigned C_ERR is returned. */
int clusterDelSlot(int slot) {
    clusterNode *n = server.cluster->slots[slot];

    if (!n) return C_ERR;

    /* Cleanup the channels in master/replica as part of slot deletion. */
    list *nodes_for_slot = clusterGetNodesServingMySlots(n);
    listNode *ln = listSearchKey(nodes_for_slot, myself);
    if (ln != NULL) {
        removeChannelsInSlot(slot);
    }
    listRelease(nodes_for_slot);
    serverAssert(clusterNodeClearSlotBit(n,slot) == 1);
    server.cluster->slots[slot] = NULL;
    return C_OK;
}
  1. gossip定时轮训相关的函数
    节点定时通过gossip的方式进行集群状态同步

cron函数(gossip相关核心函数)

/* This is executed 10 times every second */
void clusterCron(void) {
    dictIterator *di;
    dictEntry *de;
    int update_state = 0;
    int orphaned_masters; /* How many masters there are without ok slaves. */
    int max_slaves; /* Max number of ok slaves for a single master. */
    int this_slaves; /* Number of ok slaves for our master (if we are slave). */
    mstime_t min_pong = 0, now = mstime();
    clusterNode *min_pong_node = NULL;
    static unsigned long long iteration = 0;
    mstime_t handshake_timeout;
    //迭代器,每100ms自增一次,通过此迭代器控制每10次进行一次节点间的pingpong通讯,即节点pingpong实际上为1s一次
    iteration++; /* Number of times this function was called so far. */
    //修复hostname,防止dns变更后集群丢失节点
    clusterUpdateMyselfHostname();

    /* The handshake timeout is the time after which a handshake node that was
     * not turned into a normal node is removed from the nodes. Usually it is
     * just the NODE_TIMEOUT value, but when NODE_TIMEOUT is too small we use
     * the value of 1 second. */
    //设置超时时间,虽允许自定义,但代码控制最低时间为1000ms
    handshake_timeout = server.cluster_node_timeout;
    if (handshake_timeout < 1000) handshake_timeout = 1000;

    /* Clear so clusterNodeCronHandleReconnect can count the number of nodes in PFAIL. */
    server.cluster->stats_pfail_nodes = 0;
    /* Clear so clusterNodeCronUpdateClusterLinksMemUsage can count the current memory usage of all cluster links. */
    server.stat_cluster_links_memory = 0;
    /* Run through some of the operations we want to do on each cluster node. */
    //遍历此节点维护的所有节点信息
    di = dictGetSafeIterator(server.cluster->nodes);
    while((de = dictNext(di)) != NULL) {
        clusterNode *node = dictGetVal(de);
        /* The sequence goes:
         * 1. We try to shrink link buffers if possible.
         * 2. We free the links whose buffers are still oversized after possible shrinking.
         * 3. We update the latest memory usage of cluster links.
         * 4. We immediately attempt reconnecting after freeing links.
         */
        clusterNodeCronResizeBuffers(node);
        clusterNodeCronFreeLinkOnBufferLimitReached(node);
        clusterNodeCronUpdateClusterLinksMemUsage(node);
        /* The protocol is that function(s) below return non-zero if the node was
         * terminated.
         */
         //对于自身或者无法连接的节点进行剔除,仅保留可达的节点,clusterNodeCronHandleReconnect函数中包含真正创建连接进行pingpong的可达性测试
        if(clusterNodeCronHandleReconnect(node, handshake_timeout, now)) continue;
    }
    dictReleaseIterator(di); 

    /* Ping some random node 1 time every 10 iterations, so that we usually ping
     * one random node every second. */
    if (!(iteration % 10)) {
        int j;

        /* Check a few random nodes and ping the one with the oldest
         * pong_received time. */
        //虽然在上边的di中取得了所有可达的节点,但为了保证效率,只随机取出最大五个节点进行一致性同步或投票
        for (j = 0; j < 5; j++) {
            de = dictGetRandomKey(server.cluster->nodes);
            clusterNode *this = dictGetVal(de);

            /* Don't ping nodes disconnected or with a ping currently active. */
            //对于ping已经断开或者最近已经ping过的节点直接跳过,但依旧占用此轮gossip节点名额
            if (this->link == NULL || this->ping_sent != 0) continue;
            if (this->flags & (CLUSTER_NODE_MYSELF|CLUSTER_NODE_HANDSHAKE))
                continue;
            //对于ping进行pong回应
            if (min_pong_node == NULL || min_pong > this->pong_received) {
                min_pong_node = this;
                min_pong = this->pong_received;
            }
        }
        if (min_pong_node) {
            serverLog(LL_DEBUG,"Pinging node %.40s", min_pong_node->name);
            clusterSendPing(min_pong_node->link, CLUSTERMSG_TYPE_PING);
        }
    }
    /* Iterate nodes to check if we need to flag something as failing.
     * This loop is also responsible to:
     * 1) Check if there are orphaned masters (masters without non failing
     *    slaves).
     * 2) Count the max number of non failing slaves for a single master.
     * 3) Count the number of slaves for our master, if we are a slave. */
    orphaned_masters = 0;
    max_slaves = 0;
    this_slaves = 0;
    di = dictGetSafeIterator(server.cluster->nodes);
    //通过上边的节点交互,在这里遍历所有节点,检查是否需要将某个节点标记为下线
    while((de = dictNext(di)) != NULL) {
        clusterNode *node = dictGetVal(de);
        now = mstime(); /* Use an updated time at every iteration. */

        if (node->flags &
            (CLUSTER_NODE_MYSELF|CLUSTER_NODE_NOADDR|CLUSTER_NODE_HANDSHAKE))
                continue;

        /* Orphaned master check, useful only if the current instance
         * is a slave that may migrate to another master. */
        if (nodeIsSlave(myself) && nodeIsMaster(node) && !nodeFailed(node)) {
            int okslaves = clusterCountNonFailingSlaves(node);

            /* A master is orphaned if it is serving a non-zero number of
             * slots, have no working slaves, but used to have at least one
             * slave, or failed over a master that used to have slaves. */
            if (okslaves == 0 && node->numslots > 0 &&
                node->flags & CLUSTER_NODE_MIGRATE_TO)
            {
                orphaned_masters++;
            }
            if (okslaves > max_slaves) max_slaves = okslaves;
            if (nodeIsSlave(myself) && myself->slaveof == node)
                this_slaves = okslaves;
        }

        /* If we are not receiving any data for more than half the cluster
         * timeout, reconnect the link: maybe there is a connection
         * issue even if the node is alive. */
        mstime_t ping_delay = now - node->ping_sent;
        mstime_t data_delay = now - node->data_received;
        //判断此节点连接是否可能有问题,若有问题则直接释放掉,在下个cron周期会接着尝试重连
        if (node->link && /* is connected */
            now - node->link->ctime >
            server.cluster_node_timeout && /* was not already reconnected */
            node->ping_sent && /* we already sent a ping */
            /* and we are waiting for the pong more than timeout/2 */
            ping_delay > server.cluster_node_timeout/2 &&
            /* and in such interval we are not seeing any traffic at all. */
            data_delay > server.cluster_node_timeout/2)
        {
            /* Disconnect the link, it will be reconnected automatically. */
            freeClusterLink(node->link);
        }

        /* If we have currently no active ping in this instance, and the
         * received PONG is older than half the cluster timeout, send
         * a new ping now, to ensure all the nodes are pinged without
         * a too big delay. */
        //如果此节点中当前没有存活的ping,同时收到的pong超过集群超时的一半,则发送一个新的ping,保证所有节点都在ping时没有太大的延迟。
        if (node->link &&
            node->ping_sent == 0 &&
            (now - node->pong_received) > server.cluster_node_timeout/2)
        {
            clusterSendPing(node->link, CLUSTERMSG_TYPE_PING);
            continue;
        }

        /* If we are a master and one of the slaves requested a manual
         * failover, ping it continuously. */
        //对于主节点来说,如果从发起了故障转换,则在此轮投票完成前保持连接,知道集群故障转移完成
        if (server.cluster->mf_end &&
            nodeIsMaster(myself) &&
            server.cluster->mf_slave == node &&
            node->link)
        {
            clusterSendPing(node->link, CLUSTERMSG_TYPE_PING);
            continue;
        }

        /* Check only if we have an active ping for this instance. */
        if (node->ping_sent == 0) continue;

        /* Check if this node looks unreachable.
         * Note that if we already received the PONG, then node->ping_sent
         * is zero, so can't reach this code at all, so we don't risk of
         * checking for a PONG delay if we didn't sent the PING.
         *
         * We also consider every incoming data as proof of liveness, since
         * our cluster bus link is also used for data: under heavy data
         * load pong delays are possible. */
        mstime_t node_delay = (ping_delay < data_delay) ? ping_delay :
                                                          data_delay;
        //计算delay的时常,若超时则判断此主故障或下线(PFAIL为疑似下线,在疑似的过程中需要gossip到其他节点进行投票确认)
        if (node_delay > server.cluster_node_timeout) {
            /* Timeout reached. Set the node as possibly failing if it is
             * not already in this state. */
            if (!(node->flags & (CLUSTER_NODE_PFAIL|CLUSTER_NODE_FAIL))) {
                serverLog(LL_DEBUG,"*** NODE %.40s possibly failing",
                    node->name);
                node->flags |= CLUSTER_NODE_PFAIL;
                update_state = 1;
            }
        }
    }
    dictReleaseIterator(di);

    /* If we are a slave node but the replication is still turned off,
     * enable it if we know the address of our master and it appears to
     * be up. */
    //对于未进行主从同步的从,检查发现从的主已经上线,则在这里通过ping告诉它可以开始主从同步了
    if (nodeIsSlave(myself) &&
        server.masterhost == NULL &&
        myself->slaveof &&
        nodeHasAddr(myself->slaveof))
    {
        replicationSetMaster(myself->slaveof->ip, myself->slaveof->port);
    }

    /* Abort a manual failover if the timeout is reached. */
    manualFailoverCheckTimeout();

    if (nodeIsSlave(myself)) {
        clusterHandleManualFailover();
        if (!(server.cluster_module_flags & CLUSTER_MODULE_FLAG_NO_FAILOVER))
            clusterHandleSlaveFailover();
        /* If there are orphaned slaves, and we are a slave among the masters
         * with the max number of non-failing slaves, consider migrating to
         * the orphaned masters. Note that it does not make sense to try
         * a migration if there is no master with at least *two* working
         * slaves. */
        if (orphaned_masters && max_slaves >= 2 && this_slaves == max_slaves &&
		server.cluster_allow_replica_migration)
            clusterHandleSlaveMigration(max_slaves);
    }

    if (update_state || server.cluster->state == CLUSTER_FAIL)
        clusterUpdateState();
}

可以看出,cluster模式在分片与主从的模式下,理解起来并不复杂,明白gossip原理即可理解cluster的故障主从切换。
同时有些小工具可供cluster模式进行使用。

  1. 通过key计算slot(位运算方式)ghroth:redis cluster的key固定slot方式(附slot的crc16打表法)
  2. 已知slot想固定key到此(字典)redis/Table at master · 260721735/redis
  3. redis-cluster新建集群(5以前使用redis-trib.rb,5或5以后采用redis-cli --cluster create 127.0.0.1:10000 127.0.0.1:10001 127.0.0.1:10002 127.0.0.1:10003 127.0.0.1:10004 127.0.0.1:10005 --cluster-replicas 1的方式初始化集群)