Tarantool development patches archive
 help / color / mirror / Atom feed
* [PATCH v3/3] replication: handle replication shutdown correctly.
@ 2018-08-15 11:03 Serge Petrenko
  2018-08-15 13:49 ` Vladimir Davydov
  0 siblings, 1 reply; 6+ messages in thread
From: Serge Petrenko @ 2018-08-15 11:03 UTC (permalink / raw)
  To: tarantool-patches; +Cc: georgy, vdavydov.dev, Serge Petrenko

Relay threads keep using tx upon shutdown, which leads to occasional
segmentation faults and assertion failures (e.g. in the replication test
suite).

Fix this by finishing the implementation of replication_free() and
introducing relay_cancel().
replication_free() calls relay_cancel() to stop every relay thread that is
using tx.
Also add delays to replication/gc.test to fix tests on Travis.

Closes #3485
---

https://github.com/tarantool/tarantool/issues/3485
https://github.com/tarantool/tarantool/tree/sp/alt-gh-3485-replication-shutdown

This is the third variant of the patch, as discussed yesterday with
Vladimir and Georgy. Instead of sending a message via cpipe or sending
a signal with pthread_kill, we cancel a thread with pthread_cancel.

Changes in v3: 
 - eliminate tx_in_use flag.
 - do not send a message to relay, just
   cancel a thread with pthread_cancel() and
   wait for it to exit with pthread_join().
 - wait for the relay thread with
   tt_pthread_join() before proceeding with
   tx destruction.
 - add macros tt_pthread_setcanceltype(),
   tt_pthread_setcancelstate(),
   tt_pthread_cancel() to tt_pthread.h
 - make sure all the replicas are deleted,
   including the ones in replicaset.anon
   list.

Changes in v2: 
 - instead of setting tx_in_use flag
   in relay and checking it in tx, send a
   message from relay to tx to set the flag.

 src/box/box.cc               |  2 +-
 src/box/relay.cc             | 11 ++++++++++
 src/box/relay.h              |  4 ++++
 src/box/replication.cc       | 48 +++++++++++++++++++++++++++++++++++++-------
 src/box/replication.h        |  3 +++
 src/tt_pthread.h             | 15 ++++++++++++++
 test/replication/gc.result   | 15 ++++++++++++++
 test/replication/gc.test.lua |  6 +++++-
 8 files changed, 95 insertions(+), 9 deletions(-)

diff --git a/src/box/box.cc b/src/box/box.cc
index ae4959d6f..c5e05bc15 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -1597,9 +1597,9 @@ box_free(void)
 	 * initialized
 	 */
 	if (is_box_configured) {
+	        replication_free();
 #if 0
 		session_free();
-		replication_free();
 		user_cache_free();
 		schema_free();
 		module_free();
diff --git a/src/box/relay.cc b/src/box/relay.cc
index 4cacbc840..81dc88a06 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -198,6 +198,14 @@ relay_start(struct relay *relay, int fd, uint64_t sync,
 	relay->state = RELAY_FOLLOW;
 }
 
+void
+relay_cancel(struct relay *relay)
+{
+	assert(relay->state == RELAY_FOLLOW);
+	tt_pthread_cancel(relay->cord.id);
+	tt_pthread_join(relay->cord.id, NULL);
+}
+
 static void
 relay_stop(struct relay *relay)
 {
@@ -460,6 +468,9 @@ relay_send_heartbeat(struct relay *relay)
 static int
 relay_subscribe_f(va_list ap)
 {
+	/* Allow to be cancelled. */
+	tt_pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
+	tt_pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);
 	struct relay *relay = va_arg(ap, struct relay *);
 	struct recovery *r = relay->r;
 
diff --git a/src/box/relay.h b/src/box/relay.h
index 2988e6b0d..53bf68eb8 100644
--- a/src/box/relay.h
+++ b/src/box/relay.h
@@ -61,6 +61,10 @@ enum relay_state {
 struct relay *
 relay_new(struct replica *replica);
 
+/** Cancel a running relay. Called on shutdown. */
+void
+relay_cancel(struct relay *relay);
+
 /** Destroy and delete the relay */
 void
 relay_delete(struct relay *relay);
diff --git a/src/box/replication.cc b/src/box/replication.cc
index 48956d2ed..acf20fc2c 100644
--- a/src/box/replication.cc
+++ b/src/box/replication.cc
@@ -91,13 +91,6 @@ replication_init(void)
 	latch_create(&replicaset.applier.order_latch);
 }
 
-void
-replication_free(void)
-{
-	free(replicaset.replica_by_id);
-	fiber_cond_destroy(&replicaset.applier.cond);
-}
-
 void
 replica_check_id(uint32_t replica_id)
 {
@@ -242,6 +235,47 @@ replica_clear_applier(struct replica *replica)
 	trigger_clear(&replica->on_applier_state);
 }
 
+void
+replication_free(void)
+{
+	struct replica *replica, *next;
+
+	replica_hash_foreach_safe(&replicaset.hash, replica, next) {
+		if (replica->id == instance_id) {
+			replica_hash_remove(&replicaset.hash, replica);
+			/*
+			 * Local replica doesn't have neither applier
+			 * nor relay, so free it right away.
+			 */
+			TRASH(replica);
+			free(replica);
+			continue;
+		}
+		if (replica->applier != NULL) {
+			replica_clear_applier(replica);
+			/*
+			 * We're exiting, so control won't be passed
+			 * to appliers and we don't need to stop them.
+			 */
+		}
+		if (replica->id != REPLICA_ID_NIL) {
+			if (relay_get_state(replica->relay) == RELAY_FOLLOW) {
+				replica->id = REPLICA_ID_NIL;
+				relay_cancel(replica->relay);
+			}
+		} else {
+			replica_hash_remove(&replicaset.hash, replica);
+			replica_delete(replica);
+		}
+	}
+	rlist_foreach_entry_safe(replica, &replicaset.anon, in_anon, next) {
+		replica_clear_applier(replica);
+		replica_delete(replica);
+	}
+	free(replicaset.replica_by_id);
+	fiber_cond_destroy(&replicaset.applier.cond);
+}
+
 static void
 replica_on_applier_sync(struct replica *replica)
 {
diff --git a/src/box/replication.h b/src/box/replication.h
index e8b391af2..a8baafd12 100644
--- a/src/box/replication.h
+++ b/src/box/replication.h
@@ -154,6 +154,9 @@ replication_disconnect_timeout(void)
 void
 replication_init(void);
 
+/**
+ * Stop replication and delete all replicas and replicaset.
+ */
 void
 replication_free(void);
 
diff --git a/src/tt_pthread.h b/src/tt_pthread.h
index 60ade50ae..144425ccb 100644
--- a/src/tt_pthread.h
+++ b/src/tt_pthread.h
@@ -277,6 +277,21 @@
 	tt_pthread_error(e__);				\
 })
 
+#define tt_pthread_cancel(thread)			\
+({	int e__ = pthread_cancel(thread);		\
+	tt_pthread_error(e__);				\
+ })
+
+#define tt_pthread_setcancelstate(state, ret)		\
+({	int e__ = pthread_setcancelstate(state, ret);	\
+	tt_pthread_error(e__);				\
+})
+
+#define tt_pthread_setcanceltype(type, ret)		\
+({	int e__ = pthread_setcanceltype(type, ret);	\
+	tt_pthread_error(e__);				\
+})
+
 #define tt_pthread_key_create(key, dtor)		\
 ({	int e__ = pthread_key_create(key, dtor);	\
 	tt_pthread_error(e__);				\
diff --git a/test/replication/gc.result b/test/replication/gc.result
index 3f9db26ce..aa59747db 100644
--- a/test/replication/gc.result
+++ b/test/replication/gc.result
@@ -152,6 +152,9 @@ box.snapshot()
 ---
 - ok
 ...
+wait_gc(1)
+---
+...
 #box.info.gc().checkpoints == 1 or box.info.gc()
 ---
 - true
@@ -311,6 +314,9 @@ box.snapshot()
 ---
 - ok
 ...
+wait_gc(1)
+---
+...
 #box.info.gc().checkpoints == 1 or box.info.gc()
 ---
 - true
@@ -330,6 +336,9 @@ xlog_count == 3 or xlog_count == 2 or fio.listdir('./master')
 test_run:cleanup_cluster()
 ---
 ...
+fiber.sleep(0.1)
+---
+...
 #box.info.gc().checkpoints == 1 or box.info.gc()
 ---
 - true
@@ -395,6 +404,9 @@ replica_port ~= nil
 box.cfg{replication = replica_port}
 ---
 ...
+fiber.sleep(0.1)
+---
+...
 -- Stop the replica and write a few WALs.
 test_run:cmd("stop server replica")
 ---
@@ -425,6 +437,9 @@ box.snapshot()
 ---
 - ok
 ...
+fiber.sleep(0.1)
+---
+...
 #fio.glob('./master/*.xlog') == 3 or fio.listdir('./master')
 ---
 - true
diff --git a/test/replication/gc.test.lua b/test/replication/gc.test.lua
index 96f11f8d4..28e0996f5 100644
--- a/test/replication/gc.test.lua
+++ b/test/replication/gc.test.lua
@@ -78,6 +78,7 @@ box.snapshot()
 -- Invoke garbage collection. Check that it doesn't remove
 -- xlogs needed by the replica.
 box.snapshot()
+wait_gc(1)
 #box.info.gc().checkpoints == 1 or box.info.gc()
 #fio.glob('./master/*.xlog') == 2 or fio.listdir('./master')
 
@@ -144,6 +145,7 @@ _ = s:auto_increment{}
 box.snapshot()
 _ = s:auto_increment{}
 box.snapshot()
+wait_gc(1)
 #box.info.gc().checkpoints == 1 or box.info.gc()
 xlog_count = #fio.glob('./master/*.xlog')
 -- the replica may have managed to download all data
@@ -154,6 +156,7 @@ xlog_count == 3 or xlog_count == 2 or fio.listdir('./master')
 -- The xlog should only be deleted after the replica
 -- is unregistered.
 test_run:cleanup_cluster()
+fiber.sleep(0.1)
 #box.info.gc().checkpoints == 1 or box.info.gc()
 #fio.glob('./master/*.xlog') == 1 or fio.listdir('./master')
 --
@@ -185,7 +188,7 @@ test_run:cmd("start server replica")
 replica_port = test_run:eval('replica', 'return box.cfg.listen')[1]
 replica_port ~= nil
 box.cfg{replication = replica_port}
-
+fiber.sleep(0.1)
 -- Stop the replica and write a few WALs.
 test_run:cmd("stop server replica")
 test_run:cmd("cleanup server replica")
@@ -195,6 +198,7 @@ _ = s:auto_increment{}
 box.snapshot()
 _ = s:auto_increment{}
 box.snapshot()
+fiber.sleep(0.1)
 #fio.glob('./master/*.xlog') == 3 or fio.listdir('./master')
 
 -- Delete the replica from the cluster table and check that
-- 
2.15.2 (Apple Git-101.1)

^ permalink raw reply	[flat|nested] 6+ messages in thread

* Re: [PATCH v3/3] replication: handle replication shutdown correctly.
  2018-08-15 11:03 [PATCH v3/3] replication: handle replication shutdown correctly Serge Petrenko
@ 2018-08-15 13:49 ` Vladimir Davydov
  2018-08-15 16:13   ` Serge Petrenko
  0 siblings, 1 reply; 6+ messages in thread
From: Vladimir Davydov @ 2018-08-15 13:49 UTC (permalink / raw)
  To: Serge Petrenko; +Cc: tarantool-patches, georgy

On Wed, Aug 15, 2018 at 02:03:03PM +0300, Serge Petrenko wrote:
> diff --git a/src/box/box.cc b/src/box/box.cc
> index ae4959d6f..c5e05bc15 100644
> --- a/src/box/box.cc
> +++ b/src/box/box.cc
> @@ -1597,9 +1597,9 @@ box_free(void)
>  	 * initialized
>  	 */
>  	if (is_box_configured) {
> +	        replication_free();
>  #if 0
>  		session_free();
> -		replication_free();

Nit: better move replication_free() after #endif, where all other
destructors are called.

>  		user_cache_free();
>  		schema_free();
>  		module_free();
> diff --git a/src/box/relay.cc b/src/box/relay.cc
> index 4cacbc840..81dc88a06 100644
> --- a/src/box/relay.cc
> +++ b/src/box/relay.cc
> @@ -198,6 +198,14 @@ relay_start(struct relay *relay, int fd, uint64_t sync,
>  	relay->state = RELAY_FOLLOW;
>  }
>  
> +void
> +relay_cancel(struct relay *relay)
> +{
> +	assert(relay->state == RELAY_FOLLOW);

IMO it would be better to make this function work both if the relay
thread is running and if it is not (do nothing in the latter case).

Also relay->state is set to RELAY_FOLLOW on initial join, when
relay thread isn't running. In this case, you must not call pthread
cancel/join.

> +	tt_pthread_cancel(relay->cord.id);
> +	tt_pthread_join(relay->cord.id, NULL);
> +}
> +
>  static void
>  relay_stop(struct relay *relay)
>  {
> @@ -460,6 +468,9 @@ relay_send_heartbeat(struct relay *relay)
>  static int
>  relay_subscribe_f(va_list ap)
>  {
> +	/* Allow to be cancelled. */
> +	tt_pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
> +	tt_pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);

These cancelstate/type are set by default for all new threads and so we
don't need to set them explicitly, no?

>  	struct relay *relay = va_arg(ap, struct relay *);
>  	struct recovery *r = relay->r;
>  
> diff --git a/src/box/relay.h b/src/box/relay.h
> index 2988e6b0d..53bf68eb8 100644
> --- a/src/box/relay.h
> +++ b/src/box/relay.h
> @@ -61,6 +61,10 @@ enum relay_state {
>  struct relay *
>  relay_new(struct replica *replica);
>  
> +/** Cancel a running relay. Called on shutdown. */
> +void
> +relay_cancel(struct relay *relay);
> +
>  /** Destroy and delete the relay */
>  void
>  relay_delete(struct relay *relay);
> diff --git a/src/box/replication.cc b/src/box/replication.cc
> index 48956d2ed..acf20fc2c 100644
> --- a/src/box/replication.cc
> +++ b/src/box/replication.cc
> @@ -91,13 +91,6 @@ replication_init(void)
>  	latch_create(&replicaset.applier.order_latch);
>  }
>  
> -void
> -replication_free(void)
> -{
> -	free(replicaset.replica_by_id);
> -	fiber_cond_destroy(&replicaset.applier.cond);
> -}
> -
>  void
>  replica_check_id(uint32_t replica_id)
>  {
> @@ -242,6 +235,47 @@ replica_clear_applier(struct replica *replica)
>  	trigger_clear(&replica->on_applier_state);
>  }
>  
> +void
> +replication_free(void)
> +{

After some consideration, I still don't think it's a good idea to add
the code that frees replicas in this patch, because it's a completely
separate change, which has nothing to do with the issue you're fixing.
The more code you add the more difficult it will be to cherry-pick this
patch, and it also increases the likelihood of introducing a bug.
Please remove this code so that replication_free only cancels relay
threads.

We will probably need to free memory on shutdown (to silence valgrind
and asan), but let's do it separately and when we get to it.

> +	struct replica *replica, *next;
> +
> +	replica_hash_foreach_safe(&replicaset.hash, replica, next) {
> +		if (replica->id == instance_id) {
> +			replica_hash_remove(&replicaset.hash, replica);
> +			/*
> +			 * Local replica doesn't have neither applier
> +			 * nor relay, so free it right away.
> +			 */
> +			TRASH(replica);
> +			free(replica);
> +			continue;
> +		}
> +		if (replica->applier != NULL) {
> +			replica_clear_applier(replica);
> +			/*
> +			 * We're exiting, so control won't be passed
> +			 * to appliers and we don't need to stop them.
> +			 */
> +		}
> +		if (replica->id != REPLICA_ID_NIL) {
> +			if (relay_get_state(replica->relay) == RELAY_FOLLOW) {
> +				replica->id = REPLICA_ID_NIL;
> +				relay_cancel(replica->relay);

Please add a comment here why we need to stop relay threads on shutdown.

> +			}
> +		} else {
> +			replica_hash_remove(&replicaset.hash, replica);
> +			replica_delete(replica);
> +		}
> +	}
> +	rlist_foreach_entry_safe(replica, &replicaset.anon, in_anon, next) {
> +		replica_clear_applier(replica);
> +		replica_delete(replica);
> +	}
> +	free(replicaset.replica_by_id);
> +	fiber_cond_destroy(&replicaset.applier.cond);
> +}
> +
>  static void
>  replica_on_applier_sync(struct replica *replica)
>  {
> diff --git a/src/box/replication.h b/src/box/replication.h
> index e8b391af2..a8baafd12 100644
> --- a/src/box/replication.h
> +++ b/src/box/replication.h
> @@ -154,6 +154,9 @@ replication_disconnect_timeout(void)
>  void
>  replication_init(void);
>  
> +/**
> + * Stop replication and delete all replicas and replicaset.
> + */

This comment is not really needed - it's clear from the name of this
function that it is a destructor. Besides, replication_init() doesn't
have a comment so let's leave this declaration as is.

>  void
>  replication_free(void);
>  
> diff --git a/src/tt_pthread.h b/src/tt_pthread.h
> index 60ade50ae..144425ccb 100644
> --- a/src/tt_pthread.h
> +++ b/src/tt_pthread.h
> @@ -277,6 +277,21 @@
>  	tt_pthread_error(e__);				\
>  })
>  
> +#define tt_pthread_cancel(thread)			\
> +({	int e__ = pthread_cancel(thread);		\
> +	tt_pthread_error(e__);				\
> + })

Extra space.

> +
> +#define tt_pthread_setcancelstate(state, ret)		\
> +({	int e__ = pthread_setcancelstate(state, ret);	\
> +	tt_pthread_error(e__);				\
> +})
> +
> +#define tt_pthread_setcanceltype(type, ret)		\
> +({	int e__ = pthread_setcanceltype(type, ret);	\
> +	tt_pthread_error(e__);				\
> +})
> +
>  #define tt_pthread_key_create(key, dtor)		\
>  ({	int e__ = pthread_key_create(key, dtor);	\
>  	tt_pthread_error(e__);				\
> diff --git a/test/replication/gc.result b/test/replication/gc.result
> index 3f9db26ce..aa59747db 100644
> --- a/test/replication/gc.result
> +++ b/test/replication/gc.result
> @@ -152,6 +152,9 @@ box.snapshot()
>  ---
>  - ok
>  ...
> +wait_gc(1)
> +---
> +...

I don't understand how this is relevant to the issue you're fixing.
If the test is buggy, you should fix it in a separate patch.

>  #box.info.gc().checkpoints == 1 or box.info.gc()
>  ---
>  - true
> @@ -311,6 +314,9 @@ box.snapshot()
>  ---
>  - ok
>  ...
> +wait_gc(1)
> +---
> +...
>  #box.info.gc().checkpoints == 1 or box.info.gc()
>  ---
>  - true
> @@ -330,6 +336,9 @@ xlog_count == 3 or xlog_count == 2 or fio.listdir('./master')
>  test_run:cleanup_cluster()
>  ---
>  ...
> +fiber.sleep(0.1)
> +---
> +...

Please don't use sleeps. Use wait_gc() instead.

^ permalink raw reply	[flat|nested] 6+ messages in thread

* Re: [PATCH v3/3] replication: handle replication shutdown correctly.
  2018-08-15 13:49 ` Vladimir Davydov
