From: Konstantin Belyavskiy <k.belyavskiy@tarantool.org> To: georgy@tarantool.org, kostja@tarantool.org Cc: tarantool-patches@freelists.org Subject: [tarantool-patches] [PATCH 2/3] replication: do not delete relay on applier disconnect Date: Wed, 16 May 2018 14:32:26 +0300 [thread overview] Message-ID: <130891313d476e3d5450be9014a905afa72ef08b.1526469555.git.k.belyavskiy@tarantool.org> (raw) In-Reply-To: <cover.1526469555.git.k.belyavskiy@tarantool.org> In-Reply-To: <cover.1526469555.git.k.belyavskiy@tarantool.org> This is part of a larger task aimed at improving logging. Do not destroy the relay, since it stores the last error, which can be useful for diagnostics. The relay is now created together with the replica and always exists, so remove several NULL checks that are no longer needed. Add relay_state { OFF, FOLLOW, STOPPED } to track replica presence: once connected, the relay is either FOLLOW or STOPPED until the master is reset. Used for #3365. --- src/box/lua/info.c | 2 +- src/box/relay.cc | 70 +++++++++++++++++++++++++++++++++++++------------- src/box/relay.h | 27 +++++++++++++++++++ src/box/replication.cc | 29 +++++++++------------ src/box/replication.h | 11 -------- 5 files changed, 92 insertions(+), 47 deletions(-) diff --git a/src/box/lua/info.c b/src/box/lua/info.c index 8e8fd9d97..9dbc3f92c 100644 --- a/src/box/lua/info.c +++ b/src/box/lua/info.c @@ -145,7 +145,7 @@ lbox_pushreplica(lua_State *L, struct replica *replica) lua_settable(L, -3); } - if (relay != NULL) { + if (relay_get_state(replica->relay) == RELAY_FOLLOW) { lua_pushstring(L, "downstream"); lbox_pushrelay(L, relay); lua_settable(L, -3); diff --git a/src/box/relay.cc b/src/box/relay.cc index d2ceaf110..49835bcb2 100644 --- a/src/box/relay.cc +++ b/src/box/relay.cc @@ -131,6 +131,8 @@ struct relay { struct stailq pending_gc; /** Time when last row was sent to peer. */ double last_row_tm; + /** Relay sync state.
*/ + enum relay_state state; struct { /* Align to prevent false-sharing with tx thread */ @@ -140,6 +142,17 @@ struct relay { } tx; }; +enum relay_state +relay_get_state(const struct relay *relay) +{ + return relay->state; +} +void +relay_set_state(struct relay *relay, enum relay_state state) +{ + relay->state = state; +} + const struct vclock * relay_vclock(const struct relay *relay) { @@ -153,6 +166,17 @@ relay_send_initial_join_row(struct xstream *stream, struct xrow_header *row); static void relay_send_row(struct xstream *stream, struct xrow_header *row); +struct relay* +relay_new(void) +{ + size_t size = sizeof(struct relay); + struct relay *relay = (struct relay *)malloc(size); + if (relay == NULL) + tnt_raise(OutOfMemory, size, "relay_new", "malloc"); + relay->state = RELAY_OFF; + return relay; +} + static void relay_create(struct relay *relay, int fd, uint64_t sync, void (*stream_write)(struct xstream *, struct xrow_header *)) @@ -167,15 +191,23 @@ relay_create(struct relay *relay, int fd, uint64_t sync, } static void -relay_destroy(struct relay *relay) +relay_reset(struct relay *relay) { struct relay_gc_msg *gc_msg, *next_gc_msg; stailq_foreach_entry_safe(gc_msg, next_gc_msg, &relay->pending_gc, in_pending) { free(gc_msg); } + //stailq_create(&relay->pending_gc); if (relay->r != NULL) recovery_delete(relay->r); + relay->r = NULL; +} + +void +relay_destroy(struct relay *relay) +{ + relay_reset(relay); fiber_cond_destroy(&relay->reader_cond); diag_destroy(&relay->diag); TRASH(relay); @@ -502,7 +534,7 @@ relay_subscribe_f(va_list ap) NULL, NULL, cbus_process); cbus_endpoint_destroy(&relay->endpoint, cbus_process); if (!diag_is_empty(&relay->diag)) { - /* An error has occured while ACKs of xlog reading */ + /* An error has occurred while reading ACKs of xlog. 
*/ diag_move(&relay->diag, diag_get()); } struct errinj *inj = errinj(ERRINJ_RELAY_EXIT_DELAY, ERRINJ_DOUBLE); @@ -518,8 +550,9 @@ relay_subscribe(int fd, uint64_t sync, struct replica *replica, struct vclock *replica_clock, uint32_t replica_version_id) { assert(replica->id != REPLICA_ID_NIL); + struct relay *relay = replica->relay; /* Don't allow multiple relays for the same replica */ - if (replica->relay != NULL) { + if (relay->state == RELAY_FOLLOW) { tnt_raise(ClientError, ER_CFG, "replication", "duplicate connection with the same replica UUID"); } @@ -537,24 +570,25 @@ relay_subscribe(int fd, uint64_t sync, struct replica *replica, diag_raise(); } - struct relay relay; - relay_create(&relay, fd, sync, relay_send_row); - relay.r = recovery_new(cfg_gets("wal_dir"), - cfg_geti("force_recovery"), - replica_clock); - vclock_copy(&relay.tx.vclock, replica_clock); - relay.version_id = replica_version_id; - relay.replica = replica; - replica_set_relay(replica, &relay); - vclock_copy(&relay.local_vclock_at_subscribe, &replicaset.vclock); - - int rc = cord_costart(&relay.cord, tt_sprintf("relay_%p", &relay), - relay_subscribe_f, &relay); + if (relay->state != RELAY_OFF) + relay_destroy(relay); + relay_create(relay, fd, sync, relay_send_row); + relay->r = recovery_new(cfg_gets("wal_dir"), + cfg_geti("force_recovery"), + replica_clock); + vclock_copy(&relay->tx.vclock, replica_clock); + relay->version_id = replica_version_id; + relay->replica = replica; + relay->state = RELAY_FOLLOW; + vclock_copy(&relay->local_vclock_at_subscribe, &replicaset.vclock); + + int rc = cord_costart(&relay->cord, tt_sprintf("relay_%p", relay), + relay_subscribe_f, relay); if (rc == 0) - rc = cord_cojoin(&relay.cord); + rc = cord_cojoin(&relay->cord); + assert(replica->relay == relay); replica_clear_relay(replica); - relay_destroy(&relay); if (rc != 0) diag_raise(); diff --git a/src/box/relay.h b/src/box/relay.h index c8a2c2872..e20d4cd13 100644 --- a/src/box/relay.h +++ b/src/box/relay.h @@ 
-42,6 +42,33 @@ struct replica; struct tt_uuid; struct vclock; +enum relay_state { + /** + * Applier has not connected to the master or not expected. + */ + RELAY_OFF, + /** + * Applier has connected to the master. + */ + RELAY_FOLLOW, + /** + * Applier disconnected from the master. + */ + RELAY_STOPPED, +}; + +struct relay* +relay_new(void); + +void +relay_destroy(struct relay *relay); + +enum relay_state +relay_get_state(const struct relay *relay); + +void +relay_set_state(struct relay *relay, enum relay_state state); + /** * Returns relay's vclock * @param relay relay diff --git a/src/box/replication.cc b/src/box/replication.cc index 9aac1f077..53c2b9351 100644 --- a/src/box/replication.cc +++ b/src/box/replication.cc @@ -39,6 +39,7 @@ #include "box.h" #include "gc.h" #include "error.h" +#include "relay.h" #include "vclock.h" /* VCLOCK_MAX */ uint32_t instance_id = REPLICA_ID_NIL; @@ -81,8 +82,6 @@ void replication_init(void) { memset(&replicaset, 0, sizeof(replicaset)); - mempool_create(&replicaset.pool, &cord()->slabc, - sizeof(struct replica)); replica_hash_new(&replicaset.hash); rlist_create(&replicaset.anon); vclock_create(&replicaset.vclock); @@ -92,7 +91,6 @@ replication_init(void) void replication_free(void) { - mempool_destroy(&replicaset.pool); fiber_cond_destroy(&replicaset.applier.cond); } @@ -114,8 +112,9 @@ replica_check_id(uint32_t replica_id) static bool replica_is_orphan(struct replica *replica) { + assert(replica->relay != NULL); return replica->id == REPLICA_ID_NIL && replica->applier == NULL && - replica->relay == NULL; + relay_get_state(replica->relay) != RELAY_FOLLOW; } static void @@ -125,14 +124,14 @@ static struct replica * replica_new(void) { struct replica *replica = (struct replica *) - mempool_alloc(&replicaset.pool); + malloc(sizeof(struct replica)); if (replica == NULL) tnt_raise(OutOfMemory, sizeof(*replica), "malloc", "struct replica"); replica->id = 0; replica->uuid = uuid_nil; replica->applier = NULL; - replica->relay = NULL; + 
replica->relay = relay_new(); replica->gc = NULL; rlist_create(&replica->in_anon); trigger_create(&replica->on_applier_state, @@ -145,9 +144,14 @@ static void replica_delete(struct replica *replica) { assert(replica_is_orphan(replica)); + if (replica->relay != NULL) { + if (relay_get_state(replica->relay) != RELAY_OFF) + relay_destroy(replica->relay); + free(replica->relay); + } if (replica->gc != NULL) gc_consumer_unregister(replica->gc); - mempool_free(&replicaset.pool, replica); + free(replica); } struct replica * @@ -656,19 +660,10 @@ replicaset_check_quorum(void) box_clear_orphan(); } -void -replica_set_relay(struct replica *replica, struct relay *relay) -{ - assert(replica->id != REPLICA_ID_NIL); - assert(replica->relay == NULL); - replica->relay = relay; -} - void replica_clear_relay(struct replica *replica) { - assert(replica->relay != NULL); - replica->relay = NULL; + relay_set_state(replica->relay, RELAY_STOPPED); if (replica_is_orphan(replica)) { replica_hash_remove(&replicaset.hash, replica); replica_delete(replica); diff --git a/src/box/replication.h b/src/box/replication.h index e2bdd814f..2f0a2d7a7 100644 --- a/src/box/replication.h +++ b/src/box/replication.h @@ -37,7 +37,6 @@ #include <small/rb.h> /* replicaset_t */ #include <small/rlist.h> #include "applier.h" -#include <small/mempool.h> #include "fiber_cond.h" #include "vclock.h" @@ -173,8 +172,6 @@ typedef rb_tree(struct replica) replica_hash_t; * relays, usually connected in full mesh. */ struct replicaset { - /** Memory pool for struct replica allocations. */ - struct mempool pool; /** Hash of replicas indexed by UUID. */ replica_hash_t hash; /** @@ -301,14 +298,6 @@ replica_set_id(struct replica *replica, uint32_t id); void replica_clear_id(struct replica *replica); -/** - * Register \a relay of a \a replica. 
- * \pre a replica can have only one relay - * \pre replica->id != REPLICA_ID_NIL - */ -void -replica_set_relay(struct replica *replica, struct relay *relay); - /** * Unregister \a relay from the \a replica. */ -- 2.14.3 (Apple Git-98)
next prev parent reply other threads:[~2018-05-16 11:32 UTC|newest] Thread overview: 4+ messages / expand[flat|nested] mbox.gz Atom feed top 2018-05-16 11:32 [tarantool-patches] [PATCH 0/3] replication: improve logging Konstantin Belyavskiy 2018-05-16 11:32 ` [tarantool-patches] [PATCH 1/3] replication: use applier_state to check quorum Konstantin Belyavskiy 2018-05-16 11:32 ` Konstantin Belyavskiy [this message] 2018-05-16 11:32 ` [tarantool-patches] [PATCH 3/3] replication: display downstream status at upstream Konstantin Belyavskiy
Reply instructions: You may reply publicly to this message via plain-text email using any one of the following methods: * Save the following mbox file, import it into your mail client, and reply-to-all from there: mbox. Avoid top-posting and favor interleaved quoting: https://en.wikipedia.org/wiki/Posting_style#Interleaved_style * Reply using the --to, --cc, and --in-reply-to switches of git-send-email(1): git send-email \ --in-reply-to=130891313d476e3d5450be9014a905afa72ef08b.1526469555.git.k.belyavskiy@tarantool.org \ --to=k.belyavskiy@tarantool.org \ --cc=georgy@tarantool.org \ --cc=kostja@tarantool.org \ --cc=tarantool-patches@freelists.org \ --subject='Re: [tarantool-patches] [PATCH 2/3] replication: do not delete relay on applier disconnect' \ /path/to/YOUR_REPLY https://kernel.org/pub/software/scm/git/docs/git-send-email.html * If your mail client supports setting the In-Reply-To header via mailto: links, try the mailto: link.
This is a public inbox; see the mirroring instructions for how to clone and mirror all data and code used for this inbox.