Redis主从复制

Redis 是如何基于状态机的设计思路来实现主从复制的?

(1) Redis主从复制

从原理上来说,Redis 的主从复制主要包括了全量复制、增量复制和长连接同步三种情况。

  1. 全量复制传输 RDB 文件;
  2. 增量复制传输主从断连期间的命令;
  3. 长连接同步则是把主节点正常收到的请求传输给从节点。

(2) 为什么需要主从复制

为了提高服务的分区容错性,一般都会通过冗余来提高分区容错性。
主从复制技术类似于冗余,是提高分区容错性的一种办法。

在使用 Redis 或 MySQL 数据库时,经常会使用主从复制来实现主从节点间的数据同步,以此提升服务的高可用性。

(3) Redis主从复制原理

  1. 从库设置主库
  2. 主从建立连接
  3. 主从握手并判断复制类型
  4. 复制类型判断与执行

(3.1) 从库设置主库

主要是获得了主库的 IP 和端口号。
可以用三种方式来设置。
方式一:在实例 A 上执行 replicaof masterip masterport 的主从复制命令,指明实例 B 的 IP(masterip)和端口号(masterport)。
方式二:在实例 A 的配置文件中设置 replicaof masterip masterport,实例 A 可以通过解析文件获得主库 IP 和端口号。
方式三:在实例 A 启动时,设置启动参数–replicaof [masterip] [masterport]。实例 A 解析启动参数,就能获得主库的 IP 和端口号。

(3.2) 主从建立连接

从库获得了主库的IP和端口号,就会尝试和主库建立TCP网络连接,并且会在建立好的网络连接上,监听是否有主库发送的命令。

(3.3) 主从握手

当从库和主库建立好连接之后,从库就开始和主库进行握手。
简单来说,握手过程就是主从库间相互发送PING-PONG消息,同时从库根据配置信息向主库进行验证。
从库把自己的 IP、端口号,以及对无盘复制和 PSYNC 2 协议的支持情况发给主库。

(3.4) 复制类型判断与执行

等到主从库之间的握手完成后,从库就会给主库发送 PSYNC 命令。主库会根据从库发送的命令参数作出相应的三种回复,分别是执行全量复制、执行增量复制、发生错误。
最后,从库在收到上述回复后,就会根据回复的复制类型,开始执行具体的复制操作。


(4) 主从复制源码解读

主从复制中的状态机具体对应的是什么呢?

// file: src/server.h
/*
 */
struct redisServer {
 // ...
 /* 复制相关 */
 char *masterauth; // 用于和主库进行验证的密码
 char *masterhost; // 主库主机名
 int masterport; // 主库端口号
 int repl_timeout; //
 client *master; // 从库上用来和主库连接的客户端
 client *cached_master; // 从库上缓存的主库信息
 int repl_state; // 从库的复制状态
 off_t repl_transfer_size; 
 off_t repl_transfer_read; 
 off_t repl_transfer_last_fsync_off; 
 connection *repl_transfer_s; 
 int repl_transfer_fd; 
 char *repl_transfer_tmpfile; 
 time_t repl_transfer_lastio; 
 int repl_serve_stale_data; 
 int repl_slave_ro; 
 int repl_slave_ignore_maxmemory; 
 time_t repl_down_since; 
 int repl_disable_tcp_nodelay; 
 int slave_priority; 
 int slave_announce_port; 
 char *slave_announce_ip; 
 // ...
}

(4.1) 从库设置主库

// file: src/server.c
void initServerConfig(void) {
 // 初始化复制状态 默认为没有
 server.repl_state = REPL_STATE_NONE;
 
}

实例执行了 replicaof masterip masterport 命令

// file: src/replication.c 
/*
 */
