From: Konstantin Belyavskiy <k.belyavskiy@tarantool.org> To: tarantool-patches@freelists.org, vdavydov@tarantool.org, georgy@tarantool.org, kostja@tarantool.org Subject: [PATCH 1/2] replication: do not delete relay on applier disconnect Date: Mon, 21 May 2018 20:07:17 +0300 [thread overview] Message-ID: <9fa887ca5a8ea2b9d496a7bfd0d4c7a83da26ac8.1526921933.git.k.belyavskiy@tarantool.org> (raw) In-Reply-To: <cover.1526921933.git.k.belyavskiy@tarantool.org> In-Reply-To: <cover.1526921933.git.k.belyavskiy@tarantool.org> This is part of a larger task aimed at improving logging. Do not destroy the relay, since it stores the last error, which can be useful for diagnostics. The relay is now created together with its replica and always exists, so several NULL checks are removed as well. Add relay_state { OFF, FOLLOW, STOPPED } to track replica presence: once connected, a relay is either FOLLOW or STOPPED until the master is reset. Updated according to @kostja's proposal. Needed for #3365. --- src/box/box.cc | 5 +- src/box/lua/info.c | 2 +- src/box/relay.cc | 128 +++++++++++++++++++++++++++++++------------------ src/box/relay.h | 33 +++++++++++-- src/box/replication.cc | 35 +++++++------- src/box/replication.h | 13 +---- 6 files changed, 132 insertions(+), 84 deletions(-) diff --git a/src/box/box.cc b/src/box/box.cc index eabff1b63..018802f1d 100644 --- a/src/box/box.cc +++ b/src/box/box.cc @@ -1401,7 +1401,8 @@ box_process_join(struct ev_io *io, struct xrow_header *header) * Final stage: feed replica with WALs in range * (start_vclock, stop_vclock). */ - relay_final_join(io->fd, header->sync, &start_vclock, &stop_vclock); + relay_final_join(replica, io->fd, header->sync, + &start_vclock, &stop_vclock); say_info("final data sent."); /* Send end of WAL stream marker */ @@ -1493,7 +1494,7 @@ box_process_subscribe(struct ev_io *io, struct xrow_header *header) * a stall in updates (in this case replica may hang * indefinitely). 
*/ - relay_subscribe(io->fd, header->sync, replica, &replica_clock, + relay_subscribe(replica, io->fd, header->sync, &replica_clock, replica_version_id); } diff --git a/src/box/lua/info.c b/src/box/lua/info.c index 8e8fd9d97..9dbc3f92c 100644 --- a/src/box/lua/info.c +++ b/src/box/lua/info.c @@ -145,7 +145,7 @@ lbox_pushreplica(lua_State *L, struct replica *replica) lua_settable(L, -3); } - if (relay != NULL) { + if (relay_get_state(replica->relay) == RELAY_FOLLOW) { lua_pushstring(L, "downstream"); lbox_pushrelay(L, relay); lua_settable(L, -3); diff --git a/src/box/relay.cc b/src/box/relay.cc index d2ceaf110..6470946ae 100644 --- a/src/box/relay.cc +++ b/src/box/relay.cc @@ -100,8 +100,6 @@ struct relay { struct replica *replica; /** WAL event watcher. */ struct wal_watcher wal_watcher; - /** Set before exiting the relay loop. */ - bool exiting; /** Relay reader cond. */ struct fiber_cond reader_cond; /** Relay diagnostics. */ @@ -131,6 +129,8 @@ struct relay { struct stailq pending_gc; /** Time when last row was sent to peer. */ double last_row_tm; + /** Relay sync state. 
*/ + enum relay_state state; struct { /* Align to prevent false-sharing with tx thread */ @@ -140,6 +140,12 @@ struct relay { } tx; }; +enum relay_state +relay_get_state(const struct relay *relay) +{ + return relay->state; +} + const struct vclock * relay_vclock(const struct relay *relay) { @@ -153,32 +159,63 @@ relay_send_initial_join_row(struct xstream *stream, struct xrow_header *row); static void relay_send_row(struct xstream *stream, struct xrow_header *row); +struct relay * +relay_new(struct replica *replica) +{ + struct relay *relay = (struct relay *) calloc(1, sizeof(struct relay)); + if (relay == NULL) { + diag_set(OutOfMemory, sizeof(struct relay), "malloc", + "struct relay"); + return NULL; + } + relay->replica = replica; + fiber_cond_create(&relay->reader_cond); + diag_create(&relay->diag); + stailq_create(&relay->pending_gc); + relay->state = RELAY_OFF; + return relay; +} + static void -relay_create(struct relay *relay, int fd, uint64_t sync, +relay_start(struct relay *relay, int fd, uint64_t sync, void (*stream_write)(struct xstream *, struct xrow_header *)) { - memset(relay, 0, sizeof(*relay)); xstream_create(&relay->stream, stream_write); + /* + * Clear the diagnostics at start, in case it has the old + * error message which we keep around to display in + * box.info.replication. 
+ */ + diag_clear(&relay->diag); coio_create(&relay->io, fd); relay->sync = sync; - fiber_cond_create(&relay->reader_cond); - diag_create(&relay->diag); - stailq_create(&relay->pending_gc); + relay->state = RELAY_FOLLOW; } static void -relay_destroy(struct relay *relay) +relay_stop(struct relay *relay) { struct relay_gc_msg *gc_msg, *next_gc_msg; stailq_foreach_entry_safe(gc_msg, next_gc_msg, &relay->pending_gc, in_pending) { free(gc_msg); } + stailq_create(&relay->pending_gc); if (relay->r != NULL) recovery_delete(relay->r); + relay->r = NULL; + relay->state = RELAY_STOPPED; +} + +void +relay_delete(struct relay *relay) +{ + if (relay->state == RELAY_FOLLOW) + relay_stop(relay); fiber_cond_destroy(&relay->reader_cond); diag_destroy(&relay->diag); TRASH(relay); + free(relay); } static void @@ -199,11 +236,13 @@ relay_set_cord_name(int fd) void relay_initial_join(int fd, uint64_t sync, struct vclock *vclock) { - struct relay relay; - relay_create(&relay, fd, sync, relay_send_initial_join_row); - assert(relay.stream.write != NULL); - engine_join_xc(vclock, &relay.stream); - relay_destroy(&relay); + struct relay *relay = relay_new(NULL); + if (relay == NULL) + diag_raise(); + relay_start(relay, fd, sync, relay_send_initial_join_row); + engine_join_xc(vclock, &relay->stream); + relay_stop(relay); + relay_delete(relay); } int @@ -222,22 +261,22 @@ relay_final_join_f(va_list ap) } void -relay_final_join(int fd, uint64_t sync, struct vclock *start_vclock, - struct vclock *stop_vclock) +relay_final_join(struct replica *replica, int fd, uint64_t sync, + struct vclock *start_vclock, struct vclock *stop_vclock) { - struct relay relay; - relay_create(&relay, fd, sync, relay_send_row); - relay.r = recovery_new(cfg_gets("wal_dir"), + struct relay *relay = replica->relay; + relay_start(relay, fd, sync, relay_send_row); + relay->r = recovery_new(cfg_gets("wal_dir"), cfg_geti("force_recovery"), start_vclock); - vclock_copy(&relay.stop_vclock, stop_vclock); + 
vclock_copy(&relay->stop_vclock, stop_vclock); - int rc = cord_costart(&relay.cord, "final_join", - relay_final_join_f, &relay); + int rc = cord_costart(&relay->cord, "final_join", + relay_final_join_f, relay); if (rc == 0) - rc = cord_cojoin(&relay.cord); + rc = cord_cojoin(&relay->cord); - relay_destroy(&relay); + relay_stop(relay); if (rc != 0) diag_raise(); @@ -334,7 +373,7 @@ static void relay_process_wal_event(struct wal_watcher *watcher, unsigned events) { struct relay *relay = container_of(watcher, struct relay, wal_watcher); - if (relay->exiting) { + if (relay->state != RELAY_FOLLOW) { /* * Do not try to send anything to the replica * if it already closed its socket. @@ -495,14 +534,13 @@ relay_subscribe_f(va_list ap) if (!fiber_is_dead(reader)) fiber_cancel(reader); fiber_join(reader); - relay->exiting = true; trigger_clear(&on_close_log); wal_clear_watcher(&relay->wal_watcher, cbus_process); cbus_unpair(&relay->tx_pipe, &relay->relay_pipe, NULL, NULL, cbus_process); cbus_endpoint_destroy(&relay->endpoint, cbus_process); if (!diag_is_empty(&relay->diag)) { - /* An error has occured while ACKs of xlog reading */ + /* An error has occurred while reading ACKs of xlog. */ diag_move(&relay->diag, diag_get()); } struct errinj *inj = errinj(ERRINJ_RELAY_EXIT_DELAY, ERRINJ_DOUBLE); @@ -514,16 +552,16 @@ relay_subscribe_f(va_list ap) /** Replication acceptor fiber handler. 
*/ void -relay_subscribe(int fd, uint64_t sync, struct replica *replica, +relay_subscribe(struct replica *replica, int fd, uint64_t sync, struct vclock *replica_clock, uint32_t replica_version_id) { assert(replica->id != REPLICA_ID_NIL); + struct relay *relay = replica->relay; /* Don't allow multiple relays for the same replica */ - if (replica->relay != NULL) { + if (relay->state == RELAY_FOLLOW) { tnt_raise(ClientError, ER_CFG, "replication", "duplicate connection with the same replica UUID"); } - /* * Register the replica with the garbage collector * unless it has already been registered by initial @@ -537,24 +575,21 @@ relay_subscribe(int fd, uint64_t sync, struct replica *replica, diag_raise(); } - struct relay relay; - relay_create(&relay, fd, sync, relay_send_row); - relay.r = recovery_new(cfg_gets("wal_dir"), - cfg_geti("force_recovery"), - replica_clock); - vclock_copy(&relay.tx.vclock, replica_clock); - relay.version_id = replica_version_id; - relay.replica = replica; - replica_set_relay(replica, &relay); - vclock_copy(&relay.local_vclock_at_subscribe, &replicaset.vclock); - - int rc = cord_costart(&relay.cord, tt_sprintf("relay_%p", &relay), - relay_subscribe_f, &relay); + relay_start(relay, fd, sync, relay_send_row); + vclock_copy(&relay->local_vclock_at_subscribe, &replicaset.vclock); + relay->r = recovery_new(cfg_gets("wal_dir"), + cfg_geti("force_recovery"), + replica_clock); + vclock_copy(&relay->tx.vclock, replica_clock); + relay->version_id = replica_version_id; + + int rc = cord_costart(&relay->cord, tt_sprintf("relay_%p", relay), + relay_subscribe_f, relay); if (rc == 0) - rc = cord_cojoin(&relay.cord); + rc = cord_cojoin(&relay->cord); - replica_clear_relay(replica); - relay_destroy(&relay); + relay_stop(relay); + replica_on_relay_stop(replica); if (rc != 0) diag_raise(); @@ -595,8 +630,7 @@ relay_send_row(struct xstream *stream, struct xrow_header *packet) * it). 
In the latter case packet's LSN is less than or equal to * local master's LSN at the moment it received 'SUBSCRIBE' request. */ - if (relay->replica == NULL || - packet->replica_id != relay->replica->id || + if (packet->replica_id != relay->replica->id || packet->lsn <= vclock_get(&relay->local_vclock_at_subscribe, packet->replica_id)) { relay_send(relay, packet); diff --git a/src/box/relay.h b/src/box/relay.h index c8a2c2872..f039cbef8 100644 --- a/src/box/relay.h +++ b/src/box/relay.h @@ -42,6 +42,33 @@ struct replica; struct tt_uuid; struct vclock; +enum relay_state { + /** + * Applier has not connected to the master or not expected. + */ + RELAY_OFF, + /** + * Applier has connected to the master. + */ + RELAY_FOLLOW, + /** + * Applier disconnected from the master. + */ + RELAY_STOPPED, +}; + +/** Create a relay object which is not running. */ +struct relay * +relay_new(struct replica *replica); + +/** Destroy and delete the relay */ +void +relay_delete(struct relay *relay); + +/** Return the current state of relay. */ +enum relay_state +relay_get_state(const struct relay *relay); + /** * Returns relay's vclock * @param relay relay @@ -71,8 +98,8 @@ relay_initial_join(int fd, uint64_t sync, struct vclock *vclock); * @param sync sync from incoming JOIN request */ void -relay_final_join(int fd, uint64_t sync, struct vclock *start_vclock, - struct vclock *stop_vclock); +relay_final_join(struct replica *replica, int fd, uint64_t sync, + struct vclock *start_vclock, struct vclock *stop_vclock); /** * Subscribe a replica to updates. @@ -80,7 +107,7 @@ relay_final_join(int fd, uint64_t sync, struct vclock *start_vclock, * @return none. 
*/ void -relay_subscribe(int fd, uint64_t sync, struct replica *replica, +relay_subscribe(struct replica *replica, int fd, uint64_t sync, struct vclock *replica_vclock, uint32_t replica_version_id); #endif /* TARANTOOL_REPLICATION_RELAY_H_INCLUDED */ diff --git a/src/box/replication.cc b/src/box/replication.cc index 9aac1f077..2c5b356d9 100644 --- a/src/box/replication.cc +++ b/src/box/replication.cc @@ -39,6 +39,7 @@ #include "box.h" #include "gc.h" #include "error.h" +#include "relay.h" #include "vclock.h" /* VCLOCK_MAX */ uint32_t instance_id = REPLICA_ID_NIL; @@ -81,8 +82,6 @@ void replication_init(void) { memset(&replicaset, 0, sizeof(replicaset)); - mempool_create(&replicaset.pool, &cord()->slabc, - sizeof(struct replica)); replica_hash_new(&replicaset.hash); rlist_create(&replicaset.anon); vclock_create(&replicaset.vclock); @@ -92,7 +91,6 @@ replication_init(void) void replication_free(void) { - mempool_destroy(&replicaset.pool); fiber_cond_destroy(&replicaset.applier.cond); } @@ -114,8 +112,9 @@ replica_check_id(uint32_t replica_id) static bool replica_is_orphan(struct replica *replica) { + assert(replica->relay != NULL); return replica->id == REPLICA_ID_NIL && replica->applier == NULL && - replica->relay == NULL; + relay_get_state(replica->relay) != RELAY_FOLLOW; } static void @@ -125,14 +124,19 @@ static struct replica * replica_new(void) { struct replica *replica = (struct replica *) - mempool_alloc(&replicaset.pool); - if (replica == NULL) + malloc(sizeof(struct replica)); + if (replica == NULL) { tnt_raise(OutOfMemory, sizeof(*replica), "malloc", "struct replica"); + } + replica->relay = relay_new(replica); + if (replica->relay == NULL) { + free(replica); + diag_raise(); + } replica->id = 0; replica->uuid = uuid_nil; replica->applier = NULL; - replica->relay = NULL; replica->gc = NULL; rlist_create(&replica->in_anon); trigger_create(&replica->on_applier_state, @@ -145,9 +149,12 @@ static void replica_delete(struct replica *replica) { 
assert(replica_is_orphan(replica)); + if (replica->relay != NULL) + relay_delete(replica->relay); if (replica->gc != NULL) gc_consumer_unregister(replica->gc); - mempool_free(&replicaset.pool, replica); + TRASH(replica); + free(replica); } struct replica * @@ -657,18 +664,8 @@ replicaset_check_quorum(void) } void -replica_set_relay(struct replica *replica, struct relay *relay) -{ - assert(replica->id != REPLICA_ID_NIL); - assert(replica->relay == NULL); - replica->relay = relay; -} - -void -replica_clear_relay(struct replica *replica) +replica_on_relay_stop(struct replica *replica) { - assert(replica->relay != NULL); - replica->relay = NULL; if (replica_is_orphan(replica)) { replica_hash_remove(&replicaset.hash, replica); replica_delete(replica); diff --git a/src/box/replication.h b/src/box/replication.h index e2bdd814f..c6e5158d4 100644 --- a/src/box/replication.h +++ b/src/box/replication.h @@ -37,7 +37,6 @@ #include <small/rb.h> /* replicaset_t */ #include <small/rlist.h> #include "applier.h" -#include <small/mempool.h> #include "fiber_cond.h" #include "vclock.h" @@ -173,8 +172,6 @@ typedef rb_tree(struct replica) replica_hash_t; * relays, usually connected in full mesh. */ struct replicaset { - /** Memory pool for struct replica allocations. */ - struct mempool pool; /** Hash of replicas indexed by UUID. */ replica_hash_t hash; /** @@ -301,19 +298,11 @@ replica_set_id(struct replica *replica, uint32_t id); void replica_clear_id(struct replica *replica); -/** - * Register \a relay of a \a replica. - * \pre a replica can have only one relay - * \pre replica->id != REPLICA_ID_NIL - */ -void -replica_set_relay(struct replica *replica, struct relay *relay); - /** * Unregister \a relay from the \a replica. */ void -replica_clear_relay(struct replica *replica); +replica_on_relay_stop(struct replica *replica); #if defined(__cplusplus) } /* extern "C" */ -- 2.14.3 (Apple Git-98)
next prev parent reply other threads:[~2018-05-21 17:07 UTC|newest] Thread overview: 9+ messages / expand[flat|nested] mbox.gz Atom feed top 2018-05-21 17:07 [PATCH 0/2] replication: improve logging Konstantin Belyavskiy 2018-05-21 17:07 ` Konstantin Belyavskiy [this message] 2018-05-21 18:50 ` [PATCH 1/2] replication: do not delete relay on applier disconnect Konstantin Osipov 2018-05-21 17:07 ` [PATCH 2/2] replication: display downstream status at upstream Konstantin Belyavskiy 2018-05-22 7:07 ` [tarantool-patches] " Kirill Yukhin 2018-05-22 10:44 ` [tarantool-patches] Re: [tarantool-patches] " Konstantin Belyavskiy 2018-05-22 11:24 ` Kirill Yukhin 2018-05-29 16:44 ` Konstantin Osipov 2018-05-22 7:08 ` [tarantool-patches] [PATCH 0/2] replication: improve logging Kirill Yukhin
Reply instructions: You may reply publicly to this message via plain-text email using any one of the following methods: * Save the following mbox file, import it into your mail client, and reply-to-all from there: mbox Avoid top-posting and favor interleaved quoting: https://en.wikipedia.org/wiki/Posting_style#Interleaved_style * Reply using the --to, --cc, and --in-reply-to switches of git-send-email(1): git send-email \ --in-reply-to=9fa887ca5a8ea2b9d496a7bfd0d4c7a83da26ac8.1526921933.git.k.belyavskiy@tarantool.org \ --to=k.belyavskiy@tarantool.org \ --cc=georgy@tarantool.org \ --cc=kostja@tarantool.org \ --cc=tarantool-patches@freelists.org \ --cc=vdavydov@tarantool.org \ --subject='Re: [PATCH 1/2] replication: do not delete relay on applier disconnect' \ /path/to/YOUR_REPLY https://kernel.org/pub/software/scm/git/docs/git-send-email.html * If your mail client supports setting the In-Reply-To header via mailto: links, try the mailto: link
This is a public inbox, see mirroring instructions for how to clone and mirror all data and code used for this inbox