[PATCH 1/2] replication: do not delete relay on applier disconnect

Konstantin Belyavskiy k.belyavskiy at tarantool.org
Mon May 21 20:07:17 MSK 2018


This is part of a more complex task aimed at improving logging.
Do not destroy the relay, since it stores the last error, which can
be useful for diagnostics.
Now the relay is created together with the replica and always exists,
so several NULL checks are removed as well.
Add relay_state { OFF, FOLLOW, STOPPED } to track replica presence:
once connected, the relay is either FOLLOW or STOPPED until the
master is reset.
Updated with @kostja's proposal.

Used for #3365.
---
 src/box/box.cc         |   5 +-
 src/box/lua/info.c     |   2 +-
 src/box/relay.cc       | 128 +++++++++++++++++++++++++++++++------------------
 src/box/relay.h        |  33 +++++++++++--
 src/box/replication.cc |  35 +++++++-------
 src/box/replication.h  |  13 +----
 6 files changed, 132 insertions(+), 84 deletions(-)

diff --git a/src/box/box.cc b/src/box/box.cc
index eabff1b63..018802f1d 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -1401,7 +1401,8 @@ box_process_join(struct ev_io *io, struct xrow_header *header)
 	 * Final stage: feed replica with WALs in range
 	 * (start_vclock, stop_vclock).
 	 */
-	relay_final_join(io->fd, header->sync, &start_vclock, &stop_vclock);
+	relay_final_join(replica, io->fd, header->sync, &start_vclock,
+			 &stop_vclock);
 	say_info("final data sent.");
 
 	/* Send end of WAL stream marker */
@@ -1493,7 +1494,7 @@ box_process_subscribe(struct ev_io *io, struct xrow_header *header)
 	 * a stall in updates (in this case replica may hang
 	 * indefinitely).
 	 */
-	relay_subscribe(io->fd, header->sync, replica, &replica_clock,
+	relay_subscribe(replica, io->fd, header->sync, &replica_clock,
 			replica_version_id);
 }
 
diff --git a/src/box/lua/info.c b/src/box/lua/info.c
index 8e8fd9d97..9dbc3f92c 100644
--- a/src/box/lua/info.c
+++ b/src/box/lua/info.c
@@ -145,7 +145,11 @@ lbox_pushreplica(lua_State *L, struct replica *replica)
 		lua_settable(L, -3);
 	}
 