void replicaofCommand(client *c) {
 /* SLAVEOF is not allowed in cluster mode as replication is automatically
 * configured using the current address of the master node. */
 if (server.cluster_enabled) {
 addReplyError(c,"REPLICAOF not allowed in cluster mode.");
 return;
 }
 /* The special host/port combination "NO" "ONE" turns the instance
 * into a master. Otherwise the new master address is set. */
 if (!strcasecmp(c->argv[1]->ptr,"no") &&
 !strcasecmp(c->argv[2]->ptr,"one")) {
 if (server.masterhost) {
 replicationUnsetMaster();
 sds client = catClientInfoString(sdsempty(),c);
 serverLog(LL_NOTICE,"MASTER MODE enabled (user request from '%s')",
 client);
 sdsfree(client);
 }
 } else {
 long port;
 if (c->flags & CLIENT_SLAVE)
 {
 /* If a client is already a replica they cannot run this command,
 * because it involves flushing all replicas (including this
 * client) */
 addReplyError(c, "Command is not valid when client is a replica.");
 return;
 }
 if ((getLongFromObjectOrReply(c, c->argv[2], &port, NULL) != C_OK))
 return;
 /* Check if we are already attached to the specified slave */
 if (server.masterhost && !strcasecmp(server.masterhost,c->argv[1]->ptr)
 && server.masterport == port) {
 serverLog(LL_NOTICE,"REPLICAOF would result into synchronization "
 "with the master we are already connected "
 "with. No operation performed.");
 addReplySds(c,sdsnew("+OK Already connected to specified "
 "master\r\n"));
 return;
 }
 /* There was no previous master or the user specified a different one,
 * we can continue. */
 replicationSetMaster(c->argv[1]->ptr, port);
 sds client = catClientInfoString(sdsempty(),c);
 serverLog(LL_NOTICE,"REPLICAOF %s:%d enabled (user request from '%s')",
 server.masterhost, server.masterport, client);
 sdsfree(client);
 }
 addReply(c,shared.ok);
}
/* Set replication to the specified master address and port. */
void replicationSetMaster(char *ip, int port) {
 int was_master = server.masterhost == NULL;
 sdsfree(server.masterhost);
 server.masterhost = sdsnew(ip);
 server.masterport = port;
 if (server.master) {
 freeClient(server.master);
 }
 disconnectAllBlockedClients(); /* Clients blocked in master, now slave. */
 /* Update oom_score_adj */
 setOOMScoreAdj(-1);
 /* Force our slaves to resync with us as well. They may hopefully be able
 * to partially resync with us, but we can notify the replid change. */
 disconnectSlaves();
 cancelReplicationHandshake();
 /* Before destroying our master state, create a cached master using
 * our own parameters, to later PSYNC with the new master. */
 if (was_master) {
 replicationDiscardCachedMaster();
 replicationCacheMasterUsingMyself();
 }
 /* Fire the role change modules event. */
 moduleFireServerEvent(REDISMODULE_EVENT_REPLICATION_ROLE_CHANGED,
 REDISMODULE_EVENT_REPLROLECHANGED_NOW_REPLICA,
 NULL);
 /* Fire the master link modules event. */
 if (server.repl_state == REPL_STATE_CONNECTED)
 moduleFireServerEvent(REDISMODULE_EVENT_MASTER_LINK_CHANGE,
 REDISMODULE_SUBEVENT_MASTER_LINK_DOWN,
 NULL);
 server.repl_state = REPL_STATE_CONNECT;
}

replicationSetMaster 函数除了会记录主库的 IP、端口号之外,还会把从库实例的状态机设置为 REPL_STATE_CONNECT。
此时,主从复制的状态机会从 REPL_STATE_NONE 变迁为 REPL_STATE_CONNECT

(4.2) 主从建立连接

从库是何时开始和主库建立网络连接的呢?

replicationCron() 任务。这个任务的执行频率是每 1000ms 执行一次

replicationCron() 任务的函数实现逻辑是在 server.c 中,在该任务中,一个重要的判断就是,检查从库的复制状态机状态。如果状态机状态是 REPL_STATE_CONNECT,那么从库就开始和主库建立连接。连接的建立是通过调用 connectWithMaster() 函数来完成的。

