[PATCH v3/2] replication: implement replication_shutdown().
From: Serge Petrenko @ 2018-08-14 8:18 UTC
To: tarantool-patches; +Cc: georgy, vdavydov.dev, Serge Petrenko
Relay threads keep using tx upon shutdown, which leads to occasional
segmentation faults and assertion failures (e.g. in the replication
test suite).

Fix this by finishing the implementation of replication_free() and
introducing relay_cancel(). replication_free() calls relay_cancel()
to stop every relay thread that is still using tx.

Also add delays to replication/gc.test to fix the tests on Travis.
Closes #3485
---
https://github.com/tarantool/tarantool/issues/3485
https://github.com/tarantool/tarantool/tree/sp/alt-gh-3485-replication-shutdown
This is the second variant of the patch, as discussed yesterday with
Vladimir and Georgy. Instead of sending a message via cpipe, we just
send a signal to the relay; the relay calls pthread_exit() in the
signal handler.
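For reference, a minimal standalone sketch of the pattern (the names
worker_f/on_signal are made up for illustration; the real code uses
tarantool's cord and tt_pthread_* wrappers and lives in relay.cc):

#include <pthread.h>
#include <signal.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>

static void
on_signal(int signum)
{
	(void)signum;
	/* Terminate only the signalled thread. */
	pthread_exit(NULL);
}

static void *
worker_f(void *arg)
{
	(void)arg;
	/* Unblock SIGUSR1 and install the handler in the worker,
	 * mirroring what relay_subscribe_f() does below. */
	sigset_t set;
	sigemptyset(&set);
	sigaddset(&set, SIGUSR1);
	pthread_sigmask(SIG_UNBLOCK, &set, NULL);
	struct sigaction act;
	memset(&act, 0, sizeof(act));
	act.sa_handler = on_signal;
	sigfillset(&act.sa_mask);
	act.sa_flags = SA_RESTART;
	sigaction(SIGUSR1, &act, NULL);
	for (;;)
		pause(); /* stand-in for the relay event loop */
	return NULL;
}

int
main(void)
{
	pthread_t tid;
	pthread_create(&tid, NULL, worker_f, NULL);
	/* A real program would synchronize on handler installation;
	 * sleep() keeps the sketch short. */
	sleep(1);
	/* The relay_cancel() part: signal, then wait. */
	pthread_kill(tid, SIGUSR1);
	pthread_join(tid, NULL);
	puts("worker joined, safe to tear down shared state");
	return 0;
}

The join is what makes the subsequent tx teardown safe: once
pthread_join() returns, the worker cannot touch shared state anymore.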
Changes in v3:
 - eliminate the tx_in_use flag.
 - do not send a message to the relay; just send a signal with
   pthread_kill() and exit the relay upon receiving it.
 - wait for the relay thread with tt_pthread_join() before
   proceeding with tx destruction.
 - make sure all the replicas are deleted, including the ones in
   the replicaset.anon list (see the list-deletion sketch right
   after this list).
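On that last point: the replica containers are torn down while being
iterated, so the _safe iteration flavour is required, because the
current node is freed inside the loop body. A generic sketch of the
same idea on a hand-rolled singly linked list (struct node here is
hypothetical, not tarantool's rlist):

#include <stdlib.h>

struct node {
	struct node *next;
};

/* Grab the successor before freeing the current node: the same
 * trick replica_hash_foreach_safe() and rlist_foreach_entry_safe()
 * hide behind macros. */
static void
free_all(struct node **head)
{
	struct node *cur = *head;
	while (cur != NULL) {
		struct node *next = cur->next; /* saved before free() */
		free(cur);
		cur = next;
	}
	*head = NULL;
}

int
main(void)
{
	struct node *head = NULL;
	for (int i = 0; i < 3; i++) {
		struct node *n = malloc(sizeof(*n));
		n->next = head;
		head = n;
	}
	free_all(&head);
	return 0;
}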
Changes in v2:
 - instead of setting a tx_in_use flag in the relay and checking it
   in tx, send a message from the relay to tx to set the flag.
src/box/box.cc | 2 +-
src/box/relay.cc | 28 ++++++++++++++++++++++++++
src/box/relay.h | 4 ++++
src/box/replication.cc | 48 +++++++++++++++++++++++++++++++++++++-------
src/box/replication.h | 3 +++
test/replication/gc.result | 15 ++++++++++++++
test/replication/gc.test.lua | 6 +++++-
7 files changed, 97 insertions(+), 9 deletions(-)
diff --git a/src/box/box.cc b/src/box/box.cc
index ae4959d6f..c5e05bc15 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -1597,9 +1597,9 @@ box_free(void)
* initialized
*/
if (is_box_configured) {
+ replication_free();
#if 0
session_free();
- replication_free();
user_cache_free();
schema_free();
module_free();
diff --git a/src/box/relay.cc b/src/box/relay.cc
index 4cacbc840..6a9267200 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -198,6 +198,14 @@ relay_start(struct relay *relay, int fd, uint64_t sync,
relay->state = RELAY_FOLLOW;
}
+void
+relay_cancel(struct relay *relay)
+{
+ assert(relay->state == RELAY_FOLLOW);
+ pthread_kill(relay->cord.id, SIGUSR1);
+ tt_pthread_join(relay->cord.id, NULL);
+}
+
static void
relay_stop(struct relay *relay)
{
@@ -452,6 +460,12 @@ relay_send_heartbeat(struct relay *relay)
}
}
+void
+relay_on_signal(int)
+{
+ pthread_exit(NULL);
+}
+
/**
* A libev callback invoked when a relay client socket is ready
* for read. This currently only happens when the client closes
@@ -460,6 +474,20 @@ relay_send_heartbeat(struct relay *relay)
static int
relay_subscribe_f(va_list ap)
{
+ /*
+ * Set a signal handler to terminate the relay upon
+ * tarantool exit.
+ */
+ sigset_t set;
+ sigemptyset(&set);
+ sigaddset(&set, SIGUSR1);
+ pthread_sigmask(SIG_UNBLOCK, &set, NULL);
+ struct sigaction act;
+ act.sa_handler = &relay_on_signal;
+ sigfillset(&act.sa_mask);
+ act.sa_flags = SA_RESTART;
+ sigaction(SIGUSR1, &act, NULL);
+
struct relay *relay = va_arg(ap, struct relay *);
struct recovery *r = relay->r;
diff --git a/src/box/relay.h b/src/box/relay.h
index 2988e6b0d..53bf68eb8 100644
--- a/src/box/relay.h
+++ b/src/box/relay.h
@@ -61,6 +61,10 @@ enum relay_state {
struct relay *
relay_new(struct replica *replica);
+/** Cancel a running relay. Called on shutdown. */
+void
+relay_cancel(struct relay *relay);
+
/** Destroy and delete the relay */
void
relay_delete(struct relay *relay);
diff --git a/src/box/replication.cc b/src/box/replication.cc
index 48956d2ed..acf20fc2c 100644
--- a/src/box/replication.cc
+++ b/src/box/replication.cc
@@ -91,13 +91,6 @@ replication_init(void)
latch_create(&replicaset.applier.order_latch);
}
-void
-replication_free(void)
-{
- free(replicaset.replica_by_id);
- fiber_cond_destroy(&replicaset.applier.cond);
-}
-
void
replica_check_id(uint32_t replica_id)
{
@@ -242,6 +235,47 @@ replica_clear_applier(struct replica *replica)
trigger_clear(&replica->on_applier_state);
}
+void
+replication_free(void)
+{
+ struct replica *replica, *next;
+
+ replica_hash_foreach_safe(&replicaset.hash, replica, next) {
+ if (replica->id == instance_id) {
+ replica_hash_remove(&replicaset.hash, replica);
+ /*
+ * The local replica has neither an applier
+ * nor a relay, so free it right away.
+ */
+ TRASH(replica);
+ free(replica);
+ continue;
+ }
+ if (replica->applier != NULL) {
+ replica_clear_applier(replica);
+ /*
+ * We're exiting, so control won't be passed
+ * to appliers and we don't need to stop them.
+ */
+ }
+ if (replica->id != REPLICA_ID_NIL) {
+ if (relay_get_state(replica->relay) == RELAY_FOLLOW) {
+ replica->id = REPLICA_ID_NIL;
+ relay_cancel(replica->relay);
+ }
+ } else {
+ replica_hash_remove(&replicaset.hash, replica);
+ replica_delete(replica);
+ }
+ }
+ rlist_foreach_entry_safe(replica, &replicaset.anon, in_anon, next) {
+ replica_clear_applier(replica);
+ replica_delete(replica);
+ }
+ free(replicaset.replica_by_id);
+ fiber_cond_destroy(&replicaset.applier.cond);
+}
+
static void
replica_on_applier_sync(struct replica *replica)
{
diff --git a/src/box/replication.h b/src/box/replication.h
index e8b391af2..a8baafd12 100644
--- a/src/box/replication.h
+++ b/src/box/replication.h
@@ -154,6 +154,9 @@ replication_disconnect_timeout(void)
void
replication_init(void);
+/**
+ * Stop replication and delete all replicas and the replicaset.
+ */
void
replication_free(void);
diff --git a/test/replication/gc.result b/test/replication/gc.result
index 3f9db26ce..aa59747db 100644
--- a/test/replication/gc.result
+++ b/test/replication/gc.result
@@ -152,6 +152,9 @@ box.snapshot()
---
- ok
...
+wait_gc(1)
+---
+...
#box.info.gc().checkpoints == 1 or box.info.gc()
---
- true
@@ -311,6 +314,9 @@ box.snapshot()
---
- ok
...
+wait_gc(1)
+---
+...
#box.info.gc().checkpoints == 1 or box.info.gc()
---
- true
@@ -330,6 +336,9 @@ xlog_count == 3 or xlog_count == 2 or fio.listdir('./master')
test_run:cleanup_cluster()
---
...
+fiber.sleep(0.1)
+---
+...
#box.info.gc().checkpoints == 1 or box.info.gc()
---
- true
@@ -395,6 +404,9 @@ replica_port ~= nil
box.cfg{replication = replica_port}
---
...
+fiber.sleep(0.1)
+---
+...
-- Stop the replica and write a few WALs.
test_run:cmd("stop server replica")
---
@@ -425,6 +437,9 @@ box.snapshot()
---
- ok
...
+fiber.sleep(0.1)
+---
+...
#fio.glob('./master/*.xlog') == 3 or fio.listdir('./master')
---
- true
diff --git a/test/replication/gc.test.lua b/test/replication/gc.test.lua
index 96f11f8d4..28e0996f5 100644
--- a/test/replication/gc.test.lua
+++ b/test/replication/gc.test.lua
@@ -78,6 +78,7 @@ box.snapshot()
-- Invoke garbage collection. Check that it doesn't remove
-- xlogs needed by the replica.
box.snapshot()
+wait_gc(1)
#box.info.gc().checkpoints == 1 or box.info.gc()
#fio.glob('./master/*.xlog') == 2 or fio.listdir('./master')
@@ -144,6 +145,7 @@ _ = s:auto_increment{}
box.snapshot()
_ = s:auto_increment{}
box.snapshot()
+wait_gc(1)
#box.info.gc().checkpoints == 1 or box.info.gc()
xlog_count = #fio.glob('./master/*.xlog')
-- the replica may have managed to download all data
@@ -154,6 +156,7 @@ xlog_count == 3 or xlog_count == 2 or fio.listdir('./master')
-- The xlog should only be deleted after the replica
-- is unregistered.
test_run:cleanup_cluster()
+fiber.sleep(0.1)
#box.info.gc().checkpoints == 1 or box.info.gc()
#fio.glob('./master/*.xlog') == 1 or fio.listdir('./master')
--
@@ -185,7 +188,7 @@ test_run:cmd("start server replica")
replica_port = test_run:eval('replica', 'return box.cfg.listen')[1]
replica_port ~= nil
box.cfg{replication = replica_port}
-
+fiber.sleep(0.1)
-- Stop the replica and write a few WALs.
test_run:cmd("stop server replica")
test_run:cmd("cleanup server replica")
@@ -195,6 +198,7 @@ _ = s:auto_increment{}
box.snapshot()
_ = s:auto_increment{}
box.snapshot()
+fiber.sleep(0.1)
#fio.glob('./master/*.xlog') == 3 or fio.listdir('./master')
-- Delete the replica from the cluster table and check that
--
2.15.2 (Apple Git-101.1)