-	if (relay != NULL) {
+	/*
+	 * The relay object now always exists, so report the
+	 * downstream only while the relay is streaming rows.
+	 */
+	if (relay_get_state(relay) == RELAY_FOLLOW) {
 		lua_pushstring(L, "downstream");
 		lbox_pushrelay(L, relay);
 		lua_settable(L, -3);
diff --git a/src/box/relay.cc b/src/box/relay.cc
index d2ceaf110..6470946ae 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -131,6 +131,8 @@ struct relay {
 	struct stailq pending_gc;
 	/** Time when last row was sent to peer. */
 	double last_row_tm;
+	/** Current state of the relay, see enum relay_state. */
+	enum relay_state state;
 
 	struct {
 		/* Align to prevent false-sharing with tx thread */
@@ -140,6 +142,12 @@ struct relay {
 	} tx;
 };
 
+enum relay_state
+relay_get_state(const struct relay *relay)
+{
+	return relay->state;
+}
+
 const struct vclock *
 relay_vclock(const struct relay *relay)
 {
@@ -153,32 +161,68 @@ relay_send_initial_join_row(struct xstream *stream, struct xrow_header *row);
 static void
 relay_send_row(struct xstream *stream, struct xrow_header *row);
 
+struct relay *
+relay_new(struct replica *replica)
+{
+	struct relay *relay = (struct relay *) calloc(1, sizeof(struct relay));
+	if (relay == NULL) {
+		diag_set(OutOfMemory, sizeof(struct relay), "calloc",
+			  "struct relay");
+		return NULL;
+	}
+	relay->replica = replica;
+	fiber_cond_create(&relay->reader_cond);
+	diag_create(&relay->diag);
+	stailq_create(&relay->pending_gc);
+	relay->state = RELAY_OFF;
+	return relay;
+}
+
 static void
-relay_create(struct relay *relay, int fd, uint64_t sync,
+relay_start(struct relay *relay, int fd, uint64_t sync,
 	     void (*stream_write)(struct xstream *, struct xrow_header *))
 {
-	memset(relay, 0, sizeof(*relay));
 	xstream_create(&relay->stream, stream_write);
+	/*
+	 * Clear the diagnostics at start, in case it has the old
+	 * error message which we keep around to display in
+	 * box.info.replication.
+	 */
+	diag_clear(&relay->diag);
 	coio_create(&relay->io, fd);
 	relay->sync = sync;
-	fiber_cond_create(&relay->reader_cond);
-	diag_create(&relay->diag);
-	stailq_create(&relay->pending_gc);
+	/*
+	 * The relay object is reused across reconnects: reset
+	 * the flag left set by the previous relay loop.
+	 */
+	relay->exiting = false;
+	relay->state = RELAY_FOLLOW;
 }
 
 static void
-relay_destroy(struct relay *relay)
+relay_stop(struct relay *relay)
 {
 	struct relay_gc_msg *gc_msg, *next_gc_msg;
 	stailq_foreach_entry_safe(gc_msg, next_gc_msg,
 				  &relay->pending_gc, in_pending) {
 		free(gc_msg);
 	}
+	stailq_create(&relay->pending_gc);
 	if (relay->r != NULL)
 		recovery_delete(relay->r);
+	relay->r = NULL;
+	relay->state = RELAY_STOPPED;
+}
+
+void
+relay_delete(struct relay *relay)
+{
+	if (relay->state == RELAY_FOLLOW)
+		relay_stop(relay);
 	fiber_cond_destroy(&relay->reader_cond);
 	diag_destroy(&relay->diag);
 	TRASH(relay);
+	free(relay);
 }
 
 static void
@@ -199,11 +243,20 @@ relay_set_cord_name(int fd)
 void
 relay_initial_join(int fd, uint64_t sync, struct vclock *vclock)
 {
-	struct relay relay;
-	relay_create(&relay, fd, sync, relay_send_initial_join_row);
-	assert(relay.stream.write != NULL);
-	engine_join_xc(vclock, &relay.stream);
-	relay_destroy(&relay);
+	struct relay *relay = relay_new(NULL);
+	if (relay == NULL)
+		diag_raise();
+	relay_start(relay, fd, sync, relay_send_initial_join_row);
+	try {
+		engine_join_xc(vclock, &relay->stream);
+	} catch (...) {
+		/* The relay is heap-allocated: do not leak it. */
+		relay_stop(relay);
+		relay_delete(relay);
+		throw;
+	}
+	relay_stop(relay);
+	relay_delete(relay);
 }
 
 int
@@ -222,22 +275,28 @@ relay_final_join_f(va_list ap)
 }
 
 void
-relay_final_join(int fd, uint64_t sync, struct vclock *start_vclock,
-	         struct vclock *stop_vclock)
+relay_final_join(struct replica *replica, int fd, uint64_t sync,
+		 struct vclock *start_vclock, struct vclock *stop_vclock)
 {
-	struct relay relay;
-	relay_create(&relay, fd, sync, relay_send_row);
-	relay.r = recovery_new(cfg_gets("wal_dir"),
-			       cfg_geti("force_recovery"),
-			       start_vclock);
-	vclock_copy(&relay.stop_vclock, stop_vclock);
+	struct relay *relay = replica->relay;
+	/*
+	 * Create the recovery before starting the relay:
+	 * recovery_new() may throw, and a relay left in
+	 * RELAY_FOLLOW would make every later SUBSCRIBE
+	 * from this replica fail as a duplicate.
+	 */
+	relay->r = recovery_new(cfg_gets("wal_dir"),
+				cfg_geti("force_recovery"),
+				start_vclock);
+	relay_start(relay, fd, sync, relay_send_row);
+	vclock_copy(&relay->stop_vclock, stop_vclock);
 
-	int rc = cord_costart(&relay.cord, "final_join",
-			      relay_final_join_f, &relay);
+	int rc = cord_costart(&relay->cord, "final_join",
+			      relay_final_join_f, relay);
 	if (rc == 0)
-		rc = cord_cojoin(&relay.cord);
+		rc = cord_cojoin(&relay->cord);
 
-	relay_destroy(&relay);
+	relay_stop(relay);
 
 	if (rc != 0)
 		diag_raise();
@@ -495,14 +554,14 @@ relay_subscribe_f(va_list ap)
 	if (!fiber_is_dead(reader))
 		fiber_cancel(reader);
 	fiber_join(reader);
 	relay->exiting = true;
 	trigger_clear(&on_close_log);
 	wal_clear_watcher(&relay->wal_watcher, cbus_process);
 	cbus_unpair(&relay->tx_pipe, &relay->relay_pipe,
 		    NULL, NULL, cbus_process);
 	cbus_endpoint_destroy(&relay->endpoint, cbus_process);
 	if (!diag_is_empty(&relay->diag)) {
-		/* An error has occured while ACKs of xlog reading */
+		/* An error has occurred while reading ACKs of xlog. */
 		diag_move(&relay->diag, diag_get());
 	}
 	struct errinj *inj = errinj(ERRINJ_RELAY_EXIT_DELAY, ERRINJ_DOUBLE);
@@ -514,16 +573,16 @@ relay_subscribe_f(va_list ap)
 
 /** Replication acceptor fiber handler. */
 void
-relay_subscribe(int fd, uint64_t sync, struct replica *replica,
+relay_subscribe(struct replica *replica, int fd, uint64_t sync,
 		struct vclock *replica_clock, uint32_t replica_version_id)
 {
 	assert(replica->id != REPLICA_ID_NIL);
+	struct relay *relay = replica->relay;
 	/* Don't allow multiple relays for the same replica */
-	if (replica->relay != NULL) {
+	if (relay->state == RELAY_FOLLOW) {
 		tnt_raise(ClientError, ER_CFG, "replication",
 			  "duplicate connection with the same replica UUID");
 	}
-
 	/*
 	 * Register the replica with the garbage collector
 	 * unless it has already been registered by initial
@@ -537,24 +596,27 @@ relay_subscribe(int fd, uint64_t sync, struct replica *replica,
 			diag_raise();
 	}
 
-	struct relay relay;
-	relay_create(&relay, fd, sync, relay_send_row);
-	relay.r = recovery_new(cfg_gets("wal_dir"),
-			       cfg_geti("force_recovery"),
-			       replica_clock);
-	vclock_copy(&relay.tx.vclock, replica_clock);
-	relay.version_id = replica_version_id;
-	relay.replica = replica;
-	replica_set_relay(replica, &relay);
-	vclock_copy(&relay.local_vclock_at_subscribe, &replicaset.vclock);
-
-	int rc = cord_costart(&relay.cord, tt_sprintf("relay_%p", &relay),
-			      relay_subscribe_f, &relay);
+	/*
+	 * Create the recovery before starting the relay:
+	 * recovery_new() may throw, and a relay left in
+	 * RELAY_FOLLOW would make every later SUBSCRIBE
+	 * from this replica fail as a duplicate.
+	 */
+	relay->r = recovery_new(cfg_gets("wal_dir"),
+				cfg_geti("force_recovery"),
+				replica_clock);
+	relay_start(relay, fd, sync, relay_send_row);
+	vclock_copy(&relay->tx.vclock, replica_clock);
+	relay->version_id = replica_version_id;
+	vclock_copy(&relay->local_vclock_at_subscribe, &replicaset.vclock);
+
+	int rc = cord_costart(&relay->cord, tt_sprintf("relay_%p", relay),
+			      relay_subscribe_f, relay);
 	if (rc == 0)
-		rc = cord_cojoin(&relay.cord);
+		rc = cord_cojoin(&relay->cord);
 
-	replica_clear_relay(replica);
-	relay_destroy(&relay);
+	relay_stop(relay);
+	replica_on_relay_stop(replica);
 
 	if (rc != 0)
 		diag_raise();
@@ -595,8 +657,7 @@ relay_send_row(struct xstream *stream, struct xrow_header *packet)
 	 * it). In the latter case packet's LSN is less than or equal to
 	 * local master's LSN at the moment it received 'SUBSCRIBE' request.
 	 */
-	if (relay->replica == NULL ||
-	    packet->replica_id != relay->replica->id ||
+	if (packet->replica_id != relay->replica->id ||
 	    packet->lsn <= vclock_get(&relay->local_vclock_at_subscribe,
 				      packet->replica_id)) {
 		relay_send(relay, packet);
diff --git a/src/box/relay.h b/src/box/relay.h
index c8a2c2872..f039cbef8 100644
--- a/src/box/relay.h
+++ b/src/box/relay.h
@@ -42,6 +42,41 @@ struct replica;
 struct tt_uuid;
 struct vclock;
 
+/** State of a relay as seen by the master. */
+enum relay_state {
+	/**
+	 * The relay has never been started: the replica has
+	 * not connected to this master yet.
+	 */
+	RELAY_OFF,
+	/**
+	 * The relay is running: a connected replica is being
+	 * fed with rows.
+	 */
+	RELAY_FOLLOW,
+	/**
+	 * The relay has been started and then stopped, e.g.
+	 * because the replica disconnected. The object is kept
+	 * so that its last error can still be inspected.
+	 */
+	RELAY_STOPPED,
+};
+
+/**
+ * Allocate a relay for @a replica in RELAY_OFF state.
+ * @retval NULL on memory allocation error, diag is set.
+ */
+struct relay *
+relay_new(struct replica *replica);
+
+/** Stop the relay if it is still running and free it. */
+void
+relay_delete(struct relay *relay);
+
+/** Return the current state of the relay. */
+enum relay_state
+relay_get_state(const struct relay *relay);
+
 /**
  * Returns relay's vclock
  * @param relay relay
@@ -71,8 +105,8 @@ relay_initial_join(int fd, uint64_t sync, struct vclock *vclock);
  * @param sync      sync from incoming JOIN request
  */
 void
-relay_final_join(int fd, uint64_t sync, struct vclock *start_vclock,
-	         struct vclock *stop_vclock);
+relay_final_join(struct replica *replica, int fd, uint64_t sync,
+		 struct vclock *start_vclock, struct vclock *stop_vclock);
 
 /**
  * Subscribe a replica to updates.
@@ -80,7 +114,7 @@ relay_final_join(int fd, uint64_t sync, struct vclock *start_vclock,
  * @return none.
  */
 void
-relay_subscribe(int fd, uint64_t sync, struct replica *replica,
+relay_subscribe(struct replica *replica, int fd, uint64_t sync,
 		struct vclock *replica_vclock, uint32_t replica_version_id);
 
 #endif /* TARANTOOL_REPLICATION_RELAY_H_INCLUDED */
diff --git a/src/box/replication.cc b/src/box/replication.cc
index 9aac1f077..2c5b356d9 100644
--- a/src/box/replication.cc
+++ b/src/box/replication.cc
@@ -39,6 +39,7 @@
 #include "box.h"
 #include "gc.h"
 #include "error.h"
+#include "relay.h"
 #include "vclock.h" /* VCLOCK_MAX */
 
 uint32_t instance_id = REPLICA_ID_NIL;
@@ -81,8 +82,6 @@ void
 replication_init(void)
 {
 	memset(&replicaset, 0, sizeof(replicaset));
-	mempool_create(&replicaset.pool, &cord()->slabc,
-		       sizeof(struct replica));
 	replica_hash_new(&replicaset.hash);
 	rlist_create(&replicaset.anon);
 	vclock_create(&replicaset.vclock);
@@ -92,7 +91,6 @@ replication_init(void)
 void
 replication_free(void)
 {
-	mempool_destroy(&replicaset.pool);
 	fiber_cond_destroy(&replicaset.applier.cond);
 }
 
@@ -114,8 +112,13 @@ replica_check_id(uint32_t replica_id)
 static bool
 replica_is_orphan(struct replica *replica)
 {
+	/*
+	 * The relay object always exists, so check its state
+	 * rather than its presence.
+	 */
+	assert(replica->relay != NULL);
 	return replica->id == REPLICA_ID_NIL && replica->applier == NULL &&
-	       replica->relay == NULL;
+	       relay_get_state(replica->relay) != RELAY_FOLLOW;
 }
 
 static void
@@ -125,14 +128,19 @@ static struct replica *
 replica_new(void)
 {
 	struct replica *replica = (struct replica *)
-			mempool_alloc(&replicaset.pool);
-	if (replica == NULL)
+			malloc(sizeof(struct replica));
+	if (replica == NULL) {
 		tnt_raise(OutOfMemory, sizeof(*replica), "malloc",
 			  "struct replica");
+	}
+	replica->relay = relay_new(replica);
+	if (replica->relay == NULL) {
+		free(replica);
+		diag_raise();
+	}
 	replica->id = 0;
 	replica->uuid = uuid_nil;
 	replica->applier = NULL;
-	replica->relay = NULL;
 	replica->gc = NULL;
 	rlist_create(&replica->in_anon);
 	trigger_create(&replica->on_applier_state,
@@ -145,9 +153,12 @@ static void
 replica_delete(struct replica *replica)
 {
 	assert(replica_is_orphan(replica));
+	/* Never NULL: replica_new() fails if relay_new() does. */
+	relay_delete(replica->relay);
 	if (replica->gc != NULL)
 		gc_consumer_unregister(replica->gc);
-	mempool_free(&replicaset.pool, replica);
+	TRASH(replica);
+	free(replica);
 }
 
 struct replica *
@@ -657,18 +669,8 @@ replicaset_check_quorum(void)
 }
 
 void
-replica_set_relay(struct replica *replica, struct relay *relay)
-{
-	assert(replica->id != REPLICA_ID_NIL);
-	assert(replica->relay == NULL);
-	replica->relay = relay;
-}
-
-void
-replica_clear_relay(struct replica *replica)
+replica_on_relay_stop(struct replica *replica)
 {
-	assert(replica->relay != NULL);
-	replica->relay = NULL;
 	if (replica_is_orphan(replica)) {
 		replica_hash_remove(&replicaset.hash, replica);
 		replica_delete(replica);
diff --git a/src/box/replication.h b/src/box/replication.h
index e2bdd814f..c6e5158d4 100644
--- a/src/box/replication.h
+++ b/src/box/replication.h
@@ -37,7 +37,6 @@
 #include <small/rb.h> /* replicaset_t */
 #include <small/rlist.h>
 #include "applier.h"
-#include <small/mempool.h>
 #include "fiber_cond.h"
 #include "vclock.h"
 
@@ -173,8 +172,6 @@ typedef rb_tree(struct replica) replica_hash_t;
  * relays, usually connected in full mesh.
  */
 struct replicaset {
-	/** Memory pool for struct replica allocations. */
-	struct mempool pool;
 	/** Hash of replicas indexed by UUID. */
 	replica_hash_t hash;
 	/**
@@ -301,19 +298,14 @@ replica_set_id(struct replica *replica, uint32_t id);
 void
 replica_clear_id(struct replica *replica);
 
-/**
- * Register \a relay of a \a replica.
- * \pre a replica can have only one relay
- * \pre replica->id != REPLICA_ID_NIL
- */
-void
-replica_set_relay(struct replica *replica, struct relay *relay);
-
 /**
- * Unregister \a relay from the \a replica.
+ * Called when the relay of \a replica stops. Deletes the
+ * replica if it has become an orphan (no id, no applier
+ * and no running relay), so \a replica must not be
+ * accessed after this call.
  */
 void
-replica_clear_relay(struct replica *replica);
+replica_on_relay_stop(struct replica *replica);
 
 #if defined(__cplusplus)
 } /* extern "C" */
-- 
2.14.3 (Apple Git-98)




More information about the Tarantool-patches mailing list