/*
 *
 * @param *eventLoop
 * @param id
 * @param *clientData
 */
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
 /* Replication cron function -- used to reconnect to master,
 * detect transfer failures, start background RDB transfers and so forth. */
 run_with_period(1000) replicationCron();
}
// file: src/replication.c
/* --------------------------- REPLICATION CRON ---------------------------- */
/* Replication cron function, called 1 time per second. */
void replicationCron(void) {
 static long long replication_cron_loops = 0;
 /* Non blocking connection timeout? */
 if (server.masterhost &&
 (server.repl_state == REPL_STATE_CONNECTING ||
 slaveIsInHandshakeState()) &&
 (time(NULL)-server.repl_transfer_lastio) > server.repl_timeout)
 {
 serverLog(LL_WARNING,"Timeout connecting to the MASTER...");
 cancelReplicationHandshake();
 }
 /* Bulk transfer I/O timeout? */
 if (server.masterhost && server.repl_state == REPL_STATE_TRANSFER &&
 (time(NULL)-server.repl_transfer_lastio) > server.repl_timeout)
 {
 serverLog(LL_WARNING,"Timeout receiving bulk data from MASTER... If the problem persists try to set the 'repl-timeout' parameter in redis.conf to a larger value.");
 cancelReplicationHandshake();
 }
 /* Timed out master when we are an already connected slave? */
 if (server.masterhost && server.repl_state == REPL_STATE_CONNECTED &&
 (time(NULL)-server.master->lastinteraction) > server.repl_timeout)
 {
 serverLog(LL_WARNING,"MASTER timeout: no data nor PING received...");
 freeClient(server.master);
 }
 /* Check if we should connect to a MASTER */
 if (server.repl_state == REPL_STATE_CONNECT) {
 serverLog(LL_NOTICE,"Connecting to MASTER %s:%d",
 server.masterhost, server.masterport);
 if (connectWithMaster() == C_OK) {
 serverLog(LL_NOTICE,"MASTER <-> REPLICA sync started");
 }
 }
 /* Send ACK to master from time to time.
 * Note that we do not send periodic acks to masters that don't
 * support PSYNC and replication offsets. */
 if (server.masterhost && server.master &&
 !(server.master->flags & CLIENT_PRE_PSYNC))
 replicationSendAck();
 /* If we have attached slaves, PING them from time to time.
 * So slaves can implement an explicit timeout to masters, and will
 * be able to detect a link disconnection even if the TCP connection
 * will not actually go down. */
 listIter li;
 listNode *ln;
 robj *ping_argv[1];
 /* First, send PING according to ping_slave_period. */
 if ((replication_cron_loops % server.repl_ping_slave_period) == 0 &&
 listLength(server.slaves))
 {
 /* Note that we don't send the PING if the clients are paused during
 * a Redis Cluster manual failover: the PING we send will otherwise
 * alter the replication offsets of master and slave, and will no longer
 * match the one stored into 'mf_master_offset' state. */
 int manual_failover_in_progress =
 server.cluster_enabled &&
 server.cluster->mf_end &&
 clientsArePaused();
 if (!manual_failover_in_progress) {
 ping_argv[0] = createStringObject("PING",4);
 replicationFeedSlaves(server.slaves, server.slaveseldb,
 ping_argv, 1);
 decrRefCount(ping_argv[0]);
 }
 }
 /* Second, send a newline to all the slaves in pre-synchronization
 * stage, that is, slaves waiting for the master to create the RDB file.
 *
 * Also send the a newline to all the chained slaves we have, if we lost
 * connection from our master, to keep the slaves aware that their
 * master is online. This is needed since sub-slaves only receive proxied
 * data from top-level masters, so there is no explicit pinging in order
 * to avoid altering the replication offsets. This special out of band
 * pings (newlines) can be sent, they will have no effect in the offset.
 *
 * The newline will be ignored by the slave but will refresh the
 * last interaction timer preventing a timeout. In this case we ignore the
 * ping period and refresh the connection once per second since certain
 * timeouts are set at a few seconds (example: PSYNC response). */
 listRewind(server.slaves,&li);
 while((ln = listNext(&li))) {
 client *slave = ln->value;
 int is_presync =
 (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START ||
 (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END &&
 server.rdb_child_type != RDB_CHILD_TYPE_SOCKET));
 if (is_presync) {
 connWrite(slave->conn, "\n", 1);
 }
 }
 /* Disconnect timedout slaves. */
 if (listLength(server.slaves)) {
 listIter li;
 listNode *ln;
 listRewind(server.slaves,&li);
 while((ln = listNext(&li))) {
 client *slave = ln->value;
 if (slave->replstate == SLAVE_STATE_ONLINE) {
 if (slave->flags & CLIENT_PRE_PSYNC)
 continue;
 if ((server.unixtime - slave->repl_ack_time) > server.repl_timeout) {
 serverLog(LL_WARNING, "Disconnecting timedout replica (streaming sync): %s",
 replicationGetSlaveName(slave));
 freeClient(slave);
 continue;
 }
 }
 /* We consider disconnecting only diskless replicas because disk-based replicas aren't fed
 * by the fork child so if a disk-based replica is stuck it doesn't prevent the fork child
 * from terminating. */
 if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END && server.rdb_child_type == RDB_CHILD_TYPE_SOCKET) {
 if (slave->repl_last_partial_write != 0 &&
 (server.unixtime - slave->repl_last_partial_write) > server.repl_timeout)
 {
 serverLog(LL_WARNING, "Disconnecting timedout replica (full sync): %s",
 replicationGetSlaveName(slave));
 freeClient(slave);
 continue;
 }
 }
 }
 }
 /* If this is a master without attached slaves and there is a replication
 * backlog active, in order to reclaim memory we can free it after some
 * (configured) time. Note that this cannot be done for slaves: slaves
 * without sub-slaves attached should still accumulate data into the
 * backlog, in order to reply to PSYNC queries if they are turned into
 * masters after a failover. */
 if (listLength(server.slaves) == 0 && server.repl_backlog_time_limit &&
 server.repl_backlog && server.masterhost == NULL)
 {
 time_t idle = server.unixtime - server.repl_no_slaves_since;
 if (idle > server.repl_backlog_time_limit) {
 /* When we free the backlog, we always use a new
 * replication ID and clear the ID2. This is needed
 * because when there is no backlog, the master_repl_offset
 * is not updated, but we would still retain our replication
 * ID, leading to the following problem:
 *
 * 1. We are a master instance.
 * 2. Our slave is promoted to master. It's repl-id-2 will
 * be the same as our repl-id.
 * 3. We, yet as master, receive some updates, that will not
 * increment the master_repl_offset.
 * 4. Later we are turned into a slave, connect to the new
 * master that will accept our PSYNC request by second
 * replication ID, but there will be data inconsistency
 * because we received writes. */
 changeReplicationId();
 clearReplicationId2();
 freeReplicationBacklog();
 serverLog(LL_NOTICE,
 "Replication backlog freed after %d seconds "
 "without connected replicas.",
 (int) server.repl_backlog_time_limit);
 }
 }
 /* If AOF is disabled and we no longer have attached slaves, we can
 * free our Replication Script Cache as there is no need to propagate
 * EVALSHA at all. */
 if (listLength(server.slaves) == 0 &&
 server.aof_state == AOF_OFF &&
 listLength(server.repl_scriptcache_fifo) != 0)
 {
 replicationScriptCacheFlush();
 }
 /* Start a BGSAVE good for replication if we have slaves in
 * WAIT_BGSAVE_START state.
 *
 * In case of diskless replication, we make sure to wait the specified
 * number of seconds (according to configuration) so that other slaves
 * have the time to arrive before we start streaming. */
 if (!hasActiveChildProcess()) {
 time_t idle, max_idle = 0;
 int slaves_waiting = 0;
 int mincapa = -1;
 listNode *ln;
 listIter li;
 listRewind(server.slaves,&li);
 while((ln = listNext(&li))) {
 client *slave = ln->value;
 if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) {
 idle = server.unixtime - slave->lastinteraction;
 if (idle > max_idle) max_idle = idle;
 slaves_waiting++;
 mincapa = (mincapa == -1) ? slave->slave_capa :
 (mincapa & slave->slave_capa);
 }
 }
 if (slaves_waiting &&
 (!server.repl_diskless_sync ||
 max_idle > server.repl_diskless_sync_delay))
 {
 /* Start the BGSAVE. The called function may start a
 * BGSAVE with socket target or disk target depending on the
 * configuration and slaves capabilities. */
 startBgsaveForReplication(mincapa);
 }
 }
 /* Remove the RDB file used for replication if Redis is not running
 * with any persistence. */
 removeRDBUsedToSyncReplicas();
 /* Refresh the number of slaves with lag <= min-slaves-max-lag. */
 refreshGoodSlavesCount();
 replication_cron_loops++; /* Incremented with frequency 1 HZ. */
}
/*
 * 
 */
