[PATCH v3/3] replication: handle replication shutdown correctly.
Serge Petrenko
sergepetrenko at tarantool.org
Wed Aug 15 19:13:28 MSK 2018
Hi! I've addressed your comments; the new diff is below.
The branch remains the same.
> On 15 Aug 2018, at 16:49, Vladimir Davydov <vdavydov.dev at gmail.com> wrote:
>
> On Wed, Aug 15, 2018 at 02:03:03PM +0300, Serge Petrenko wrote:
>> diff --git a/src/box/box.cc b/src/box/box.cc
>> index ae4959d6f..c5e05bc15 100644
>> --- a/src/box/box.cc
>> +++ b/src/box/box.cc
>> @@ -1597,9 +1597,9 @@ box_free(void)
>> * initialized
>> */
>> if (is_box_configured) {
>> + replication_free();
>> #if 0
>> session_free();
>> - replication_free();
>
> Nit: better move replication_free() after #endif, where all other
> destructors are called.
Fixed.
>
>> user_cache_free();
>> schema_free();
>> module_free();
>> diff --git a/src/box/relay.cc b/src/box/relay.cc
>> index 4cacbc840..81dc88a06 100644
>> --- a/src/box/relay.cc
>> +++ b/src/box/relay.cc
>> @@ -198,6 +198,14 @@ relay_start(struct relay *relay, int fd, uint64_t sync,
>> relay->state = RELAY_FOLLOW;
>> }
>>
>> +void
>> +relay_cancel(struct relay *relay)
>> +{
>> + assert(relay->state == RELAY_FOLLOW);
>
> IMO it would be better to make this function work both if the relay
> thread is running and if it is not (do nothing in the latter case).
>
> Also relay->state is set to RELAY_FOLLOW on initial join, when
> relay thread isn't running. In this case, you must not call pthread
> cancel/join.
Fixed both.
>
>> + tt_pthread_cancel(relay->cord.id);
>> + tt_pthread_join(relay->cord.id, NULL);
>> +}
>> +
>> static void
>> relay_stop(struct relay *relay)
>> {
>> @@ -460,6 +468,9 @@ relay_send_heartbeat(struct relay *relay)
>> static int
>> relay_subscribe_f(va_list ap)
>> {
>> + /* Allow to be cancelled. */
>> + tt_pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
>> + tt_pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);
>
> These cancelstate/type are set by default for all new threads and so we
> don't need to set them explicitly, no?
Yes, you’re right. Removed.
>
>> struct relay *relay = va_arg(ap, struct relay *);
>> struct recovery *r = relay->r;
>>
>> diff --git a/src/box/relay.h b/src/box/relay.h
>> index 2988e6b0d..53bf68eb8 100644
>> --- a/src/box/relay.h
>> +++ b/src/box/relay.h
>> @@ -61,6 +61,10 @@ enum relay_state {
>> struct relay *
>> relay_new(struct replica *replica);
>>
>> +/** Cancel a running relay. Called on shutdown. */
>> +void
>> +relay_cancel(struct relay *relay);
>> +
>> /** Destroy and delete the relay */
>> void
>> relay_delete(struct relay *relay);
>> diff --git a/src/box/replication.cc b/src/box/replication.cc
>> index 48956d2ed..acf20fc2c 100644
>> --- a/src/box/replication.cc
>> +++ b/src/box/replication.cc
>> @@ -91,13 +91,6 @@ replication_init(void)
>> latch_create(&replicaset.applier.order_latch);
>> }
>>
>> -void
>> -replication_free(void)
>> -{
>> - free(replicaset.replica_by_id);
>> - fiber_cond_destroy(&replicaset.applier.cond);
>> -}
>> -
>> void
>> replica_check_id(uint32_t replica_id)
>> {
>> @@ -242,6 +235,47 @@ replica_clear_applier(struct replica *replica)
>> trigger_clear(&replica->on_applier_state);
>> }
>>
>> +void
>> +replication_free(void)
>> +{
>
> After some consideration, I still don't think it's a good idea to add
> the code that frees replicas in this patch, because it's a completely
> separate change, which has nothing to do with the issue you're fixing.
> The more code you add the more difficult it will be to cherry-pick this
> patch, and it also increases the likelihood of introducing a bug.
> Please remove this code so that replication_free only cancels relay
> threads.
>
> We will probably need to free memory on shutdown (to silence valgrind
> and asan), but let's do it separately and when we get to it.
Ok. Let’s do it in a separate patch.
>
>> + struct replica *replica, *next;
>> +
>> + replica_hash_foreach_safe(&replicaset.hash, replica, next) {
>> + if (replica->id == instance_id) {
>> + replica_hash_remove(&replicaset.hash, replica);
>> + /*
>> + * Local replica doesn't have neither applier
>> + * nor relay, so free it right away.
>> + */
>> + TRASH(replica);
>> + free(replica);
>> + continue;
>> + }
>> + if (replica->applier != NULL) {
>> + replica_clear_applier(replica);
>> + /*
>> + * We're exiting, so control won't be passed
>> + * to appliers and we don't need to stop them.
>> + */
>> + }
>> + if (replica->id != REPLICA_ID_NIL) {
>> + if (relay_get_state(replica->relay) == RELAY_FOLLOW) {
>> + replica->id = REPLICA_ID_NIL;
>> + relay_cancel(replica->relay);
>
> Please add a comment here why we need to stop relay threads on shutdown.
Done.
>
>> + }
>> + } else {
>> + replica_hash_remove(&replicaset.hash, replica);
>> + replica_delete(replica);
>> + }
>> + }
>> + rlist_foreach_entry_safe(replica, &replicaset.anon, in_anon, next) {
>> + replica_clear_applier(replica);
>> + replica_delete(replica);
>> + }
>> + free(replicaset.replica_by_id);
>> + fiber_cond_destroy(&replicaset.applier.cond);
>> +}
>> +
>> static void
>> replica_on_applier_sync(struct replica *replica)
>> {
>> diff --git a/src/box/replication.h b/src/box/replication.h
>> index e8b391af2..a8baafd12 100644
>> --- a/src/box/replication.h
>> +++ b/src/box/replication.h
>> @@ -154,6 +154,9 @@ replication_disconnect_timeout(void)
>> void
>> replication_init(void);
>>
>> +/**
>> + * Stop replication and delete all replicas and replicaset.
>> + */
>
> This comment is not really needed - it's clear from the name of this
> function that it is a destructor. Besides, replication_init() doesn't
> have a comment so let's leave this declaration as is.
Ok, removed.
>
>> void
>> replication_free(void);
>>
>> diff --git a/src/tt_pthread.h b/src/tt_pthread.h
>> index 60ade50ae..144425ccb 100644
>> --- a/src/tt_pthread.h
>> +++ b/src/tt_pthread.h
>> @@ -277,6 +277,21 @@
>> tt_pthread_error(e__); \
>> })
>>
>> +#define tt_pthread_cancel(thread) \
>> +({ int e__ = pthread_cancel(thread); \
>> + tt_pthread_error(e__); \
>> + })
>
> Extra space.
Fixed.
>
>> +
>> +#define tt_pthread_setcancelstate(state, ret) \
>> +({ int e__ = pthread_setcancelstate(state, ret); \
>> + tt_pthread_error(e__); \
>> +})
>> +
>> +#define tt_pthread_setcanceltype(type, ret) \
>> +({ int e__ = pthread_setcanceltype(type, ret); \
>> + tt_pthread_error(e__); \
>> +})
>> +
>> #define tt_pthread_key_create(key, dtor) \
>> ({ int e__ = pthread_key_create(key, dtor); \
>> tt_pthread_error(e__); \
>> diff --git a/test/replication/gc.result b/test/replication/gc.result
>> index 3f9db26ce..aa59747db 100644
>> --- a/test/replication/gc.result
>> +++ b/test/replication/gc.result
>> @@ -152,6 +152,9 @@ box.snapshot()
>> ---
>> - ok
>> ...
>> +wait_gc(1)
>> +---
>> +...
>
> I don't understand how this is relevant to the issue you're fixing.
> If the test is buggy, you should fix it in a separate patch.
Removed: this is not relevant to the issue.
>
>> #box.info.gc().checkpoints == 1 or box.info.gc()
>> ---
>> - true
>> @@ -311,6 +314,9 @@ box.snapshot()
>> ---
>> - ok
>> ...
>> +wait_gc(1)
>> +---
>> +...
>> #box.info.gc().checkpoints == 1 or box.info.gc()
>> ---
>> - true
>> @@ -330,6 +336,9 @@ xlog_count == 3 or xlog_count == 2 or fio.listdir('./master')
>> test_run:cleanup_cluster()
>> ---
>> ...
>> +fiber.sleep(0.1)
>> +---
>> +...
>
> Please don't use sleeps. Use wait_gc() instead.
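Removed as well: the test changes are not related to this issue, so there is no sleep left to replace. Here’s the new diff: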
---
src/box/box.cc | 2 +-
src/box/relay.cc | 15 +++++++++++++++
src/box/relay.h | 4 ++++
src/box/replication.cc | 43 ++++++++++++++++++++++++++++++++++++-------
src/tt_pthread.h | 5 +++++
5 files changed, 61 insertions(+), 8 deletions(-)
diff --git a/src/box/box.cc b/src/box/box.cc
index ae4959d6f..28bfdd5fb 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -1599,13 +1599,13 @@ box_free(void)
if (is_box_configured) {
#if 0
session_free();
- replication_free();
user_cache_free();
schema_free();
module_free();
tuple_free();
port_free();
#endif
+ replication_free();
sequence_free();
gc_free();
engine_shutdown();
diff --git a/src/box/relay.cc b/src/box/relay.cc
index 4cacbc840..a89a7732d 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -198,6 +198,22 @@ relay_start(struct relay *relay, int fd, uint64_t sync,
relay->state = RELAY_FOLLOW;
}

+void
+relay_cancel(struct relay *relay)
+{
+	/* Nothing to do if the relay thread isn't running. */
+	if (relay->cord.id == 0)
+		return;
+	tt_pthread_cancel(relay->cord.id);
+	tt_pthread_join(relay->cord.id, NULL);
+	/*
+	 * Mark the thread as gone, the same way relay_stop()
+	 * does, so that a repeated call is a no-op rather
+	 * than a second pthread_join() on a dead thread.
+	 */
+	relay->cord.id = 0;
+}
+
static void
relay_stop(struct relay *relay)
{
@@ -211,6 +221,11 @@ relay_stop(struct relay *relay)
recovery_delete(relay->r);
relay->r = NULL;
relay->state = RELAY_STOPPED;
+ /*
+ * Needed to track whether relay thread is running or not
+ * for relay_cancel(). Id is reset upon cord_create().
+ */
+ relay->cord.id = 0;
}
void
diff --git a/src/box/relay.h b/src/box/relay.h
index 2988e6b0d..53bf68eb8 100644
--- a/src/box/relay.h
+++ b/src/box/relay.h
@@ -61,6 +61,10 @@ enum relay_state {
struct relay *
relay_new(struct replica *replica);
+/** Cancel and join the relay thread if it is running. Used on shutdown. */
+void
+relay_cancel(struct relay *relay);
+
/** Destroy and delete the relay */
void
relay_delete(struct relay *relay);
diff --git a/src/box/replication.cc b/src/box/replication.cc
index 48956d2ed..083ae6407 100644
--- a/src/box/replication.cc
+++ b/src/box/replication.cc
@@ -91,13 +91,6 @@ replication_init(void)
latch_create(&replicaset.applier.order_latch);
}
-void
-replication_free(void)
-{
- free(replicaset.replica_by_id);
- fiber_cond_destroy(&replicaset.applier.cond);
-}
-
void
replica_check_id(uint32_t replica_id)
{
@@ -242,6 +235,30 @@ replica_clear_applier(struct replica *replica)
trigger_clear(&replica->on_applier_state);
}

+void
+replication_free(void)
+{
+	struct replica *replica, *next;
+
+	/*
+	 * Relay threads keep sending messages
+	 * to tx via cbus upon shutdown, which
+	 * could lead to segfaults. So cancel
+	 * them. relay_cancel() does nothing if
+	 * the relay thread isn't running.
+	 *
+	 * Nothing else is torn down here: we're
+	 * exiting, so control won't be passed to
+	 * appliers, and replicas are not freed.
+	 */
+	replica_hash_foreach_safe(&replicaset.hash, replica, next) {
+		relay_cancel(replica->relay);
+	}
+
+	free(replicaset.replica_by_id);
+	fiber_cond_destroy(&replicaset.applier.cond);
+}
+
static void
replica_on_applier_sync(struct replica *replica)
{
diff --git a/src/tt_pthread.h b/src/tt_pthread.h
index 60ade50ae..d83694460 100644
--- a/src/tt_pthread.h
+++ b/src/tt_pthread.h
@@ -277,6 +277,11 @@
tt_pthread_error(e__); \
})
+/** pthread_cancel() wrapper; the error code is passed to tt_pthread_error(). */
+#define tt_pthread_cancel(thread) \
+({ int e__ = pthread_cancel(thread); \
+ tt_pthread_error(e__); \
+})
+
#define tt_pthread_key_create(key, dtor) \
({ int e__ = pthread_key_create(key, dtor); \
tt_pthread_error(e__); \
--
2.15.2 (Apple Git-101.1)
More information about the Tarantool-patches
mailing list