@ 2018-08-15 16:13   ` Serge Petrenko
  2018-08-15 18:47     ` Vladimir Davydov
  0 siblings, 1 reply; 6+ messages in thread
From: Serge Petrenko @ 2018-08-15 16:13 UTC (permalink / raw)
  To: Vladimir Davydov; +Cc: tarantool-patches, Georgy Kirichenko

Hi! Fixed your comments. The new diff is below.
The branch remains: 

> On 15 Aug 2018, at 16:49, Vladimir Davydov <vdavydov.dev@gmail.com> wrote:
> 
> On Wed, Aug 15, 2018 at 02:03:03PM +0300, Serge Petrenko wrote:
>> diff --git a/src/box/box.cc b/src/box/box.cc
>> index ae4959d6f..c5e05bc15 100644
>> --- a/src/box/box.cc
>> +++ b/src/box/box.cc
>> @@ -1597,9 +1597,9 @@ box_free(void)
>> 	 * initialized
>> 	 */
>> 	if (is_box_configured) {
>> +	        replication_free();
>> #if 0
>> 		session_free();
>> -		replication_free();
> 
> Nit: better move replication_free() after #endif, where all other
> destructor are called.

Fixed.

> 
>> 		user_cache_free();
>> 		schema_free();
>> 		module_free();
>> diff --git a/src/box/relay.cc b/src/box/relay.cc
>> index 4cacbc840..81dc88a06 100644
>> --- a/src/box/relay.cc
>> +++ b/src/box/relay.cc
>> @@ -198,6 +198,14 @@ relay_start(struct relay *relay, int fd, uint64_t sync,
>> 	relay->state = RELAY_FOLLOW;
>> }
>> 
>> +void
>> +relay_cancel(struct relay *relay)
>> +{
>> +	assert(relay->state == RELAY_FOLLOW);
> 
> IMO it would be better to make this function work both if the relay
> thread is running and if it is not (do nothing in the latter case).
> 
> Also relay->state is set to RELAY_FOLLOW on initial join, when
> relay thread isn't running. In this case, you must not call pthread
> cancel/join.

Fixed both.

> 
>> +	tt_pthread_cancel(relay->cord.id);
>> +	tt_pthread_join(relay->cord.id, NULL);
>> +}
>> +
>> static void
>> relay_stop(struct relay *relay)
>> {
>> @@ -460,6 +468,9 @@ relay_send_heartbeat(struct relay *relay)
>> static int
>> relay_subscribe_f(va_list ap)
>> {
>> +	/* Allow to be cancelled. */
>> +	tt_pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
>> +	tt_pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);
> 
> These cancelstate/type are set by default for all new threads and so we
> don't need to set them explicitly, no?

Yes, you’re right. Removed.

> 
>> 	struct relay *relay = va_arg(ap, struct relay *);
>> 	struct recovery *r = relay->r;
>> 
>> diff --git a/src/box/relay.h b/src/box/relay.h
>> index 2988e6b0d..53bf68eb8 100644
>> --- a/src/box/relay.h
>> +++ b/src/box/relay.h
>> @@ -61,6 +61,10 @@ enum relay_state {
>> struct relay *
>> relay_new(struct replica *replica);
>> 
>> +/** Cancel a running relay. Called on shutdown. */
>> +void
>> +relay_cancel(struct relay *relay);
>> +
>> /** Destroy and delete the relay */
>> void
>> relay_delete(struct relay *relay);
>> diff --git a/src/box/replication.cc b/src/box/replication.cc
>> index 48956d2ed..acf20fc2c 100644
>> --- a/src/box/replication.cc
>> +++ b/src/box/replication.cc
>> @@ -91,13 +91,6 @@ replication_init(void)
>> 	latch_create(&replicaset.applier.order_latch);
>> }
>> 
>> -void
>> -replication_free(void)
>> -{
>> -	free(replicaset.replica_by_id);
>> -	fiber_cond_destroy(&replicaset.applier.cond);
>> -}
>> -
>> void
>> replica_check_id(uint32_t replica_id)
>> {
>> @@ -242,6 +235,47 @@ replica_clear_applier(struct replica *replica)
>> 	trigger_clear(&replica->on_applier_state);
>> }
>> 
>> +void
>> +replication_free(void)
>> +{
> 
> After some consideration, I still don't think it's a good idea to add
> the code that frees replicas in this patch, because it's a completely
> separate change, which has nothing to do with the issue you're fixing.
> The more code you add the more difficult it will be to cherry-pick this
> patch, and it also increases the likelihood of introducing a bug.
> Please remove this code so that replication_free only cancels relay
> threads.
> 
> We will probably need to free memory on shutdown (to silence valgrind
> and asan), but let's do it separately and when we get to it.

Ok. Let’s do it in a separate patch.

> 
>> +	struct replica *replica, *next;
>> +
>> +	replica_hash_foreach_safe(&replicaset.hash, replica, next) {
>> +		if (replica->id == instance_id) {
>> +			replica_hash_remove(&replicaset.hash, replica);
>> +			/*
>> +			 * Local replica doesn't have neither applier
>> +			 * nor relay, so free it right away.
>> +			 */
>> +			TRASH(replica);
>> +			free(replica);
>> +			continue;
>> +		}
>> +		if (replica->applier != NULL) {
>> +			replica_clear_applier(replica);
>> +			/*
>> +			 * We're exiting, so control won't be passed
>> +			 * to appliers and we don't need to stop them.
>> +			 */
>> +		}
>> +		if (replica->id != REPLICA_ID_NIL) {
>> +			if (relay_get_state(replica->relay) == RELAY_FOLLOW) {
>> +				replica->id = REPLICA_ID_NIL;
>> +				relay_cancel(replica->relay);
> 
> Please add a comment here why we need to stop relay threads on shutdown.

Done.

> 
>> +			}
>> +		} else {
>> +			replica_hash_remove(&replicaset.hash, replica);
>> +			replica_delete(replica);
>> +		}
>> +	}
>> +	rlist_foreach_entry_safe(replica, &replicaset.anon, in_anon, next) {
>> +		replica_clear_applier(replica);
>> +		replica_delete(replica);
>> +	}
>> +	free(replicaset.replica_by_id);
>> +	fiber_cond_destroy(&replicaset.applier.cond);
>> +}
>> +
>> static void
>> replica_on_applier_sync(struct replica *replica)
>> {
>> diff --git a/src/box/replication.h b/src/box/replication.h
>> index e8b391af2..a8baafd12 100644
>> --- a/src/box/replication.h
>> +++ b/src/box/replication.h
>> @@ -154,6 +154,9 @@ replication_disconnect_timeout(void)
>> void
>> replication_init(void);
>> 
>> +/**
>> + * Stop replication and delete all replicas and replicaset.
>> + */
> 
> This comment is not really needed - it's clear from the name of this
> function that it is a destructor. Besides, replication_init() doesn't
> have a comment so let's leave this declaration as is.

Ok, removed.
> 
>> void
>> replication_free(void);
>> 
>> diff --git a/src/tt_pthread.h b/src/tt_pthread.h
>> index 60ade50ae..144425ccb 100644
>> --- a/src/tt_pthread.h
>> +++ b/src/tt_pthread.h
>> @@ -277,6 +277,21 @@
>> 	tt_pthread_error(e__);				\
>> })
>> 
>> +#define tt_pthread_cancel(thread)			\
>> +({	int e__ = pthread_cancel(thread);		\
>> +	tt_pthread_error(e__);				\
>> + })
> 
> Extra space.

Fixed.

> 
>> +
>> +#define tt_pthread_setcancelstate(state, ret)		\
>> +({	int e__ = pthread_setcancelstate(state, ret);	\
>> +	tt_pthread_error(e__);				\
>> +})
>> +
>> +#define tt_pthread_setcanceltype(type, ret)		\
>> +({	int e__ = pthread_setcanceltype(type, ret);	\
>> +	tt_pthread_error(e__);				\
>> +})
>> +
>> #define tt_pthread_key_create(key, dtor)		\
>> ({	int e__ = pthread_key_create(key, dtor);	\
>> 	tt_pthread_error(e__);				\
>> diff --git a/test/replication/gc.result b/test/replication/gc.result
>> index 3f9db26ce..aa59747db 100644
>> --- a/test/replication/gc.result
>> +++ b/test/replication/gc.result
>> @@ -152,6 +152,9 @@ box.snapshot()
>> ---
>> - ok
>> ...
>> +wait_gc(1)
>> +---
>> +...
> 
> I don't understand how this is relevant to the issue you're fixing.
> If the test is buggy, you should fix it in a separate patch.

Removed, this is not relevant to the issue.
> 
>> #box.info.gc().checkpoints == 1 or box.info.gc()
>> ---
>> - true
>> @@ -311,6 +314,9 @@ box.snapshot()
>> ---
>> - ok
>> ...
>> +wait_gc(1)
>> +---
>> +...
>> #box.info.gc().checkpoints == 1 or box.info.gc()
>> ---
>> - true
>> @@ -330,6 +336,9 @@ xlog_count == 3 or xlog_count == 2 or fio.listdir('./master')
>> test_run:cleanup_cluster()
>> ---
>> ...
>> +fiber.sleep(0.1)
>> +---
>> +...
> 
> Please don't use sleeps. Use wait_gc() instead.

Here’s the new diff:

---
 src/box/box.cc         |  2 +-
 src/box/relay.cc       | 15 +++++++++++++++
 src/box/relay.h        |  4 ++++
 src/box/replication.cc | 43 ++++++++++++++++++++++++++++++++++++-------
 src/tt_pthread.h       |  5 +++++
 5 files changed, 61 insertions(+), 8 deletions(-)

diff --git a/src/box/box.cc b/src/box/box.cc
index ae4959d6f..28bfdd5fb 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -1599,13 +1599,13 @@ box_free(void)
 	if (is_box_configured) {
 #if 0
 		session_free();
-		replication_free();
 		user_cache_free();
 		schema_free();
 		module_free();
 		tuple_free();
 		port_free();
 #endif
+		replication_free();
 		sequence_free();
 		gc_free();
 		engine_shutdown();
diff --git a/src/box/relay.cc b/src/box/relay.cc
index 4cacbc840..a89a7732d 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -198,6 +198,16 @@ relay_start(struct relay *relay, int fd, uint64_t sync,
 	relay->state = RELAY_FOLLOW;
 }
 
+void
+relay_cancel(struct relay *relay)
+{
+	/* Check that the thread is running first. */
+	if (relay->cord.id != 0) {
+		tt_pthread_cancel(relay->cord.id);
+		tt_pthread_join(relay->cord.id, NULL);
+	}
+}
+
 static void
 relay_stop(struct relay *relay)
 {
@@ -211,6 +221,11 @@ relay_stop(struct relay *relay)
 		recovery_delete(relay->r);
 	relay->r = NULL;
 	relay->state = RELAY_STOPPED;
+	/*
+	 * Needed to track whether relay thread is running or not
+	 * for relay_cancel(). Id is reset upon cord_create().
+	 */
+	relay->cord.id = 0;
 }
 
 void
diff --git a/src/box/relay.h b/src/box/relay.h
index 2988e6b0d..53bf68eb8 100644
--- a/src/box/relay.h
+++ b/src/box/relay.h
@@ -61,6 +61,10 @@ enum relay_state {
 struct relay *
 relay_new(struct replica *replica);
 
+/** Cancel a running relay. Called on shutdown. */
+void
+relay_cancel(struct relay *relay);
+
 /** Destroy and delete the relay */
 void
 relay_delete(struct relay *relay);
diff --git a/src/box/replication.cc b/src/box/replication.cc
index 48956d2ed..083ae6407 100644
--- a/src/box/replication.cc
+++ b/src/box/replication.cc
@@ -91,13 +91,6 @@ replication_init(void)
 	latch_create(&replicaset.applier.order_latch);
 }
 
-void
-replication_free(void)
-{
-	free(replicaset.replica_by_id);
-	fiber_cond_destroy(&replicaset.applier.cond);
-}
-
 void
 replica_check_id(uint32_t replica_id)
 {
@@ -242,6 +235,42 @@ replica_clear_applier(struct replica *replica)
 	trigger_clear(&replica->on_applier_state);
 }
 
+void
+replication_free(void)
+{
+	struct replica *replica, *next;
+
+	replica_hash_foreach_safe(&replicaset.hash, replica, next) {
+		if (replica->id == instance_id) {
+			replica_hash_remove(&replicaset.hash, replica);
+			/*
+			 * Local replica doesn't have neither applier
+			 * nor relay, so ignore it.
+			 */
+			continue;
+		}
+		if (replica->applier != NULL) {
+			replica_clear_applier(replica);
+			/*
+			 * We're exiting, so control won't be passed
+			 * to appliers and we don't need to stop them.
+			 */
+		}
+		if (replica->id != REPLICA_ID_NIL) {
+			/*
+			 * Relay threads keep sending messages
+			 * to tx via cbus upon shutdown, which
+			 * could lead to segfaults. So cancel
+			 * them.
+			 */
+			relay_cancel(replica->relay);
+		}
+	}
+
+	free(replicaset.replica_by_id);
+	fiber_cond_destroy(&replicaset.applier.cond);
+}
+
 static void
 replica_on_applier_sync(struct replica *replica)
 {
diff --git a/src/tt_pthread.h b/src/tt_pthread.h
index 60ade50ae..d83694460 100644
--- a/src/tt_pthread.h
+++ b/src/tt_pthread.h
@@ -277,6 +277,11 @@
 	tt_pthread_error(e__);				\
 })
 
+#define tt_pthread_cancel(thread)			\
+({	int e__ = pthread_cancel(thread);		\
+	tt_pthread_error(e__);				\
+})
+
 #define tt_pthread_key_create(key, dtor)		\
 ({	int e__ = pthread_key_create(key, dtor);	\
 	tt_pthread_error(e__);				\
-- 
2.15.2 (Apple Git-101.1)

^ permalink raw reply	[flat|nested] 6+ messages in thread

* Re: [PATCH v3/3] replication: handle replication shutdown correctly.
  2018-08-15 16:13   ` Serge Petrenko
@ 2018-08-15 18:47     ` Vladimir Davydov
  2018-08-16  6:05       ` [tarantool-patches] " Serge Petrenko
  0 siblings, 1 reply; 6+ messages in thread
From: Vladimir Davydov @ 2018-08-15 18:47 UTC (permalink / raw)
  To: Serge Petrenko; +Cc: tarantool-patches, Georgy Kirichenko

On Wed, Aug 15, 2018 at 07:13:28PM +0300, Serge Petrenko wrote:
> diff --git a/src/box/replication.cc b/src/box/replication.cc
> index 48956d2ed..083ae6407 100644
> --- a/src/box/replication.cc
> +++ b/src/box/replication.cc
> @@ -91,13 +91,6 @@ replication_init(void)
>  	latch_create(&replicaset.applier.order_latch);
>  }
>  
> -void
> -replication_free(void)
> -{
> -	free(replicaset.replica_by_id);
> -	fiber_cond_destroy(&replicaset.applier.cond);
> -}
> -
>  void
>  replica_check_id(uint32_t replica_id)
>  {
> @@ -242,6 +235,42 @@ replica_clear_applier(struct replica *replica)
>  	trigger_clear(&replica->on_applier_state);
>  }
>  
> +void
> +replication_free(void)
> +{
> +	struct replica *replica, *next;
> +
> +	replica_hash_foreach_safe(&replicaset.hash, replica, next) {
> +		if (replica->id == instance_id) {
> +			replica_hash_remove(&replicaset.hash, replica);
> +			/*
> +			 * Local replica doesn't have neither applier
> +			 * nor relay, so ignore it.
> +			 */
> +			continue;
> +		}
> +		if (replica->applier != NULL) {
> +			replica_clear_applier(replica);
> +			/*
> +			 * We're exiting, so control won't be passed
> +			 * to appliers and we don't need to stop them.
> +			 */
> +		}

You don't need this code either. I want this loop to be as simple as

	/*
	 * <explain why>
	 */
	replicaset_foreach(replica)
		relay_cancel(replica->relay);

Then you wouldn't even need to move the definition of replication_free.

> +		if (replica->id != REPLICA_ID_NIL) {
> +			/*
> +			 * Relay threads keep sending messages
> +			 * to tx via cbus upon shutdown, which
> +			 * could lead to segfaults. So cancel
> +			 * them.
> +			 */
> +			relay_cancel(replica->relay);
> +		}
> +	}
> +
> +	free(replicaset.replica_by_id);
> +	fiber_cond_destroy(&replicaset.applier.cond);
> +}

^ permalink raw reply	[flat|nested] 6+ messages in thread

* Re: [tarantool-patches] Re: [PATCH v3/3] replication: handle replication shutdown correctly.
  2018-08-15 18:47     ` Vladimir Davydov
@ 2018-08-16  6:05       ` Serge Petrenko
  2018-08-16  8:55         ` Vladimir Davydov
  0 siblings, 1 reply; 6+ messages in thread
From: Serge Petrenko @ 2018-08-16  6:05 UTC (permalink / raw)
  To: Vladimir Davydov; +Cc: Georgy Kirichenko, tarantool-patches



> On 15 Aug 2018, at 21:47, Vladimir Davydov <vdavydov.dev@gmail.com> wrote:
> 
> On Wed, Aug 15, 2018 at 07:13:28PM +0300, Serge Petrenko wrote:
>> diff --git a/src/box/replication.cc b/src/box/replication.cc
>> index 48956d2ed..083ae6407 100644
>> --- a/src/box/replication.cc
>> +++ b/src/box/replication.cc
>> @@ -91,13 +91,6 @@ replication_init(void)
>> 	latch_create(&replicaset.applier.order_latch);
>> }
>> 
>> -void
>> -replication_free(void)
>> -{
>> -	free(replicaset.replica_by_id);
>> -	fiber_cond_destroy(&replicaset.applier.cond);
>> -}
>> -
>> void
>> replica_check_id(uint32_t replica_id)
>> {
>> @@ -242,6 +235,42 @@ replica_clear_applier(struct replica *replica)
>> 	trigger_clear(&replica->on_applier_state);
>> }
>> 
>> +void
>> +replication_free(void)
>> +{
>> +	struct replica *replica, *next;
>> +
>> +	replica_hash_foreach_safe(&replicaset.hash, replica, next) {
>> +		if (replica->id == instance_id) {
>> +			replica_hash_remove(&replicaset.hash, replica);
>> +			/*
>> +			 * Local replica doesn't have neither applier
>> +			 * nor relay, so ignore it.
>> +			 */
>> +			continue;
>> +		}
>> +		if (replica->applier != NULL) {
>> +			replica_clear_applier(replica);
>> +			/*
>> +			 * We're exiting, so control won't be passed
>> +			 * to appliers and we don't need to stop them.
>> +			 */
>> +		}
> 
> You don't need this code either. I want this loop to be as simple as
> 
> 	/*
> 	 * <explain why>
> 	 */
> 	replicaset_foreach(replica)
> 		relay_cancel(replica->relay);
> 
> Then you wouldn't even need to move the definition of replication_free.

Done as requested. The new diff is below. The branch remains
https://github.com/tarantool/tarantool/tree/sp/alt2-gh-3485-replication-shutdown

> 
>> +		if (replica->id != REPLICA_ID_NIL) {
>> +			/*
>> +			 * Relay threads keep sending messages
>> +			 * to tx via cbus upon shutdown, which
>> +			 * could lead to segfaults. So cancel
>> +			 * them.
>> +			 */
>> +			relay_cancel(replica->relay);
>> +		}
>> +	}
>> +
>> +	free(replicaset.replica_by_id);
>> +	fiber_cond_destroy(&replicaset.applier.cond);
>> +}
> 

---
 src/box/box.cc         |  2 +-
 src/box/relay.cc       | 16 ++++++++++++++++
 src/box/relay.h        |  4 ++++
 src/box/replication.cc | 10 ++++++++++
 src/tt_pthread.h       |  5 +++++
 5 files changed, 36 insertions(+), 1 deletion(-)

diff --git a/src/box/box.cc b/src/box/box.cc
index ae4959d6f..28bfdd5fb 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -1599,13 +1599,13 @@ box_free(void)
 	if (is_box_configured) {
 #if 0
 		session_free();
-		replication_free();
 		user_cache_free();
 		schema_free();
 		module_free();
 		tuple_free();
 		port_free();
 #endif
+		replication_free();
 		sequence_free();
 		gc_free();
 		engine_shutdown();
diff --git a/src/box/relay.cc b/src/box/relay.cc
index 4cacbc840..edd1c80b0 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -198,6 +198,16 @@ relay_start(struct relay *relay, int fd, uint64_t sync,
 	relay->state = RELAY_FOLLOW;
 }
 
+void
+relay_cancel(struct relay *relay)
+{
+	/* Check that the thread is running first. */
+	if (relay->cord.id != 0) {
+		tt_pthread_cancel(relay->cord.id);
+		tt_pthread_join(relay->cord.id, NULL);
+	}
+}
+
 static void
 relay_stop(struct relay *relay)
 {
@@ -211,6 +221,12 @@ relay_stop(struct relay *relay)
 		recovery_delete(relay->r);
 	relay->r = NULL;
 	relay->state = RELAY_STOPPED;
+	/*
+	 * Needed to track whether relay thread is running or not
+	 * for relay_cancel(). Id is reset to a positive value
+	 * upon cord_create().
+	 */
+	relay->cord.id = 0;
 }
 
 void
diff --git a/src/box/relay.h b/src/box/relay.h
index 2988e6b0d..53bf68eb8 100644
--- a/src/box/relay.h
+++ b/src/box/relay.h
@@ -61,6 +61,10 @@ enum relay_state {
 struct relay *
 relay_new(struct replica *replica);
 
+/** Cancel a running relay. Called on shutdown. */
+void
+relay_cancel(struct relay *relay);
+
 /** Destroy and delete the relay */
 void
 relay_delete(struct relay *relay);
diff --git a/src/box/replication.cc b/src/box/replication.cc
index 48956d2ed..4b0700f90 100644
--- a/src/box/replication.cc
+++ b/src/box/replication.cc
@@ -94,6 +94,16 @@ replication_init(void)
 void
 replication_free(void)
 {
+	/*
+	 * Relay threads keep sending messages
+	 * to tx via cbus upon shutdown, which
+	 * could lead to segfaults. So cancel
+	 * them.
+	 */
+	replicaset_foreach(replica) {
+		relay_cancel(replica->relay);
+	}
+
 	free(replicaset.replica_by_id);
 	fiber_cond_destroy(&replicaset.applier.cond);
 }
diff --git a/src/tt_pthread.h b/src/tt_pthread.h
index 60ade50ae..d83694460 100644
--- a/src/tt_pthread.h
+++ b/src/tt_pthread.h
@@ -277,6 +277,11 @@
 	tt_pthread_error(e__);				\
 })
 
+#define tt_pthread_cancel(thread)			\
+({	int e__ = pthread_cancel(thread);		\
+	tt_pthread_error(e__);				\
+})
+
 #define tt_pthread_key_create(key, dtor)		\
 ({	int e__ = pthread_key_create(key, dtor);	\
 	tt_pthread_error(e__);				\
-- 
2.15.2 (Apple Git-101.1)

^ permalink raw reply	[flat|nested] 6+ messages in thread

* Re: [tarantool-patches] Re: [PATCH v3/3] replication: handle replication shutdown correctly.
  2018-08-16  6:05       ` [tarantool-patches] " Serge Petrenko
@ 2018-08-16  8:55         ` Vladimir Davydov
  0 siblings, 0 replies; 6+ messages in thread
From: Vladimir Davydov @ 2018-08-16  8:55 UTC (permalink / raw)
  To: Serge Petrenko; +Cc: Georgy Kirichenko, tarantool-patches

Pushed to 1.10

^ permalink raw reply	[flat|nested] 6+ messages in thread

end of thread, other threads:[~2018-08-16  8:55 UTC | newest]

Thread overview: 6+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2018-08-15 11:03 [PATCH v3/3] replication: handle replication shutdown correctly Serge Petrenko
2018-08-15 13:49 ` Vladimir Davydov
2018-08-15 16:13   ` Serge Petrenko
2018-08-15 18:47     ` Vladimir Davydov
2018-08-16  6:05       ` [tarantool-patches] " Serge Petrenko
2018-08-16  8:55         ` Vladimir Davydov

This is a public inbox; see the mirroring instructions
for how to clone and mirror all data and code used for this inbox.