[PATCH 2/5] swim: introduce failure detection component
Vladislav Shpilevoy
v.shpilevoy at tarantool.org
Mon Dec 17 15:53:20 MSK 2018
The failure detection component allows finding the
members that are already dead.
Part of #3234
---
src/lib/swim/swim.c | 484 +++++++++++++++++++++++++++++++++++++++--
test/swim/basic.result | 16 ++
2 files changed, 478 insertions(+), 22 deletions(-)
diff --git a/src/lib/swim/swim.c b/src/lib/swim/swim.c
index 7e5d0eb9e..6b2f1ca0c 100644
--- a/src/lib/swim/swim.c
+++ b/src/lib/swim/swim.c
@@ -40,17 +40,15 @@
/**
* Possible optimizations:
- * - track hash table versions and do not resend when a received
- * already knows your version.
- * - on small updates send to another node only updates since a
- * version. On rare updates it can dramatically reduce message
- * size and its encoding time.
+ *
+ * Optional:
* - do not send self.
* - cache encoded batch.
* - refute immediately.
* - indirect ping.
* - increment own incarnation on each round.
* - attach dst incarnation to ping.
+ * - fix swim_member_bin mp_encode_map(2) to 3 in the first patch.
*/
/**
@@ -131,24 +129,45 @@ sockaddr_in_hash(const struct sockaddr_in *a)
* tables. The intention to send a data is called IO task and is
* stored in a queue that is dispatched when output is possible.
*/
-typedef void (*swim_io_task_f)(void);
+struct swim_io_task;
+/** Callback run with its own task once output is possible. */
+typedef void (*swim_io_task_f)(struct swim_io_task *);
struct swim_io_task {
swim_io_task_f cb;
struct rlist in_queue_output;
};
+static inline void
+swim_io_task_create(struct swim_io_task *task, swim_io_task_f cb)
+{
+ rlist_create(&task->in_queue_output); /* Not queued yet. */
+ task->cb = cb;
+}
+
+static inline void
+swim_io_task_destroy(struct swim_io_task *task)
+{
+ rlist_del_entry(task, in_queue_output); /* Task may be unqueued. */
+}
+
enum swim_member_status {
/**
* The instance is ok, it responds to requests, sends its
* members table.
*/
MEMBER_ALIVE = 0,
+ /**
+ * The member is considered to be dead. Unless it is pinned,
+ * it is removed from the membership after more failed pings.
+ */
+ MEMBER_DEAD,
swim_member_status_MAX,
};
static const char *swim_member_status_strs[] = {
"alive",
+ "dead",
};
/**
@@ -173,6 +192,38 @@ struct swim_member {
* Position in a queue of members in the current round.
*/
struct rlist in_queue_round;
+ /**
+ *
+ * Failure detection component
+ */
+ /**
+ * True, if the member is configured explicitly and can
+ * not disappear from the membership.
+ */
+ bool is_pinned;
+ /** Growing number to refute old messages. */
+ uint64_t incarnation;
+ /**
+ * How many pings did not receive an ack in a row. After
+ * a threshold the instance is marked as dead. After more
+ * it is removed from the table (if not pinned).
+ */
+ int failed_pings;
+ /** When the last ping was sent. */
+ double ping_ts;
+ /**
+ * Ready at hand ack task to send when this member has
+ * sent ping to us.
+ */
+ struct swim_io_task ack_task;
+ /**
+ * Ready at hand ping task to send when this member too
+ * long does not respond to an initial ping, piggybacked
+ * with members table.
+ */
+ struct swim_io_task ping_task;
+ /** Position in a queue of members waiting for an ack. */
+ struct rlist in_queue_wait_ack;
};
/**
@@ -188,8 +239,93 @@ static struct swim_member *self = NULL;
*/
enum swim_component_type {
SWIM_ANTI_ENTROPY = 0,
+ SWIM_FAILURE_DETECTION,
+};
+
+/** {{{ Failure detection component */
+
+/** Possible failure detection keys. */
+enum swim_fd_key {
+ /** Type of the failure detection message: ping or ack. */
+ SWIM_FD_MSG_TYPE,
+ /**
+ * Incarnation of the sender. A ping/ack with a greater
+ * incarnation makes the sender alive again, even if it was
+ * considered to be dead.
+ */
+ SWIM_FD_INCARNATION,
};
+/**
+ * Failure detection message has only two types so far: ping and
+ * ack. Indirect ping/ack are not implemented yet.
+ */
+enum swim_fd_msg_type {
+ SWIM_FD_MSG_PING,
+ SWIM_FD_MSG_ACK,
+ swim_fd_msg_type_MAX,
+};
+
+static const char *swim_fd_msg_type_strs[] = {
+ "ping",
+ "ack",
+};
+
+/** SWIM failure detection MsgPack header template. */
+struct PACKED swim_fd_header_bin {
+ /** mp_encode_uint(SWIM_FAILURE_DETECTION) */
+ uint8_t k_header;
+ /** mp_encode_map(2) */
+ uint8_t m_header;
+
+ /** mp_encode_uint(SWIM_FD_MSG_TYPE) */
+ uint8_t k_type;
+ /** mp_encode_uint(enum swim_fd_msg_type) */
+ uint8_t v_type;
+
+ /** mp_encode_uint(SWIM_FD_INCARNATION) */
+ uint8_t k_incarnation;
+ /** mp_encode_uint(64bit incarnation) */
+ uint8_t m_incarnation;
+ uint64_t v_incarnation;
+};
+
+static inline void
+swim_fd_header_bin_create(struct swim_fd_header_bin *header,
+ enum swim_fd_msg_type type)
+{
+ header->k_header = SWIM_FAILURE_DETECTION;
+ header->m_header = 0x82; /* mp_encode_map(2) */
+
+ header->k_type = SWIM_FD_MSG_TYPE;
+ header->v_type = type;
+
+ header->k_incarnation = SWIM_FD_INCARNATION;
+ header->m_incarnation = 0xcf; /* uint64 marker */
+ header->v_incarnation = mp_bswap_u64(self->incarnation);
+}
+
+/**
+ * Members waiting for an ack, sorted by ping_ts ascending (head
+ * is older, tail is newer). Missed acks cause re-pings, then the
+ * member is marked dead and later removed, if it is not pinned.
+ */
+static RLIST_HEAD(queue_wait_ack);
+/** Timer periodically checking queue_wait_ack for timeouts. */
+static struct ev_periodic wait_ack_tick;
+
+static void
+swim_member_schedule_ack_wait(struct swim_member *member)
+{
+ if (rlist_empty(&member->in_queue_wait_ack)) { /* Keep older ping_ts. */
+ member->ping_ts = fiber_time();
+ rlist_add_tail_entry(&queue_wait_ack, member,
+ in_queue_wait_ack);
+ }
+}
+
+/** }}} Failure detection component */
+
/** {{{ Anti-entropy component */
/**
@@ -204,6 +340,7 @@ enum swim_member_key {
*/
SWIM_MEMBER_ADDR,
SWIM_MEMBER_PORT,
+ SWIM_MEMBER_INCARNATION,
swim_member_key_MAX,
};
@@ -227,7 +364,7 @@ swim_anti_entropy_header_bin_create(struct swim_anti_entropy_header_bin *header,
/** SWIM member MsgPack template. */
struct PACKED swim_member_bin {
- /** mp_encode_map(3) */
+ /** mp_encode_map(4) */
uint8_t m_header;
/** mp_encode_uint(SWIM_MEMBER_STATUS) */
@@ -246,6 +383,12 @@ struct PACKED swim_member_bin {
/** mp_encode_uint(addr.sin_port) */
uint8_t m_port;
uint16_t v_port;
+
+ /** mp_encode_uint(SWIM_MEMBER_INCARNATION) */
+ uint8_t k_incarnation;
+ /** mp_encode_uint(64bit incarnation) */
+ uint8_t m_incarnation;
+ uint64_t v_incarnation;
};
static inline void
@@ -255,17 +398,20 @@ swim_member_bin_reset(struct swim_member_bin *header,
header->v_status = member->status;
header->v_addr = mp_bswap_u32(member->addr.sin_addr.s_addr);
header->v_port = mp_bswap_u16(member->addr.sin_port);
+ header->v_incarnation = mp_bswap_u64(member->incarnation);
}
static inline void
swim_member_bin_create(struct swim_member_bin *header)
{
- header->m_header = 0x83;
+ header->m_header = 0x84; /* mp_encode_map(4) */
header->k_status = SWIM_MEMBER_STATUS;
header->k_addr = SWIM_MEMBER_ADDR;
header->m_addr = 0xce;
header->k_port = SWIM_MEMBER_PORT;
header->m_port = 0xcd;
+ header->k_incarnation = SWIM_MEMBER_INCARNATION;
+ header->m_incarnation = 0xcf; /* uint64 marker */
}
/**
@@ -283,7 +429,7 @@ static struct ev_periodic round_tick;
* objects and related to them only.
*/
static void
-swim_send_round_msg(void);
+swim_send_round_msg(struct swim_io_task *task);
static struct swim_io_task round_step_task = {
/* .cb = */ swim_send_round_msg,
@@ -295,11 +441,19 @@ static struct swim_io_task round_step_task = {
/**
* SWIM message structure:
* {
+ * SWIM_FAILURE_DETECTION: {
+ * SWIM_FD_MSG_TYPE: uint, enum swim_fd_msg_type,
+ * SWIM_FD_INCARNATION: uint
+ * },
+ *
+ * OR/AND
+ *
* SWIM_ANTI_ENTROPY: [
* {
* SWIM_MEMBER_STATUS: uint, enum member_status,
* SWIM_MEMBER_ADDR: uint, ip,
- * SWIM_MEMBER_PORT: uint, port
+ * SWIM_MEMBER_PORT: uint, port,
+ * SWIM_MEMBER_INCARNATION: uint
* },
* ...
* ],
@@ -317,6 +471,22 @@ enum {
* networks by their admins.
*/
UDP_PACKET_SIZE = 1472,
+ /**
+ * If a ping was sent, it is considered to be lost after
+ * this time without an ack.
+ */
+ ACK_TIMEOUT = 1,
+ /**
+ * If a member has not been responding to pings this
+ * number of times, it is considered to be dead.
+ */
+ NO_ACKS_TO_DEAD = 3,
+ /**
+ * If a not pinned member confirmed to be dead, it is
+ * removed from the membership after at least this number
+ * of failed pings.
+ */
+ NO_ACKS_TO_GC = NO_ACKS_TO_DEAD + 2,
};
/**
@@ -330,6 +500,14 @@ static struct evio_service input;
*/
static struct ev_io output;
+/**
+ * An array of configured members; they are pinned. Used to unpin
+ * them on reconfiguration and to roll back a failed one.
+ */
+static struct swim_member **cfg = NULL;
+/** Number of configured members. */
+static int cfg_size = 0;
+
/**
* An array of members shuffled on each round. Its head it sent
* to each member during one round as an anti-entropy message.
@@ -347,12 +525,44 @@ swim_io_task_push(struct swim_io_task *task)
ev_io_start(loop(), &output);
}
+static void
+swim_send_ack(struct swim_io_task *task);
+
+static void
+swim_send_ping(struct swim_io_task *task);
+
+/**
+ * Update status and incarnation of @a member if needed. They are
+ * compared as a compound key {incarnation, status}: @a new_status
+ * wins if @a incarnation is greater, or is the same and
+ * @a new_status is "bigger". Statuses are compared by their enum
+ * values, so "alive" < "dead". This protects from the case when a
+ * member is detected as dead on one instance, but overridden by
+ * another instance with the same incarnation "alive" message.
+ * Self is never updated here, it refutes gossips on its own.
+ */
+static inline void
+swim_member_update_status(struct swim_member *member,
+ enum swim_member_status new_status,
+ uint64_t incarnation)
+{
+ assert(member != self);
+ if (member->incarnation == incarnation) {
+ if (member->status < new_status)
+ member->status = new_status;
+ } else if (member->incarnation < incarnation) {
+ member->status = new_status;
+ member->incarnation = incarnation;
+ }
+}
+
/**
* Register a new member with a specified status. Here it is
* added to the hash, to the 'next' queue.
*/
static struct swim_member *
-swim_member_new(const struct sockaddr_in *addr, enum swim_member_status status)
+swim_member_new(const struct sockaddr_in *addr, enum swim_member_status status,
+ uint64_t incarnation)
{
struct swim_member *member =
(struct swim_member *) malloc(sizeof(*member));
@@ -362,6 +572,10 @@ swim_member_new(const struct sockaddr_in *addr, enum swim_member_status status)
}
member->status = status;
member->addr = *addr;
+ member->incarnation = incarnation;
+ member->is_pinned = false;
+ member->failed_pings = 0;
+ member->ping_ts = 0;
struct mh_i64ptr_node_t node;
node.key = sockaddr_in_hash(addr);
node.val = member;
@@ -371,7 +585,10 @@ swim_member_new(const struct sockaddr_in *addr, enum swim_member_status status)
diag_set(OutOfMemory, sizeof(mh_int_t), "malloc", "node");
return NULL;
}
+ swim_io_task_create(&member->ack_task, swim_send_ack);
+ swim_io_task_create(&member->ping_task, swim_send_ping);
rlist_add_entry(&queue_round, member, in_queue_round);
+ rlist_create(&member->in_queue_wait_ack);
return member;
}
@@ -396,7 +613,10 @@ swim_member_delete(struct swim_member *member)
mh_int_t rc = mh_i64ptr_find(members, key, NULL);
assert(rc != mh_end(members));
mh_i64ptr_del(members, rc, NULL);
+ swim_io_task_destroy(&member->ack_task);
+ swim_io_task_destroy(&member->ping_task);
rlist_del_entry(member, in_queue_round);
+ rlist_del_entry(member, in_queue_wait_ack);
free(member);
}
@@ -472,16 +692,22 @@ swim_encode_round_msg(char *buffer, int size)
if ((shuffled_members == NULL || rlist_empty(&queue_round)) &&
swim_new_round() != 0)
return -1;
- /* 1 - for the root map header. */
- assert(size > 1);
- --size;
+ /* -1 - for the root map header. */
+ assert((uint)size > sizeof(struct swim_fd_header_bin) + 1);
+ size -= sizeof(struct swim_fd_header_bin) + 1;
+
int ae_batch_size = calculate_bin_batch_size(
sizeof(struct swim_anti_entropy_header_bin),
sizeof(struct swim_member_bin), size);
if (ae_batch_size > shuffled_members_size)
ae_batch_size = shuffled_members_size;
- buffer = mp_encode_map(buffer, 1);
+ buffer = mp_encode_map(buffer, 2);
+
+ struct swim_fd_header_bin fd_header_bin;
+ swim_fd_header_bin_create(&fd_header_bin, SWIM_FD_MSG_PING);
+ memcpy(buffer, &fd_header_bin, sizeof(fd_header_bin));
+ buffer += sizeof(fd_header_bin);
struct swim_anti_entropy_header_bin ae_header_bin;
swim_anti_entropy_header_bin_create(&ae_header_bin, ae_batch_size);
@@ -504,8 +730,10 @@ swim_encode_round_msg(char *buffer, int size)
* from the queue.
*/
static void
-swim_send_round_msg(void)
+swim_send_round_msg(struct swim_io_task *task)
{
+ (void) task;
+ assert(task->cb == swim_send_round_msg);
char buffer[UDP_PACKET_SIZE];
int size = swim_encode_round_msg(buffer, UDP_PACKET_SIZE);
if (size < 0) {
@@ -524,11 +752,51 @@ swim_send_round_msg(void)
if (sio_sendto(output.fd, buffer, size, 0, (struct sockaddr *) &m->addr,
sizeof(m->addr)) == -1 && ! sio_wouldblock(errno))
diag_log();
+ swim_member_schedule_ack_wait(m);
rlist_del_entry(m, in_queue_round);
end:
ev_periodic_start(loop(), &round_tick);
}
+/** Encode a failure detection message of @a type, send to @a m. */
+static void
+swim_send_fd_message(struct swim_member *m, enum swim_fd_msg_type type)
+{
+ struct swim_fd_header_bin fd_bin;
+ swim_fd_header_bin_create(&fd_bin, type);
+ char packet[UDP_PACKET_SIZE];
+ char *tail = mp_encode_map(packet, 1);
+ memcpy(tail, &fd_bin, sizeof(fd_bin));
+ tail += sizeof(fd_bin);
+ int packet_size = tail - packet;
+ assert(packet_size <= (int)sizeof(packet));
+ struct sockaddr *dst = (struct sockaddr *) &m->addr;
+ say_verbose("SWIM: send %s to %s", swim_fd_msg_type_strs[type],
+ sio_strfaddr(dst, sizeof(m->addr)));
+ if (sio_sendto(output.fd, packet, packet_size, 0, dst,
+ sizeof(m->addr)) == -1 && ! sio_wouldblock(errno))
+ diag_log();
+}
+
+/** Reply with an ack to the member owning @a task. */
+static void
+swim_send_ack(struct swim_io_task *task)
+{
+ assert(task->cb == swim_send_ack);
+ swim_send_fd_message(container_of(task, struct swim_member,
+ ack_task), SWIM_FD_MSG_ACK);
+}
+
+static void
+swim_send_ping(struct swim_io_task *task)
+{
+ assert(task->cb == swim_send_ping);
+ struct swim_member *member =
+ container_of(task, struct swim_member, ping_task);
+ swim_send_fd_message(member, SWIM_FD_MSG_PING);
+ swim_member_schedule_ack_wait(member); /* Expect an ack in time. */
+}
+
static void
swim_on_output(struct ev_loop *loop, struct ev_io *io, int events)
{
@@ -541,7 +809,7 @@ swim_on_output(struct ev_loop *loop, struct ev_io *io, int events)
struct swim_io_task *task =
rlist_shift_entry(&queue_output, struct swim_io_task,
in_queue_output);
- task->cb();
+ task->cb(task);
}
/** Once per specified timeout trigger a next broadcast step. */
@@ -554,12 +822,42 @@ swim_trigger_round_step(struct ev_loop *loop, struct ev_periodic *p, int events)
ev_periodic_stop(loop, p);
}
+/**
+ * Check for pings not acked during ACK_TIMEOUT and resend them.
+ * Members failing too many pings are removed unless pinned.
+ */
+static void
+swim_check_acks(struct ev_loop *loop, struct ev_periodic *p, int events)
+{
+ assert((events & EV_PERIODIC) != 0);
+ (void) loop;
+ (void) p;
+ (void) events;
+ struct swim_member *m, *tmp;
+ double current_time = fiber_time();
+ rlist_foreach_entry_safe(m, &queue_wait_ack, in_queue_wait_ack, tmp) {
+ if (current_time - m->ping_ts < ACK_TIMEOUT)
+ break;
+ ++m->failed_pings;
+ if (m->failed_pings >= NO_ACKS_TO_GC && !m->is_pinned) {
+ swim_member_delete(m);
+ continue;
+ }
+ if (m->failed_pings >= NO_ACKS_TO_DEAD)
+ m->status = MEMBER_DEAD;
+ rlist_del_entry(m, in_queue_wait_ack);
+ if (rlist_empty(&m->ping_task.in_queue_output))
+ swim_io_task_push(&m->ping_task);
+ }
+}
+
/**
* SWIM member attributes from anti-entropy and dissemination
* messages.
*/
struct swim_member_def {
struct sockaddr_in addr;
+ uint64_t incarnation; /* 0 if absent in a message. */
enum swim_member_status status;
};
@@ -569,6 +867,7 @@ swim_member_def_create(struct swim_member_def *def)
def->addr.sin_port = 0;
def->addr.sin_addr.s_addr = 0;
def->status = MEMBER_ALIVE;
+ def->incarnation = 0;
}
static void
@@ -580,9 +879,34 @@ swim_process_member_update(struct swim_member_def *def)
* members table.
*/
if (member == NULL) {
- member = swim_member_new(&def->addr, def->status);
+ member = swim_member_new(&def->addr, def->status,
+ def->incarnation);
if (member == NULL)
diag_log();
+ return;
+ }
+ if (member != self) {
+ swim_member_update_status(member, def->status,
+ def->incarnation);
+ return;
+ }
+ /*
+ * It is possible that other instances know a bigger
+ * incarnation of this instance - such thing happens when
+ * the instance restarts and loses its local incarnation
+ * number. It will be restored by receiving dissemination
+ * messages about self.
+ */
+ if (self->incarnation < def->incarnation)
+ self->incarnation = def->incarnation;
+ if (def->status != MEMBER_ALIVE &&
+ def->incarnation == self->incarnation) {
+ /*
+ * In the cluster a gossip exists that this
+ * instance is not alive. Refute this information
+ * with a bigger incarnation.
+ */
+ self->incarnation++;
}
}
@@ -626,6 +950,15 @@ swim_process_member_key(enum swim_member_key key, const char **pos,
}
def->addr.sin_port = port;
break;
+ case SWIM_MEMBER_INCARNATION:
+ if (mp_typeof(**pos) != MP_UINT ||
+ mp_check_uint(*pos, end) > 0) {
+ say_error("%s member incarnation should be uint",
+ msg_pref);
+ return -1;
+ }
+ def->incarnation = mp_decode_uint(pos);
+ break;
default:
unreachable();
}
@@ -677,6 +1010,90 @@ swim_process_anti_entropy(const char **pos, const char *end)
return 0;
}
+/**
+ * Decode a failure detection message from @a src. Answer a ping
+ * with an ack; an up to date ack resets the failed ping counter.
+ */
+static int
+swim_process_failure_detection(const char **pos, const char *end,
+ const struct sockaddr_in *src)
+{
+ const char *msg_pref = "Invalid SWIM failure detection message:";
+ if (mp_typeof(**pos) != MP_MAP || mp_check_map(*pos, end) > 0) {
+ say_error("%s root should be a map", msg_pref);
+ return -1;
+ }
+ uint64_t size = mp_decode_map(pos);
+ if (size != 2) {
+ say_error("%s root map should have two keys - message type "\
+ "and incarnation", msg_pref);
+ return -1;
+ }
+ enum swim_fd_msg_type type = swim_fd_msg_type_MAX;
+ uint64_t incarnation = 0;
+ for (int i = 0; i < (int) size; ++i) {
+ if (mp_typeof(**pos) != MP_UINT ||
+ mp_check_uint(*pos, end) > 0) {
+ say_error("%s a key should be uint", msg_pref);
+ return -1;
+ }
+ uint64_t key = mp_decode_uint(pos);
+ switch(key) {
+ case SWIM_FD_MSG_TYPE:
+ if (mp_typeof(**pos) != MP_UINT ||
+ mp_check_uint(*pos, end) > 0) {
+ say_error("%s message type should be uint",
+ msg_pref);
+ return -1;
+ }
+ key = mp_decode_uint(pos);
+ if (key >= swim_fd_msg_type_MAX) {
+ say_error("%s unknown message type", msg_pref);
+ return -1;
+ }
+ type = key;
+ break;
+ case SWIM_FD_INCARNATION:
+ if (mp_typeof(**pos) != MP_UINT ||
+ mp_check_uint(*pos, end) > 0) {
+ say_error("%s incarnation should be uint",
+ msg_pref);
+ return -1;
+ }
+ incarnation = mp_decode_uint(pos);
+ break;
+ default:
+ say_error("%s unknown key", msg_pref);
+ return -1;
+ }
+ }
+ if (type == swim_fd_msg_type_MAX) {
+ say_error("%s message type should be specified", msg_pref);
+ return -1;
+ }
+ struct swim_member *sender = swim_find_member(src);
+ if (sender == NULL) {
+ sender = swim_member_new(src, MEMBER_ALIVE, incarnation);
+ if (sender == NULL) {
+ diag_log();
+ return 0;
+ }
+ } else {
+ swim_member_update_status(sender, MEMBER_ALIVE, incarnation);
+ }
+ if (type == SWIM_FD_MSG_PING) {
+ /* Do not link the ack task into the output queue twice. */
+ if (rlist_empty(&sender->ack_task.in_queue_output))
+ swim_io_task_push(&sender->ack_task);
+ } else if (incarnation >= sender->incarnation) {
+ assert(type == SWIM_FD_MSG_ACK);
+ sender->failed_pings = 0;
+ rlist_del_entry(&sender->ping_task, in_queue_output);
+ rlist_del_entry(sender, in_queue_wait_ack);
+ }
+ return 0;
+}
+
/** Receive and process a new message. */
static void
swim_on_input(struct ev_loop *loop, struct ev_io *io, int events)
@@ -717,6 +1134,12 @@ swim_on_input(struct ev_loop *loop, struct ev_io *io, int events)
if (swim_process_anti_entropy(&pos, end) != 0)
return;
break;
+ case SWIM_FAILURE_DETECTION:
+ say_verbose("SWIM: process failure detection");
+ if (swim_process_failure_detection(&pos, end,
+ &addr) != 0)
+ return;
+ break;
default:
say_error("%s unknown component type component is "\
"supported", msg_pref);
@@ -783,7 +1206,9 @@ swim_init(void)
swim_init_input(&input);
ev_init(&output, swim_on_output);
ev_init(&round_tick, swim_trigger_round_step);
+ ev_init(&wait_ack_tick, swim_check_acks);
ev_periodic_set(&round_tick, 0, HEARTBEAT_RATE_DEFAULT, NULL);
+ ev_periodic_set(&wait_ack_tick, 0, ACK_TIMEOUT, NULL);
return 0;
}
@@ -811,7 +1236,7 @@ swim_cfg(const char **member_uris, int member_uri_count, const char *server_uri,
goto error;
struct swim_member *member = swim_find_member(&addr);
if (member == NULL) {
- member = swim_member_new(&addr, new_status);
+ member = swim_member_new(&addr, new_status, 0);
if (member == NULL)
goto error;
}
@@ -832,7 +1257,8 @@ swim_cfg(const char **member_uris, int member_uri_count, const char *server_uri,
new_self = swim_find_member(&addr);
if (new_self == NULL) {
- new_self = swim_member_new(&addr, new_status);
+ new_self = swim_member_new(&addr, new_status,
+ 0);
if (new_self == NULL)
goto error;
}
@@ -847,6 +1273,7 @@ swim_cfg(const char **member_uris, int member_uri_count, const char *server_uri,
ev_io_set(&output, input.ev.fd, EV_WRITE);
evio_service_start(&input);
ev_periodic_start(loop(), &round_tick);
+ ev_periodic_start(loop(), &wait_ack_tick);
}
}
@@ -854,9 +1281,15 @@ swim_cfg(const char **member_uris, int member_uri_count, const char *server_uri,
ev_periodic_set(&round_tick, 0, heartbeat_rate, NULL);
if (member_uri_count > 0) {
- for (int i = 0; i < new_cfg_size; ++i)
+ for (int i = 0; i < cfg_size; ++i)
+ cfg[i]->is_pinned = false;
+ free(cfg);
+ for (int i = 0; i < new_cfg_size; ++i) {
+ new_cfg[i]->is_pinned = true;
new_cfg[i]->status = MEMBER_ALIVE;
- free(new_cfg);
+ }
+ cfg = new_cfg;
+ cfg_size = new_cfg_size;
}
if (new_self != NULL && new_self->status == new_status)
new_self->status = MEMBER_ALIVE;
@@ -893,6 +1326,8 @@ swim_info(struct info_handler *info)
sizeof(member->addr)));
info_append_str(info, "status",
swim_member_status_strs[member->status]);
+ info_append_int(info, "incarnation",
+ (int64_t) member->incarnation);
info_table_end(info);
}
info_end(info);
@@ -906,6 +1341,7 @@ swim_stop(void)
ev_io_stop(loop(), &output);
evio_service_stop(&input);
ev_periodic_stop(loop(), &round_tick);
+ ev_periodic_stop(loop(), &wait_ack_tick);
mh_int_t node = mh_first(members);
while (node != mh_end(members)) {
struct swim_member *m = (struct swim_member *)
@@ -915,10 +1351,14 @@ swim_stop(void)
}
mh_i64ptr_delete(members);
free(shuffled_members);
+ free(cfg);
members = NULL;
+ cfg = NULL;
+ cfg_size = 0;
shuffled_members = NULL;
shuffled_members_size = 0;
+ rlist_create(&queue_wait_ack);
rlist_create(&queue_output);
rlist_create(&queue_round);
}
diff --git a/test/swim/basic.result b/test/swim/basic.result
index a7ae140d6..7d0131606 100644
--- a/test/swim/basic.result
+++ b/test/swim/basic.result
@@ -43,10 +43,13 @@ swim_info_sorted()
---
- - - 192.168.0.1:3333
- status: alive
+ incarnation: 0
- - 192.168.0.2:3333
- status: alive
+ incarnation: 0
- - 192.168.0.3:3333
- status: alive
+ incarnation: 0
...
swim.stop()
---
@@ -63,12 +66,16 @@ swim_info_sorted()
---
- - - 127.0.0.1:listen_port
- status: alive
+ incarnation: 0
- - 192.168.0.1:3333
- status: alive
+ incarnation: 0
- - 192.168.0.2:3333
- status: alive
+ incarnation: 0
- - 192.168.0.3:3333
- status: alive
+ incarnation: 0
...
swim.debug_round_step()
---
@@ -77,12 +84,16 @@ swim_info_sorted()
---
- - - 127.0.0.1:listen_port
- status: alive
+ incarnation: 0
- - 192.168.0.1:3333
- status: alive
+ incarnation: 0
- - 192.168.0.2:3333
- status: alive
+ incarnation: 0
- - 192.168.0.3:3333
- status: alive
+ incarnation: 0
...
swim.stop()
---
@@ -123,6 +134,7 @@ swim_info_sorted()
---
- - - 127.0.0.1:listen_port
- status: alive
+ incarnation: 0
...
swim.stop()
---
@@ -163,12 +175,16 @@ swim_info_sorted()
---
- - - 127.0.0.1:listen_port
- status: alive
+ incarnation: 0
- - 192.168.0.1:3333
- status: alive
+ incarnation: 0
- - 192.168.0.2:3333
- status: alive
+ incarnation: 0
- - 192.168.0.3:3333
- status: alive
+ incarnation: 0
...
swim.stop()
---
--
2.17.2 (Apple Git-113)
More information about the Tarantool-patches
mailing list