[tarantool-patches] Re: [PATCH 1/1] swim: introduce SWIM's anti-entropy component
Vladislav Shpilevoy
v.shpilevoy at tarantool.org
Thu Mar 7 15:33:53 MSK 2019
On 07/03/2019 13:27, Konstantin Osipov wrote:
> * Vladislav Shpilevoy <v.shpilevoy at tarantool.org> [19/03/07 11:19]:
>> Sorry again - ignore the whole email. I've found a bug
>> which required drastic changes in the event loop
>> implementation.
>
> Please submit the fix in a follow up patch. I will not review the
> entire several thousands of lines once again.
As I said, I mostly changed swim_test_transport.c and swim_test_ev.c.
In total, swim.c gains only 21 lines; this is not about 1000 new lines.
But as you wish: the diff is attached below.
diff --git a/src/lib/swim/swim.c b/src/lib/swim/swim.c
index 19d7935b0..4b401800a 100644
--- a/src/lib/swim/swim.c
+++ b/src/lib/swim/swim.c
@@ -175,6 +175,14 @@ swim_uuid_str(const struct tt_uuid *uuid)
return buf;
}
+/** Check if two AF_INET addresses have equal IPv4 address and port. */
+static bool
+swim_sockaddr_in_eq(const struct sockaddr_in *a1, const struct sockaddr_in *a2)
+{
+ return a1->sin_port == a2->sin_port &&
+ a1->sin_addr.s_addr == a2->sin_addr.s_addr;
+}
+
/**
* A cluster member description. This structure describes the
* last known state of an instance. This state is updated
@@ -268,20 +276,18 @@
-/**
- * SWIM fd mainly is needed to be printed into the logs in order
- * to distinguish between different SWIM instances logs.
- */
-static inline int
+int
swim_fd(const struct swim *swim)
{
return swim->scheduler.transport.fd;
@@ -575,8 +579,17 @@ swim_complete_step(struct swim_task *task,
(void) task;
struct swim *swim = swim_by_scheduler(scheduler);
swim_ev_timer_start(loop(), &swim->round_tick);
- rlist_shift_entry(&swim->round_queue, struct swim_member,
- in_round_queue);
+ /*
+ * It is possible that the original member was deleted
+ * manually during the task execution.
+ */
+ if (rlist_empty(&swim->round_queue))
+ return;
+ struct swim_member *m =
+ rlist_first_entry(&swim->round_queue, struct swim_member,
+ in_round_queue);
+ if (swim_sockaddr_in_eq(&m->addr, &task->dst))
+ rlist_shift(&swim->round_queue);
}
/**
diff --git a/src/lib/swim/swim.h b/src/lib/swim/swim.h
index 8b82df051..744214371 100644
--- a/src/lib/swim/swim.h
+++ b/src/lib/swim/swim.h
@@ -80,6 +80,14 @@ swim_cfg(struct swim *swim, const char *uri, double heartbeat_rate,
void
swim_delete(struct swim *swim);
+/**
+ * SWIM fd is mainly needed for logging, to tell apart the logs
+ * of different SWIM instances. It is also used by the unit
+ * tests.
+ */
+int
+swim_fd(const struct swim *swim);
+
/** Add a new member. */
int
swim_add_member(struct swim *swim, const char *uri, const struct tt_uuid *uuid);
diff --git a/test/unit/swim.c b/test/unit/swim.c
index 78a69b7c6..ebb9418d4 100644
--- a/test/unit/swim.c
+++ b/test/unit/swim.c
@@ -149,7 +149,7 @@ swim_test_cfg(void)
static void
swim_test_add_remove(void)
{
- swim_start_test(12);
+ swim_start_test(13);
struct swim_cluster *cluster = swim_cluster_new(2);
swim_cluster_add_link(cluster, 0, 1);
@@ -189,6 +189,18 @@ swim_test_add_remove(void)
"after removal the cluster is not in fullmesh");
is(swim_cluster_wait_fullmesh(cluster, 1), 0,
"but it is back in 1 step");
+
+ /*
+ * On each step s1 sends itself to s2. However s2 can be
+ * removed from s1 after the message is scheduled but
+ * before its completion.
+ */
+ swim_cluster_block_io(cluster, 0, 2);
+ swim_do_loop_step(loop());
+ swim_remove_member(s1, swim_member_uuid(s2_self));
+ is(swim_cluster_wait_fullmesh(cluster, 1), 0,
+ "back in fullmesh after a member removal in the middle of a step");
+
swim_cluster_delete(cluster);
swim_finish_test();
@@ -200,7 +212,6 @@ main_f(va_list ap)
swim_start_test(5);
(void) ap;
- struct ev_loop *loop = loop();
swim_test_ev_init();
swim_test_transport_init();
diff --git a/test/unit/swim.result b/test/unit/swim.result
index 73e1504a3..e71d6cfc2 100644
--- a/test/unit/swim.result
+++ b/test/unit/swim.result
@@ -38,7 +38,7 @@ ok 3 - subtests
ok 4 - subtests
*** swim_test_cfg: done ***
*** swim_test_add_remove ***
- 1..12
+ 1..13
ok 1 - can not add an existing member
ok 2 - diag says 'already exists'
ok 3 - can not add a invalid uri
@@ -51,6 +51,7 @@ ok 4 - subtests
ok 10 - remove of a not existing member
ok 11 - after removal the cluster is not in fullmesh
ok 12 - but it is back in 1 step
+ ok 13 - back in fullmesh after a member removal in the middle of a step
ok 5 - subtests
*** swim_test_add_remove: done ***
*** main_f: done ***
diff --git a/test/unit/swim_test_ev.c b/test/unit/swim_test_ev.c
index b70a8b9c8..950784aec 100644
--- a/test/unit/swim_test_ev.c
+++ b/test/unit/swim_test_ev.c
@@ -49,31 +49,42 @@ static double watch = 0;
static int event_id = 0;
/**
- * Timed event's description and attributes used to process it.
- * Note, that IO events are not wrapped with that and processed
- * separately in test transport module.
+ * The SWIM test event loop has two kinds of events: natural libev
+ * ones like timers, and artificial ones like fake socket blocking.
*/
-struct swim_test_event {
- /**
- * Event mask with which libev watcher's callback should
- * be invoked on that event.
- */
- int revents;
- /**
- * Libev watcher. Used to store callback and to find the
- * event by watcher pointer. It is necessary because SWIM
- * operates by libev watchers.
- */
- struct ev_watcher *watcher;
+enum swim_event_type {
+ SWIM_EVENT_TIMER, /* Invoke a libev timer watcher. */
+ SWIM_EVENT_FD_UNBLOCK, /* Unblock a fake file descriptor. */
+};
+
+struct swim_event;
+
+typedef void (*swim_event_process_f)(struct swim_event *, struct ev_loop *);
+typedef void (*swim_event_delete_f)(struct swim_event *);
+
+/**
+ * Base event. It is stored in the event heap and virtualizes
+ * other events.
+ */
+struct swim_event {
+ /** Type, for assertions only. */
+ enum swim_event_type type;
/**
* When that event should be invoked according to the fake
* watch.
*/
double deadline;
- /** A link in the events heap. */
- struct heap_node in_events_heap;
+ /** A link in the event heap. */
+ struct heap_node in_event_heap;
/** ID to sort events with the same deadline. */
int id;
+ /**
+ * Process the event. Usually the event is deleted right
+ * after that.
+ */
+ swim_event_process_f process;
+ /** Just delete the event. Called on event heap reset. */
+ swim_event_delete_f delete;
};
/**
@@ -81,68 +92,162 @@ struct swim_test_event {
* deadline and the smallest ID in that deadline.
*/
static inline bool
-swim_test_event_less(const struct swim_test_event *e1,
- const struct swim_test_event *e2)
+swim_event_less(const struct swim_event *e1, const struct swim_event *e2)
{
if (e1->deadline == e2->deadline)
return e1->id < e2->id;
return e1->deadline < e2->deadline;
}
-#define HEAP_NAME events_heap
-#define HEAP_LESS(h, e1, e2) swim_test_event_less(e1, e2)
-#define heap_value_t struct swim_test_event
-#define heap_value_attr in_events_heap
+#define HEAP_NAME event_heap
+#define HEAP_LESS(h, e1, e2) swim_event_less(e1, e2)
+#define heap_value_t struct swim_event
+#define heap_value_attr in_event_heap
#include "salad/heap.h"
-/** Events heap. Event loop pops them from here. */
-static heap_t events_heap;
+/** Event heap. Event loop pops them from here. */
+static heap_t event_heap;
-/** Libev watcher is matched to exactly one test event here. */
+/** Libev watcher is matched to exactly one event here. */
static struct mh_i64ptr_t *events_hash;
/**
- * Create a new event which should happen after @a delay and with
- * the specified events mask on behalf of @a watcher.
+ * Initialize @a e and insert it into the event heap so that
+ * @a process is called after @a delay fake seconds. @a delete
+ * is called when SWIM deletes the event explicitly (e.g. by
+ * stopping a timer), and when the event heap is reset.
*/
static void
-swim_test_event_new(struct ev_watcher *watcher, double delay, int revents)
+swim_event_create(struct swim_event *e, enum swim_event_type type, double delay,
+ swim_event_process_f process, swim_event_delete_f delete)
{
- struct swim_test_event *e =
- (struct swim_test_event *) malloc(sizeof(*e));
- assert(e != NULL);
- e->watcher = watcher;
e->deadline = swim_time() + delay;
- e->revents = revents;
e->id = event_id++;
- events_heap_insert(&events_heap, e);
- struct mh_i64ptr_node_t old = {0, NULL}, *old_p = &old;
- struct mh_i64ptr_node_t node = {(uint64_t) watcher, e};
- mh_int_t rc = mh_i64ptr_put(events_hash, &node, &old_p, NULL);
- (void) rc;
- assert(rc != mh_end(events_hash));
- assert(old.val == NULL && old.key == 0);
+ e->process = process;
+ e->delete = delete;
+ e->type = type;
+ event_heap_insert(&event_heap, e);
}
-/** Delete event, free resources. */
+/** Destroy a basic event. */
static inline void
-swim_test_event_delete(struct swim_test_event *e)
+swim_event_destroy(struct swim_event *e)
{
- events_heap_delete(&events_heap, e);
- mh_int_t rc = mh_i64ptr_find(events_hash, (uint64_t) e->watcher, NULL);
- assert(rc != mh_end(events_hash));
- mh_i64ptr_del(events_hash, rc, NULL);
- free(e);
+ event_heap_delete(&event_heap, e);
+}
+
+/** Delete an event via its type-specific delete callback. */
+static inline void
+swim_event_delete(struct swim_event *e)
+{
+ e->delete(e);
}
/** Find an event by @a watcher. */
-static struct swim_test_event *
-swim_test_event_by_ev(struct ev_watcher *watcher)
+static struct swim_event *
+swim_event_by_ev(struct ev_watcher *watcher)
{
mh_int_t rc = mh_i64ptr_find(events_hash, (uint64_t) watcher, NULL);
if (rc == mh_end(events_hash))
return NULL;
- return (struct swim_test_event *) mh_i64ptr_node(events_hash, rc)->val;
+ return (struct swim_event *) mh_i64ptr_node(events_hash, rc)->val;
+}
+
+/** Event created on behalf of a started libev timer watcher. */
+struct swim_timer_event {
+ struct swim_event base; /* First member: cast from swim_event. */
+ /**
+ * Libev watcher. Used to store callback and to find the
+ * event by watcher pointer. It is necessary because SWIM
+ * operates on libev watchers.
+ */
+ struct ev_watcher *watcher;
+};
+
+/** Remove a timer event from the watcher hash and heap, free it. */
+static void
+swim_timer_event_delete(struct swim_event *e)
+{
+ assert(e->type == SWIM_EVENT_TIMER);
+ struct swim_timer_event *te = (struct swim_timer_event *) e;
+ mh_int_t rc = mh_i64ptr_find(events_hash, (uint64_t) te->watcher, NULL);
+ assert(rc != mh_end(events_hash));
+ mh_i64ptr_del(events_hash, rc, NULL);
+ swim_event_destroy(e);
+ free(te);
+}
+
+/** Delete a timer event, then invoke its watcher's callback. */
+static void
+swim_timer_event_process(struct swim_event *e, struct ev_loop *loop)
+{
+ assert(e->type == SWIM_EVENT_TIMER);
+ struct ev_watcher *w = ((struct swim_timer_event *) e)->watcher;
+ swim_timer_event_delete(e); /* First: callback may restart timer. */
+ ev_invoke(loop, w, EV_TIMER);
+}
+
+/** Create an event firing @a watcher in @a delay fake seconds. */
+static void
+swim_timer_event_new(struct ev_watcher *watcher, double delay)
+{
+ struct swim_timer_event *e =
+ (struct swim_timer_event *) malloc(sizeof(*e));
+ assert(e != NULL);
+ swim_event_create(&e->base, SWIM_EVENT_TIMER, delay,
+ swim_timer_event_process, swim_timer_event_delete);
+ e->watcher = watcher;
+ assert(swim_event_by_ev(watcher) == NULL);
+ struct mh_i64ptr_node_t node = {(uint64_t) watcher, e};
+ mh_int_t rc = mh_i64ptr_put(events_hash, &node, NULL, NULL);
+ (void) rc;
+ assert(rc != mh_end(events_hash));
+}
+
+/**
+ * SWIM fake transport's event. It is created right when a fake
+ * file descriptor is blocked, and unblocks that descriptor once
+ * the blocking delay expires.
+ */
+struct swim_fd_unblock_event {
+ struct swim_event base; /* First member: freed via base. */
+ /** A fake file descriptor to unblock. */
+ int fd;
+};
+
+/** Delete an fd unblock event without unblocking its fd. */
+static void
+swim_fd_unblock_event_delete(struct swim_event *e)
+{
+ assert(e->type == SWIM_EVENT_FD_UNBLOCK);
+ swim_event_destroy(e);
+ free(e);
+}
+
+/** Unblock the event's fake fd, then delete the event. */
+static void
+swim_fd_unblock_event_process(struct swim_event *e, struct ev_loop *loop)
+{
+ (void) loop;
+ assert(e->type == SWIM_EVENT_FD_UNBLOCK);
+ struct swim_fd_unblock_event *fe = (struct swim_fd_unblock_event *) e;
+ swim_test_transport_unblock_fd(fe->fd);
+ swim_fd_unblock_event_delete(e);
+}
+
+void
+swim_test_ev_block_fd(int fd, double delay)
+{
+ struct swim_fd_unblock_event *e =
+ (struct swim_fd_unblock_event *) malloc(sizeof(*e));
+ assert(e != NULL);
+ /* Block the descriptor immediately. */
+ swim_test_transport_block_fd(fd);
+ /* Schedule unblocking in @a delay fake seconds. */
+ swim_event_create(&e->base, SWIM_EVENT_FD_UNBLOCK, delay,
+ swim_fd_unblock_event_process,
+ swim_fd_unblock_event_delete);
+ e->fd = fd;
}
/** Implementation of global time visible in SWIM. */
@@ -159,24 +264,25 @@ swim_time(void)
void
swim_ev_timer_start(struct ev_loop *loop, struct ev_timer *base)
{
- if (swim_test_event_by_ev((struct ev_watcher *) base) != NULL)
+ (void) loop;
+ if (swim_event_by_ev((struct ev_watcher *) base) != NULL)
return;
/* Create the periodic watcher and one event. */
- swim_test_event_new((struct ev_watcher *) base, base->at, EV_TIMER);
+ swim_timer_event_new((struct ev_watcher *) base, base->at);
}
/** Time stop cancels the event if the timer is active. */
void
swim_ev_timer_stop(struct ev_loop *loop, struct ev_timer *base)
{
+ (void) loop;
/*
* Delete the watcher and its events. Should be only one.
*/
- struct swim_test_event *e =
- swim_test_event_by_ev((struct ev_watcher *) base);
+ struct swim_event *e = swim_event_by_ev((struct ev_watcher *) base);
if (e == NULL)
return;
- swim_test_event_delete(e);
+ swim_event_delete(e);
}
/** Process all the events with the next nearest deadline. */
@@ -184,17 +290,16 @@ void
swim_do_loop_step(struct ev_loop *loop)
{
say_verbose("Loop watch %f", watch);
- struct swim_test_event *e = events_heap_top(&events_heap);
+ struct swim_event *next_e, *e = event_heap_top(&event_heap);
if (e != NULL) {
assert(e->deadline >= watch);
/* Multiple events can have the same deadline. */
watch = e->deadline;
do {
- int revents = e->revents;
- struct ev_watcher *w = e->watcher;
- swim_test_event_delete(e);
- ev_invoke(loop, w, revents);
- e = events_heap_top(&events_heap);
+ e->process(e, loop);
+ next_e = event_heap_top(&event_heap);
+ assert(e != next_e);
+ e = next_e;
} while (e != NULL && e->deadline == watch);
}
/*
@@ -227,9 +332,9 @@ swim_do_loop_step(struct ev_loop *loop)
void
swim_test_ev_reset(void)
{
- struct swim_test_event *e;
- while ((e = events_heap_top(&events_heap)) != NULL)
- swim_test_event_delete(e);
+ struct swim_event *e;
+ while ((e = event_heap_top(&event_heap)) != NULL)
+ swim_event_delete(e);
assert(mh_size(events_hash) == 0);
event_id = 0;
watch = 0;
@@ -240,13 +345,13 @@ swim_test_ev_init(void)
{
events_hash = mh_i64ptr_new();
assert(events_hash != NULL);
- events_heap_create(&events_heap);
+ event_heap_create(&event_heap);
}
void
swim_test_ev_free(void)
{
swim_test_ev_reset();
- events_heap_destroy(&events_heap);
+ event_heap_destroy(&event_heap);
mh_i64ptr_delete(events_hash);
}
diff --git a/test/unit/swim_test_ev.h b/test/unit/swim_test_ev.h
index 788ac06e9..808bc510e 100644
--- a/test/unit/swim_test_ev.h
+++ b/test/unit/swim_test_ev.h
@@ -47,6 +47,10 @@ swim_test_ev_init(void);
void
swim_test_ev_free(void);
+/** Block @a fd now and unblock it after @a delay fake seconds. */
+void
+swim_test_ev_block_fd(int fd, double delay);
+
/** Play one step of event loop, process generated events. */
void
swim_do_loop_step(struct ev_loop *loop);
diff --git a/test/unit/swim_test_transport.c b/test/unit/swim_test_transport.c
index d49b92885..ee50e3922 100644
--- a/test/unit/swim_test_transport.c
+++ b/test/unit/swim_test_transport.c
@@ -225,6 +225,22 @@ swim_transport_create(struct swim_transport *transport)
memset(&transport->addr, 0, sizeof(transport->addr));
}
+void
+swim_test_transport_block_fd(int fd)
+{
+ struct swim_fd *sfd = &swim_fd[fd - FAKE_FD_BASE];
+ assert(! rlist_empty(&sfd->in_opened));
+ rlist_del_entry(sfd, in_opened); /* Blocked = not in opened list. */
+}
+
+void
+swim_test_transport_unblock_fd(int fd)
+{
+ struct swim_fd *sfd = &swim_fd[fd - FAKE_FD_BASE];
+ assert(rlist_empty(&sfd->in_opened));
+ rlist_add_tail_entry(&swim_fd_opened, sfd, in_opened);
+}
+
/** Send one packet to destination's recv queue. */
static inline void
swim_fd_send_packet(struct swim_fd *fd)
diff --git a/test/unit/swim_test_transport.h b/test/unit/swim_test_transport.h
index c9739e250..f8066e636 100644
--- a/test/unit/swim_test_transport.h
+++ b/test/unit/swim_test_transport.h
@@ -47,6 +47,17 @@ struct ev_loop;
void
swim_transport_do_loop_step(struct ev_loop *loop);
+/**
+ * Block a file descriptor so that it can neither receive nor
+ * send any packets.
+ */
+void
+swim_test_transport_block_fd(int fd);
+
+/** Unblock a previously blocked file descriptor. */
+void
+swim_test_transport_unblock_fd(int fd);
+
/** Initialize test transport system. */
void
swim_test_transport_init(void);
diff --git a/test/unit/swim_test_utils.c b/test/unit/swim_test_utils.c
index 20775effb..73f8db40f 100644
--- a/test/unit/swim_test_utils.c
+++ b/test/unit/swim_test_utils.c
@@ -92,6 +92,12 @@ swim_cluster_node(struct swim_cluster *cluster, int i)
return cluster->node[i];
}
+void
+swim_cluster_block_io(struct swim_cluster *cluster, int i, double delay)
+{
+ swim_test_ev_block_fd(swim_fd(cluster->node[i]), delay);
+}
+
/** Check if @a s1 knows every member of @a s2's table. */
static inline bool
swim1_contains_swim2(struct swim *s1, struct swim *s2)
diff --git a/test/unit/swim_test_utils.h b/test/unit/swim_test_utils.h
index 19eea551d..56036422d 100644
--- a/test/unit/swim_test_utils.h
+++ b/test/unit/swim_test_utils.h
@@ -56,6 +56,10 @@ swim_error_check_match(const char *msg);
struct swim *
swim_cluster_node(struct swim_cluster *cluster, int i);
+/** Block IO on a SWIM instance with id @a i for @a delay fake seconds. */
+void
+swim_cluster_block_io(struct swim_cluster *cluster, int i, double delay);
+
/**
* Explicitly add a member of id @a from_id to a member of id
* @a to_id.
>
>
> --
> Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
> http://tarantool.io - www.twitter.com/kostja_osipov
>
More information about the Tarantool-patches
mailing list