From: Vladislav Shpilevoy <v.shpilevoy@tarantool.org> To: tarantool-patches@freelists.org, Konstantin Osipov <kostja@tarantool.org> Subject: [tarantool-patches] Re: [PATCH 1/1] swim: introduce SWIM's anti-entropy component Date: Thu, 7 Mar 2019 15:33:53 +0300 [thread overview] Message-ID: <7fe095c9-bcab-f5dc-54e4-3ac326d27c02@tarantool.org> (raw) In-Reply-To: <20190307102723.GD5263@chai> On 07/03/2019 13:27, Konstantin Osipov wrote: > * Vladislav Shpilevoy <v.shpilevoy@tarantool.org> [19/03/07 11:19]: >> Sorry again - ignore the whole email. I've found a bug >> which required drastic changes in the event loop >> implementation. > > Please submit the fix in a follow up patch. I will not review the > entire several thousands of lines once again. As I said, I changed mostly swim_test_transport.c and swim_test_ev.c. Swim.c totally adds 21 lines. It is not about new 1000 lines. But as you wish. I attached a diff below. diff --git a/src/lib/swim/swim.c b/src/lib/swim/swim.c index 19d7935b0..4b401800a 100644 --- a/src/lib/swim/swim.c +++ b/src/lib/swim/swim.c @@ -175,6 +175,14 @@ swim_uuid_str(const struct tt_uuid *uuid) return buf; } +/** Check if two AF_INET addresses are equal. */ +static bool +swim_sockaddr_in_eq(const struct sockaddr_in *a1, const struct sockaddr_in *a2) +{ + return a1->sin_port == a2->sin_port && + a1->sin_addr.s_addr == a2->sin_addr.s_addr; +} + /** * A cluster member description. This structure describes the * last known state of an instance. This state is updated @@ -268,20 +276,18 @@ -/** - * SWIM fd mainly is needed to be printed into the logs in order - * to distinguish between different SWIM instances logs. - */ -static inline int +int swim_fd(const struct swim *swim) { return swim->scheduler.transport.fd; @@ -575,8 +579,17 @@ swim_complete_step(struct swim_task *task, (void) task; struct swim *swim = swim_by_scheduler(scheduler); swim_ev_timer_start(loop(), &swim->round_tick); - rlist_shift_entry(&swim->round_queue, struct swim_member, - in_round_queue); + /* + * It is possible that the original member was deleted + * manually during the task execution. + */ + if (rlist_empty(&swim->round_queue)) + return; + struct swim_member *m = + rlist_first_entry(&swim->round_queue, struct swim_member, + in_round_queue); + if (swim_sockaddr_in_eq(&m->addr, &task->dst)) + rlist_shift(&swim->round_queue); } /** diff --git a/src/lib/swim/swim.h b/src/lib/swim/swim.h index 8b82df051..744214371 100644 --- a/src/lib/swim/swim.h +++ b/src/lib/swim/swim.h @@ -80,6 +80,14 @@ swim_cfg(struct swim *swim, const char *uri, double heartbeat_rate, void swim_delete(struct swim *swim); +/** + * SWIM fd mainly is needed to be printed into the logs in order + * to distinguish between different SWIM instances logs. And for + * unit testing. + */ +int +swim_fd(const struct swim *swim); + /** Add a new member. */ int swim_add_member(struct swim *swim, const char *uri, const struct tt_uuid *uuid); diff --git a/test/unit/swim.c b/test/unit/swim.c index 78a69b7c6..ebb9418d4 100644 --- a/test/unit/swim.c +++ b/test/unit/swim.c @@ -149,7 +149,7 @@ swim_test_cfg(void) static void swim_test_add_remove(void) { - swim_start_test(12); + swim_start_test(13); struct swim_cluster *cluster = swim_cluster_new(2); swim_cluster_add_link(cluster, 0, 1); @@ -189,6 +189,18 @@ swim_test_add_remove(void) "after removal the cluster is not in fullmesh"); is(swim_cluster_wait_fullmesh(cluster, 1), 0, "but it is back in 1 step"); + + /* + * On each step s1 sends itself to s2. However s2 can be + * removed from s1 after the message is scheduled but + * before its completion. + */ + swim_cluster_block_io(cluster, 0, 2); + swim_do_loop_step(loop()); + swim_remove_member(s1, swim_member_uuid(s2_self)); + is(swim_cluster_wait_fullmesh(cluster, 1), 0, + "back in fullmesh after a member removal in the middle of a step"); + swim_cluster_delete(cluster); swim_finish_test(); @@ -200,7 +212,6 @@ main_f(va_list ap) swim_start_test(5); (void) ap; - struct ev_loop *loop = loop(); swim_test_ev_init(); swim_test_transport_init(); diff --git a/test/unit/swim.result b/test/unit/swim.result index 73e1504a3..e71d6cfc2 100644 --- a/test/unit/swim.result +++ b/test/unit/swim.result @@ -38,7 +38,7 @@ ok 3 - subtests ok 4 - subtests *** swim_test_cfg: done *** *** swim_test_add_remove *** - 1..12 + 1..13 ok 1 - can not add an existing member ok 2 - diag says 'already exists' ok 3 - can not add a invalid uri @@ -51,6 +51,7 @@ ok 4 - subtests ok 10 - remove of a not existing member ok 11 - after removal the cluster is not in fullmesh ok 12 - but it is back in 1 step + ok 13 - back in fullmesh after a member removal in the middle of a step ok 5 - subtests *** swim_test_add_remove: done *** *** main_f: done *** diff --git a/test/unit/swim_test_ev.c b/test/unit/swim_test_ev.c index b70a8b9c8..950784aec 100644 --- a/test/unit/swim_test_ev.c +++ b/test/unit/swim_test_ev.c @@ -49,31 +49,42 @@ static double watch = 0; static int event_id = 0; /** - * Timed event's description and attributes used to process it. - * Note, that IO events are not wrapped with that and processed - * separately in test transport module. + * SWIM testing event loop has two event types - natural libev + * events like timer, and artificial like fake socket blocking. */ -struct swim_test_event { - /** - * Event mask with which libev watcher's callback should - * be invoked on that event. - */ - int revents; - /** - * Libev watcher. Used to store callback and to find the - * event by watcher pointer. It is necessary because SWIM - * operates by libev watchers. - */ - struct ev_watcher *watcher; +enum swim_event_type { + SWIM_EVENT_TIMER, + SWIM_EVENT_FD_UNBLOCK, +}; + +struct swim_event; + +typedef void (*swim_event_process_f)(struct swim_event *, struct ev_loop *); +typedef void (*swim_event_delete_f)(struct swim_event *); + +/** + * Base event. It is stored in the event heap and virtualizes + * other events. + */ +struct swim_event { + /** Type, for assertions only. */ + enum swim_event_type type; /** * When that event should be invoked according to the fake * watch. */ double deadline; - /** A link in the events heap. */ - struct heap_node in_events_heap; + /** A link in the event heap. */ + struct heap_node in_event_heap; /** ID to sort events with the same deadline. */ int id; + /** + * Process the event. Usually the event is deleted right + * after that. + */ + swim_event_process_f process; + /** Just delete the event. Called on event heap reset. */ + swim_event_delete_f delete; }; /** @@ -81,68 +92,162 @@ struct swim_test_event { * deadline and the smallest ID in that deadline. */ static inline bool -swim_test_event_less(const struct swim_test_event *e1, - const struct swim_test_event *e2) +swim_event_less(const struct swim_event *e1, const struct swim_event *e2) { if (e1->deadline == e2->deadline) return e1->id < e2->id; return e1->deadline < e2->deadline; } -#define HEAP_NAME events_heap -#define HEAP_LESS(h, e1, e2) swim_test_event_less(e1, e2) -#define heap_value_t struct swim_test_event -#define heap_value_attr in_events_heap +#define HEAP_NAME event_heap +#define HEAP_LESS(h, e1, e2) swim_event_less(e1, e2) +#define heap_value_t struct swim_event +#define heap_value_attr in_event_heap #include "salad/heap.h" -/** Events heap. Event loop pops them from here. */ -static heap_t events_heap; +/** Event heap. Event loop pops them from here. */ +static heap_t event_heap; -/** Libev watcher is matched to exactly one test event here. */ +/** Libev watcher is matched to exactly one event here. */ static struct mh_i64ptr_t *events_hash; /** - * Create a new event which should happen after @a delay and with - * the specified events mask on behalf of @a watcher. + * Create a new event which should call @a process after @a delay + * fake seconds. @A delete is called explicitly when the event + * is deleted by SWIM explicitly, and when the event heap is + * reset. */ static void -swim_test_event_new(struct ev_watcher *watcher, double delay, int revents) +swim_event_create(struct swim_event *e, enum swim_event_type type, double delay, + swim_event_process_f process, swim_event_delete_f delete) { - struct swim_test_event *e = - (struct swim_test_event *) malloc(sizeof(*e)); - assert(e != NULL); - e->watcher = watcher; e->deadline = swim_time() + delay; - e->revents = revents; e->id = event_id++; - events_heap_insert(&events_heap, e); - struct mh_i64ptr_node_t old = {0, NULL}, *old_p = &old; - struct mh_i64ptr_node_t node = {(uint64_t) watcher, e}; - mh_int_t rc = mh_i64ptr_put(events_hash, &node, &old_p, NULL); - (void) rc; - assert(rc != mh_end(events_hash)); - assert(old.val == NULL && old.key == 0); + e->process = process; + e->delete = delete; + e->type = type; + event_heap_insert(&event_heap, e); } -/** Delete event, free resources. */ +/** Destroy a basic event. */ static inline void -swim_test_event_delete(struct swim_test_event *e) +swim_event_destroy(struct swim_event *e) { - events_heap_delete(&events_heap, e); - mh_int_t rc = mh_i64ptr_find(events_hash, (uint64_t) e->watcher, NULL); - assert(rc != mh_end(events_hash)); - mh_i64ptr_del(events_hash, rc, NULL); - free(e); + event_heap_delete(&event_heap, e); +} + +/** Destroy a event and free its resources. */ +static inline void +swim_event_delete(struct swim_event *e) +{ + e->delete(e); } /** Find an event by @a watcher. */ -static struct swim_test_event * -swim_test_event_by_ev(struct ev_watcher *watcher) +static struct swim_event * +swim_event_by_ev(struct ev_watcher *watcher) { mh_int_t rc = mh_i64ptr_find(events_hash, (uint64_t) watcher, NULL); if (rc == mh_end(events_hash)) return NULL; - return (struct swim_test_event *) mh_i64ptr_node(events_hash, rc)->val; + return (struct swim_event *) mh_i64ptr_node(events_hash, rc)->val; +} + +/** Timer event generated by libev. */ +struct swim_timer_event { + struct swim_event base; + /** + * Libev watcher. Used to store callback and to find the + * event by watcher pointer. It is necessary because SWIM + * operates by libev watchers. + */ + struct ev_watcher *watcher; +}; + +/** Destroy a timer event and free its resources. */ +static void +swim_timer_event_delete(struct swim_event *e) +{ + assert(e->type == SWIM_EVENT_TIMER); + struct swim_timer_event *te = (struct swim_timer_event *) e; + mh_int_t rc = mh_i64ptr_find(events_hash, (uint64_t) te->watcher, NULL); + assert(rc != mh_end(events_hash)); + mh_i64ptr_del(events_hash, rc, NULL); + swim_event_destroy(e); + free(te); +} + +/** Process a timer event and delete it. */ +static void +swim_timer_event_process(struct swim_event *e, struct ev_loop *loop) +{ + assert(e->type == SWIM_EVENT_TIMER); + struct ev_watcher *w = ((struct swim_timer_event *) e)->watcher; + swim_timer_event_delete(e); + ev_invoke(loop, w, EV_TIMER); +} + +/** Create a new timer event. */ +static void +swim_timer_event_new(struct ev_watcher *watcher, double delay) +{ + struct swim_timer_event *e = + (struct swim_timer_event *) malloc(sizeof(*e)); + assert(e != NULL); + swim_event_create(&e->base, SWIM_EVENT_TIMER, delay, + swim_timer_event_process, swim_timer_event_delete); + e->watcher = watcher; + assert(swim_event_by_ev(watcher) == NULL); + struct mh_i64ptr_node_t node = {(uint64_t) watcher, e}; + mh_int_t rc = mh_i64ptr_put(events_hash, &node, NULL, NULL); + (void) rc; + assert(rc != mh_end(events_hash)); +} + +/** + * SWIM fake transport's event. It is used to block a fake file + * descriptor for a delay. Right after a block that event is + * generated to unblock the descriptor later. + */ +struct swim_fd_unblock_event { + struct swim_event base; + /** A fake file descriptor to unlock. */ + int fd; +}; + +/** Delete a fd unblock event. */ +static void +swim_fd_unblock_event_delete(struct swim_event *e) +{ + assert(e->type == SWIM_EVENT_FD_UNBLOCK); + swim_event_destroy(e); + free(e); +} + +/** Process and delete a fd unblock event. */ +static void +swim_fd_unblock_event_process(struct swim_event *e, struct ev_loop *loop) +{ + (void) loop; + assert(e->type == SWIM_EVENT_FD_UNBLOCK); + struct swim_fd_unblock_event *fe = (struct swim_fd_unblock_event *) e; + swim_test_transport_unblock_fd(fe->fd); + swim_fd_unblock_event_delete(e); +} + +void +swim_test_ev_block_fd(int fd, double delay) +{ + struct swim_fd_unblock_event *e = + (struct swim_fd_unblock_event *) malloc(sizeof(*e)); + assert(e != NULL); + /* Block now. */ + swim_test_transport_block_fd(fd); + /* Unblock after delay. */ + swim_event_create(&e->base, SWIM_EVENT_FD_UNBLOCK, delay, + swim_fd_unblock_event_process, + swim_fd_unblock_event_delete); + e->fd = fd; } /** Implementation of global time visible in SWIM. */ @@ -159,24 +264,25 @@ swim_time(void) void swim_ev_timer_start(struct ev_loop *loop, struct ev_timer *base) { - if (swim_test_event_by_ev((struct ev_watcher *) base) != NULL) + (void) loop; + if (swim_event_by_ev((struct ev_watcher *) base) != NULL) return; /* Create the periodic watcher and one event. */ - swim_test_event_new((struct ev_watcher *) base, base->at, EV_TIMER); + swim_timer_event_new((struct ev_watcher *) base, base->at); } /** Time stop cancels the event if the timer is active. */ void swim_ev_timer_stop(struct ev_loop *loop, struct ev_timer *base) { + (void) loop; /* * Delete the watcher and its events. Should be only one. */ - struct swim_test_event *e = - swim_test_event_by_ev((struct ev_watcher *) base); + struct swim_event *e = swim_event_by_ev((struct ev_watcher *) base); if (e == NULL) return; - swim_test_event_delete(e); + swim_event_delete(e); } /** Process all the events with the next nearest deadline. */ @@ -184,17 +290,16 @@ void swim_do_loop_step(struct ev_loop *loop) { say_verbose("Loop watch %f", watch); - struct swim_test_event *e = events_heap_top(&events_heap); + struct swim_event *next_e, *e = event_heap_top(&event_heap); if (e != NULL) { assert(e->deadline >= watch); /* Multiple events can have the same deadline. */ watch = e->deadline; do { - int revents = e->revents; - struct ev_watcher *w = e->watcher; - swim_test_event_delete(e); - ev_invoke(loop, w, revents); - e = events_heap_top(&events_heap); + e->process(e, loop); + next_e = event_heap_top(&event_heap); + assert(e != next_e); + e = next_e; } while (e != NULL && e->deadline == watch); } /* @@ -227,9 +332,9 @@ swim_do_loop_step(struct ev_loop *loop) void swim_test_ev_reset(void) { - struct swim_test_event *e; - while ((e = events_heap_top(&events_heap)) != NULL) - swim_test_event_delete(e); + struct swim_event *e; + while ((e = event_heap_top(&event_heap)) != NULL) + swim_event_delete(e); assert(mh_size(events_hash) == 0); event_id = 0; watch = 0; @@ -240,13 +345,13 @@ swim_test_ev_init(void) { events_hash = mh_i64ptr_new(); assert(events_hash != NULL); - events_heap_create(&events_heap); + event_heap_create(&event_heap); } void swim_test_ev_free(void) { swim_test_ev_reset(); - events_heap_destroy(&events_heap); + event_heap_destroy(&event_heap); mh_i64ptr_delete(events_hash); } diff --git a/test/unit/swim_test_ev.h b/test/unit/swim_test_ev.h index 788ac06e9..808bc510e 100644 --- a/test/unit/swim_test_ev.h +++ b/test/unit/swim_test_ev.h @@ -47,6 +47,10 @@ swim_test_ev_init(void); void swim_test_ev_free(void); +/** Block a file descriptor @a fd for @a delay fake seconds. */ +void +swim_test_ev_block_fd(int fd, double delay); + /** Play one step of event loop, process generated events. */ void swim_do_loop_step(struct ev_loop *loop); diff --git a/test/unit/swim_test_transport.c b/test/unit/swim_test_transport.c index d49b92885..ee50e3922 100644 --- a/test/unit/swim_test_transport.c +++ b/test/unit/swim_test_transport.c @@ -225,6 +225,22 @@ swim_transport_create(struct swim_transport *transport) memset(&transport->addr, 0, sizeof(transport->addr)); } +void +swim_test_transport_block_fd(int fd) +{ + struct swim_fd *sfd = &swim_fd[fd - FAKE_FD_BASE]; + assert(! rlist_empty(&sfd->in_opened)); + rlist_del_entry(sfd, in_opened); +} + +void +swim_test_transport_unblock_fd(int fd) +{ + struct swim_fd *sfd = &swim_fd[fd - FAKE_FD_BASE]; + assert(rlist_empty(&sfd->in_opened)); + rlist_add_tail_entry(&swim_fd_opened, sfd, in_opened); +} + /** Send one packet to destination's recv queue. */ static inline void swim_fd_send_packet(struct swim_fd *fd) diff --git a/test/unit/swim_test_transport.h b/test/unit/swim_test_transport.h index c9739e250..f8066e636 100644 --- a/test/unit/swim_test_transport.h +++ b/test/unit/swim_test_transport.h @@ -47,6 +47,17 @@ struct ev_loop; void swim_transport_do_loop_step(struct ev_loop *loop); +/** + * Block a file descriptor so as it can not receive nor send any + * packets. + */ +void +swim_test_transport_block_fd(int fd); + +/** Unblock a file descriptor. */ +void +swim_test_transport_unblock_fd(int fd); + /** Initialize test transport system. */ void swim_test_transport_init(void); diff --git a/test/unit/swim_test_utils.c b/test/unit/swim_test_utils.c index 20775effb..73f8db40f 100644 --- a/test/unit/swim_test_utils.c +++ b/test/unit/swim_test_utils.c @@ -92,6 +92,12 @@ swim_cluster_node(struct swim_cluster *cluster, int i) return cluster->node[i]; } +void +swim_cluster_block_io(struct swim_cluster *cluster, int i, double delay) +{ + swim_test_ev_block_fd(swim_fd(cluster->node[i]), delay); +} + /** Check if @a s1 knows every member of @a s2's table. */ static inline bool swim1_contains_swim2(struct swim *s1, struct swim *s2) diff --git a/test/unit/swim_test_utils.h b/test/unit/swim_test_utils.h index 19eea551d..56036422d 100644 --- a/test/unit/swim_test_utils.h +++ b/test/unit/swim_test_utils.h @@ -56,6 +56,10 @@ swim_error_check_match(const char *msg); struct swim * swim_cluster_node(struct swim_cluster *cluster, int i); +/** Block IO on a SWIM instance with id @a i. */ +void +swim_cluster_block_io(struct swim_cluster *cluster, int i, double delay); + /** * Explicitly add a member of id @a from_id to a member of id * @a to_id. > > > -- > Konstantin Osipov, Moscow, Russia, +7 903 626 22 32 > http://tarantool.io - www.twitter.com/kostja_osipov >
next prev parent reply other threads:[~2019-03-07 12:33 UTC|newest] Thread overview: 9+ messages / expand[flat|nested] mbox.gz Atom feed top 2019-03-06 15:13 [tarantool-patches] " Vladislav Shpilevoy 2019-03-06 15:16 ` [tarantool-patches] " Vladislav Shpilevoy 2019-03-06 21:24 ` Vladislav Shpilevoy 2019-03-07 10:27 ` Konstantin Osipov 2019-03-07 12:33 ` Vladislav Shpilevoy [this message] 2019-03-07 13:20 ` Konstantin Osipov 2019-03-07 13:54 ` Vladislav Shpilevoy 2019-03-07 10:22 ` Konstantin Osipov 2019-03-07 12:33 ` Vladislav Shpilevoy
Reply instructions: You may reply publicly to this message via plain-text email using any one of the following methods: * Save the following mbox file, import it into your mail client, and reply-to-all from there: mbox Avoid top-posting and favor interleaved quoting: https://en.wikipedia.org/wiki/Posting_style#Interleaved_style * Reply using the --to, --cc, and --in-reply-to switches of git-send-email(1): git send-email \ --in-reply-to=7fe095c9-bcab-f5dc-54e4-3ac326d27c02@tarantool.org \ --to=v.shpilevoy@tarantool.org \ --cc=kostja@tarantool.org \ --cc=tarantool-patches@freelists.org \ --subject='[tarantool-patches] Re: [PATCH 1/1] swim: introduce SWIM'\''s anti-entropy component' \ /path/to/YOUR_REPLY https://kernel.org/pub/software/scm/git/docs/git-send-email.html * If your mail client supports setting the In-Reply-To header via mailto: links, try the mailto: link
This is a public inbox, see mirroring instructions for how to clone and mirror all data and code used for this inbox