[PATCH v3/3] replication: handle replication shutdown correctly.

Vladimir Davydov vdavydov.dev at gmail.com
Wed Aug 15 16:49:27 MSK 2018


On Wed, Aug 15, 2018 at 02:03:03PM +0300, Serge Petrenko wrote:
> diff --git a/src/box/box.cc b/src/box/box.cc
> index ae4959d6f..c5e05bc15 100644
> --- a/src/box/box.cc
> +++ b/src/box/box.cc
> @@ -1597,9 +1597,9 @@ box_free(void)
>  	 * initialized
>  	 */
>  	if (is_box_configured) {
> +	        replication_free();
>  #if 0
>  		session_free();
> -		replication_free();

Nit: it would be better to move replication_free() after the #endif,
where all the other destructors are called.

>  		user_cache_free();
>  		schema_free();
>  		module_free();
> diff --git a/src/box/relay.cc b/src/box/relay.cc
> index 4cacbc840..81dc88a06 100644
> --- a/src/box/relay.cc
> +++ b/src/box/relay.cc
> @@ -198,6 +198,14 @@ relay_start(struct relay *relay, int fd, uint64_t sync,
>  	relay->state = RELAY_FOLLOW;
>  }
>  
> +void
> +relay_cancel(struct relay *relay)
> +{
> +	assert(relay->state == RELAY_FOLLOW);

IMO it would be better to make this function work both if the relay
thread is running and if it is not (do nothing in the latter case).

Also relay->state is set to RELAY_FOLLOW on initial join, when
relay thread isn't running. In this case, you must not call pthread
cancel/join.

> +	tt_pthread_cancel(relay->cord.id);
> +	tt_pthread_join(relay->cord.id, NULL);
> +}
> +
>  static void
>  relay_stop(struct relay *relay)
>  {
> @@ -460,6 +468,9 @@ relay_send_heartbeat(struct relay *relay)
>  static int
>  relay_subscribe_f(va_list ap)
>  {
> +	/* Allow to be cancelled. */
> +	tt_pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
> +	tt_pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);

This cancel state and cancel type are the defaults for every new thread,
so we don't need to set them explicitly, do we?

>  	struct relay *relay = va_arg(ap, struct relay *);
>  	struct recovery *r = relay->r;
>  
> diff --git a/src/box/relay.h b/src/box/relay.h
> index 2988e6b0d..53bf68eb8 100644
> --- a/src/box/relay.h
> +++ b/src/box/relay.h
> @@ -61,6 +61,10 @@ enum relay_state {
>  struct relay *
>  relay_new(struct replica *replica);
>  
> +/** Cancel a running relay. Called on shutdown. */
> +void
> +relay_cancel(struct relay *relay);
> +
>  /** Destroy and delete the relay */
>  void
>  relay_delete(struct relay *relay);
> diff --git a/src/box/replication.cc b/src/box/replication.cc
> index 48956d2ed..acf20fc2c 100644
> --- a/src/box/replication.cc
> +++ b/src/box/replication.cc
> @@ -91,13 +91,6 @@ replication_init(void)
>  	latch_create(&replicaset.applier.order_latch);
>  }
>  
> -void
> -replication_free(void)
> -{
> -	free(replicaset.replica_by_id);
> -	fiber_cond_destroy(&replicaset.applier.cond);
> -}
> -
>  void
>  replica_check_id(uint32_t replica_id)
>  {
> @@ -242,6 +235,47 @@ replica_clear_applier(struct replica *replica)
>  	trigger_clear(&replica->on_applier_state);
>  }
>  
> +void
> +replication_free(void)
> +{

After some consideration, I still don't think it's a good idea to add
the code that frees replicas in this patch, because it's a completely
separate change, which has nothing to do with the issue you're fixing.
The more code you add the more difficult it will be to cherry-pick this
patch, and it also increases the likelihood of introducing a bug.
Please remove this code so that replication_free only cancels relay
threads.

We will probably need to free memory on shutdown (to silence valgrind
and asan), but let's do it separately and when we get to it.

> +	struct replica *replica, *next;
> +
> +	replica_hash_foreach_safe(&replicaset.hash, replica, next) {
> +		if (replica->id == instance_id) {
> +			replica_hash_remove(&replicaset.hash, replica);
> +			/*
> +			 * Local replica doesn't have neither applier
> +			 * nor relay, so free it right away.
> +			 */
> +			TRASH(replica);
> +			free(replica);
> +			continue;
> +		}
> +		if (replica->applier != NULL) {
> +			replica_clear_applier(replica);
> +			/*
> +			 * We're exiting, so control won't be passed
> +			 * to appliers and we don't need to stop them.
> +			 */
> +		}
> +		if (replica->id != REPLICA_ID_NIL) {
> +			if (relay_get_state(replica->relay) == RELAY_FOLLOW) {
> +				replica->id = REPLICA_ID_NIL;
> +				relay_cancel(replica->relay);

Please add a comment here explaining why we need to stop relay threads
on shutdown.

> +			}
> +		} else {
> +			replica_hash_remove(&replicaset.hash, replica);
> +			replica_delete(replica);
> +		}
> +	}
> +	rlist_foreach_entry_safe(replica, &replicaset.anon, in_anon, next) {
> +		replica_clear_applier(replica);
> +		replica_delete(replica);
> +	}
> +	free(replicaset.replica_by_id);
> +	fiber_cond_destroy(&replicaset.applier.cond);
> +}
> +
>  static void
>  replica_on_applier_sync(struct replica *replica)
>  {
> diff --git a/src/box/replication.h b/src/box/replication.h
> index e8b391af2..a8baafd12 100644
> --- a/src/box/replication.h
> +++ b/src/box/replication.h
> @@ -154,6 +154,9 @@ replication_disconnect_timeout(void)
>  void
>  replication_init(void);
>  
> +/**
> + * Stop replication and delete all replicas and replicaset.
> + */

This comment is not really needed - it's clear from the name of this
function that it is a destructor. Besides, replication_init() doesn't
have a comment so let's leave this declaration as is.

>  void
>  replication_free(void);
>  
> diff --git a/src/tt_pthread.h b/src/tt_pthread.h
> index 60ade50ae..144425ccb 100644
> --- a/src/tt_pthread.h
> +++ b/src/tt_pthread.h
> @@ -277,6 +277,21 @@
>  	tt_pthread_error(e__);				\
>  })
>  
> +#define tt_pthread_cancel(thread)			\
> +({	int e__ = pthread_cancel(thread);		\
> +	tt_pthread_error(e__);				\
> + })

Extra space.

> +
> +#define tt_pthread_setcancelstate(state, ret)		\
> +({	int e__ = pthread_setcancelstate(state, ret);	\
> +	tt_pthread_error(e__);				\
> +})
> +
> +#define tt_pthread_setcanceltype(type, ret)		\
> +({	int e__ = pthread_setcanceltype(type, ret);	\
> +	tt_pthread_error(e__);				\
> +})
> +
>  #define tt_pthread_key_create(key, dtor)		\
>  ({	int e__ = pthread_key_create(key, dtor);	\
>  	tt_pthread_error(e__);				\
> diff --git a/test/replication/gc.result b/test/replication/gc.result
> index 3f9db26ce..aa59747db 100644
> --- a/test/replication/gc.result
> +++ b/test/replication/gc.result
> @@ -152,6 +152,9 @@ box.snapshot()
>  ---
>  - ok
>  ...
> +wait_gc(1)
> +---
> +...

I don't understand how this is relevant to the issue you're fixing.
If the test is buggy, you should fix it in a separate patch.

>  #box.info.gc().checkpoints == 1 or box.info.gc()
>  ---
>  - true
> @@ -311,6 +314,9 @@ box.snapshot()
>  ---
>  - ok
>  ...
> +wait_gc(1)
> +---
> +...
>  #box.info.gc().checkpoints == 1 or box.info.gc()
>  ---
>  - true
> @@ -330,6 +336,9 @@ xlog_count == 3 or xlog_count == 2 or fio.listdir('./master')
>  test_run:cleanup_cluster()
>  ---
>  ...
> +fiber.sleep(0.1)
> +---
> +...

Please don't use sleeps. Use wait_gc() instead.



More information about the Tarantool-patches mailing list