int connectWithMaster(void) {
 // 
 server.repl_transfer_s = server.tls_replication ? connCreateTLS() : connCreateSocket();
 // 
 if (connConnect(server.repl_transfer_s, server.masterhost, server.masterport,
 NET_FIRST_BIND_ADDR, syncWithMaster) == C_ERR) {
 serverLog(LL_WARNING,"Unable to connect to MASTER: %s",
 connGetLastError(server.repl_transfer_s));
 connClose(server.repl_transfer_s);
 server.repl_transfer_s = NULL;
 return C_ERR;
 }
 // 
 server.repl_transfer_lastio = server.unixtime;
 // 连接建立后,将状态机设置为REPL_STATE_CONNECTING
 server.repl_state = REPL_STATE_CONNECTING;
 return C_OK;
}

(4.3) 主从握手并判断复制类型

从库建立TCP连接后,从库实例其实并没有立即开始进行数据同步,而是会先和主库之间进行握手通信。

握手通信的目的,主要包括从库和主库进行验证,以及从库将自身的 IP 和端口号发给主库。

(4.4) 复制类型判断与执行

/* Try a partial resynchronization with the master if we are about to reconnect.
 * If there is no cached master structure, at least try to issue a
 * "PSYNC ? -1" command in order to trigger a full resync using the PSYNC
 * command in order to obtain the master replid and the master replication
 * global offset.
 *
 * This function is designed to be called from syncWithMaster(), so the
 * following assumptions are made:
 *
 * 1) We pass the function an already connected socket "fd".
 * 2) This function does not close the file descriptor "fd". However in case
 * of successful partial resynchronization, the function will reuse
 * 'fd' as file descriptor of the server.master client structure.
 *
 * The function is split in two halves: if read_reply is 0, the function
 * writes the PSYNC command on the socket, and a new function call is
 * needed, with read_reply set to 1, in order to read the reply of the
 * command. This is useful in order to support non blocking operations, so
 * that we write, return into the event loop, and read when there are data.
 *
 * When read_reply is 0 the function returns PSYNC_WRITE_ERR if there
 * was a write error, or PSYNC_WAIT_REPLY to signal we need another call
 * with read_reply set to 1. However even when read_reply is set to 1
 * the function may return PSYNC_WAIT_REPLY again to signal there were
 * insufficient data to read to complete its work. We should re-enter
 * into the event loop and wait in such a case.
 *
 * The function returns:
 *
 * PSYNC_CONTINUE: If the PSYNC command succeeded and we can continue.
 * PSYNC_FULLRESYNC: If PSYNC is supported but a full resync is needed.
 * In this case the master replid and global replication
 * offset is saved.
 * PSYNC_NOT_SUPPORTED: If the server does not understand PSYNC at all and
 * the caller should fall back to SYNC.
 * PSYNC_WRITE_ERROR: There was an error writing the command to the socket.
 * PSYNC_WAIT_REPLY: Call again the function with read_reply set to 1.
 * PSYNC_TRY_LATER: Master is currently in a transient error condition.
 *
 * Notable side effects:
 *
 * 1) As a side effect of the function call the function removes the readable
 * event handler from "fd", unless the return value is PSYNC_WAIT_REPLY.
 * 2) server.master_initial_offset is set to the right value according
 * to the master reply. This will be used to populate the 'server.master'
 * structure replication offset.
 */
#define PSYNC_WRITE_ERROR 0
#define PSYNC_WAIT_REPLY 1
#define PSYNC_CONTINUE 2
#define PSYNC_FULLRESYNC 3
#define PSYNC_NOT_SUPPORTED 4
#define PSYNC_TRY_LATER 5
/*
 */
int slaveTryPartialResynchronization(connection *conn, int read_reply) {
 char *psync_replid;
 char psync_offset[32];
 sds reply;
 /* Writing half */
 if (!read_reply) {
 /* Initially set master_initial_offset to -1 to mark the current
 * master replid and offset as not valid. Later if we'll be able to do
 * a FULL resync using the PSYNC command we'll set the offset at the
 * right value, so that this information will be propagated to the
 * client structure representing the master into server.master. */
 server.master_initial_offset = -1;
 if (server.cached_master) {
 psync_replid = server.cached_master->replid;
 snprintf(psync_offset,sizeof(psync_offset),"%lld", server.cached_master->reploff+1);
 serverLog(LL_NOTICE,"Trying a partial resynchronization (request %s:%s).", psync_replid, psync_offset);
 } else {
 serverLog(LL_NOTICE,"Partial resynchronization not possible (no cached master)");
 psync_replid = "?";
 memcpy(psync_offset,"-1",3);
 }
 /* Issue the PSYNC command */
 reply = sendSynchronousCommand(SYNC_CMD_WRITE,conn,"PSYNC",psync_replid,psync_offset,NULL);
 if (reply != NULL) {
 serverLog(LL_WARNING,"Unable to send PSYNC to master: %s",reply);
 sdsfree(reply);
 connSetReadHandler(conn, NULL);
 return PSYNC_WRITE_ERROR;
 }
 return PSYNC_WAIT_REPLY;
 }
 /* Reading half */
 reply = sendSynchronousCommand(SYNC_CMD_READ,conn,NULL);
 if (sdslen(reply) == 0) {
 /* The master may send empty newlines after it receives PSYNC
 * and before to reply, just to keep the connection alive. */
 sdsfree(reply);
 return PSYNC_WAIT_REPLY;
 }
 connSetReadHandler(conn, NULL);
 if (!strncmp(reply,"+FULLRESYNC",11)) {
 char *replid = NULL, *offset = NULL;
 /* FULL RESYNC, parse the reply in order to extract the replid
 * and the replication offset. */
 replid = strchr(reply,' ');
 if (replid) {
 replid++;
 offset = strchr(replid,' ');
 if (offset) offset++;
 }
 if (!replid || !offset || (offset-replid-1) != CONFIG_RUN_ID_SIZE) {
 serverLog(LL_WARNING,
 "Master replied with wrong +FULLRESYNC syntax.");
 /* This is an unexpected condition, actually the +FULLRESYNC
 * reply means that the master supports PSYNC, but the reply
 * format seems wrong. To stay safe we blank the master
 * replid to make sure next PSYNCs will fail. */
 memset(server.master_replid,0,CONFIG_RUN_ID_SIZE+1);
 } else {
 memcpy(server.master_replid, replid, offset-replid-1);
 server.master_replid[CONFIG_RUN_ID_SIZE] = '\0';
 server.master_initial_offset = strtoll(offset,NULL,10);
 serverLog(LL_NOTICE,"Full resync from master: %s:%lld",
 server.master_replid,
 server.master_initial_offset);
 }
 /* We are going to full resync, discard the cached master structure. */
 replicationDiscardCachedMaster();
 sdsfree(reply);
 return PSYNC_FULLRESYNC;
 }
 if (!strncmp(reply,"+CONTINUE",9)) {
 /* Partial resync was accepted. */
 serverLog(LL_NOTICE,
 "Successful partial resynchronization with master.");
 /* Check the new replication ID advertised by the master. If it
 * changed, we need to set the new ID as primary ID, and set or
 * secondary ID as the old master ID up to the current offset, so
 * that our sub-slaves will be able to PSYNC with us after a
 * disconnection. */
 char *start = reply+10;
 char *end = reply+9;
 while(end[0] != '\r' && end[0] != '\n' && end[0] != '\0') end++;
 if (end-start == CONFIG_RUN_ID_SIZE) {
 char new[CONFIG_RUN_ID_SIZE+1];
 memcpy(new,start,CONFIG_RUN_ID_SIZE);
 new[CONFIG_RUN_ID_SIZE] = '\0';
 if (strcmp(new,server.cached_master->replid)) {
 /* Master ID changed. */
 serverLog(LL_WARNING,"Master replication ID changed to %s",new);
 /* Set the old ID as our ID2, up to the current offset+1. */
 memcpy(server.replid2,server.cached_master->replid,
 sizeof(server.replid2));
 server.second_replid_offset = server.master_repl_offset+1;
 /* Update the cached master ID and our own primary ID to the
 * new one. */
 memcpy(server.replid,new,sizeof(server.replid));
 memcpy(server.cached_master->replid,new,sizeof(server.replid));
 /* Disconnect all the sub-slaves: they need to be notified. */
 disconnectSlaves();
 }
 }
 /* Setup the replication to continue. */
 sdsfree(reply);
 replicationResurrectCachedMaster(conn);
 /* If this instance was restarted and we read the metadata to
 * PSYNC from the persistence file, our replication backlog could
 * be still not initialized. Create it. */
 if (server.repl_backlog == NULL) createReplicationBacklog();
 return PSYNC_CONTINUE;
 }
 /* If we reach this point we received either an error (since the master does
 * not understand PSYNC or because it is in a special state and cannot
 * serve our request), or an unexpected reply from the master.
 *
 * Return PSYNC_NOT_SUPPORTED on errors we don't understand, otherwise
 * return PSYNC_TRY_LATER if we believe this is a transient error. */
 if (!strncmp(reply,"-NOMASTERLINK",13) ||
 !strncmp(reply,"-LOADING",8))
 {
 serverLog(LL_NOTICE,
 "Master is currently unable to PSYNC "
 "but should be in the future: %s", reply);
 sdsfree(reply);
 return PSYNC_TRY_LATER;
 }
 if (strncmp(reply,"-ERR",4)) {
 /* If it's not an error, log the unexpected event. */
 serverLog(LL_WARNING,
 "Unexpected reply to PSYNC from master: %s", reply);
 } else {
 serverLog(LL_NOTICE,
 "Master does not support PSYNC or is in "
 "error state (reply: %s)", reply);
 }
 sdsfree(reply);
 replicationDiscardCachedMaster();
 return PSYNC_NOT_SUPPORTED;
}

(5) 参考资料

https://weikeqin.com/tags/redis/

Redis源码剖析与实战 学习笔记 Day21 主从复制:基于状态机的设计与实现 https://time.geekbang.org/col...

作者:wkq2786130原文地址:https://segmentfault.com/a/1190000043387571

%s 个评论

要回复文章请先登录注册