From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from localhost (localhost [127.0.0.1]) by turing.freelists.org (Avenir Technologies Mail Multiplex) with ESMTP id 0110E3059D for ; Sat, 1 Jun 2019 20:00:29 -0400 (EDT) Received: from turing.freelists.org ([127.0.0.1]) by localhost (turing.freelists.org [127.0.0.1]) (amavisd-new, port 10024) with ESMTP id vbFutdXt9zyb for ; Sat, 1 Jun 2019 20:00:28 -0400 (EDT) Received: from smtp51.i.mail.ru (smtp51.i.mail.ru [94.100.177.111]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by turing.freelists.org (Avenir Technologies Mail Multiplex) with ESMTPS id 7F25630599 for ; Sat, 1 Jun 2019 20:00:28 -0400 (EDT) From: Vladislav Shpilevoy Subject: [tarantool-patches] [PATCH 3/5] swim: allow to hang triggers on member updates Date: Sun, 2 Jun 2019 02:00:19 +0200 Message-Id: In-Reply-To: References: MIME-Version: 1.0 Content-Transfer-Encoding: 8bit Sender: tarantool-patches-bounce@freelists.org Errors-to: tarantool-patches-bounce@freelists.org Reply-To: tarantool-patches@freelists.org List-Help: List-Unsubscribe: List-software: Ecartis version 1.0.0 List-Id: tarantool-patches List-Subscribe: List-Owner: List-post: List-Archive: To: tarantool-patches@freelists.org Cc: kostja@tarantool.org SWIM is asynchronous by design. It means, that there are two ways of getting updates of a member table and individual members: polling and triggers. Polling is terrible - this is why libev with select(), poll(), epoll(), kevent() have appeared. The only acceptable solution is triggers. This commit allows to hang triggers on member updates. Part of #4250 --- src/lib/swim/swim.c | 199 +++++++++++++++++++++++++++++++----- src/lib/swim/swim.h | 47 ++++++++- test/unit/swim.c | 135 +++++++++++++++++++++++- test/unit/swim.result | 28 ++++- test/unit/swim_test_utils.c | 44 +++++++- test/unit/swim_test_utils.h | 12 +++ 6 files changed, 435 insertions(+), 30 deletions(-) diff --git a/src/lib/swim/swim.c b/src/lib/swim/swim.c index 46b76731d..8507eea14 100644 --- a/src/lib/swim/swim.c +++ b/src/lib/swim/swim.c @@ -37,6 +37,7 @@ #include "msgpuck.h" #include "assoc.h" #include "sio.h" +#include "trigger.h" #define HEAP_FORWARD_DECLARATION #include "salad/heap.h" @@ -334,6 +335,19 @@ struct swim_member { * time. */ struct rlist in_dissemination_queue; + /** + * Each time a member is updated, or created, or dropped, + * it is added to an update queue. Members from this queue + * are dispatched into user defined triggers. + */ + struct stailq_entry in_update_queue; + /** + * Mask of events happened with this member since a + * previous trigger invocation. Once the events are + * delivered into a trigger, the mask is nullified and + * starts collecting new events. + */ + enum swim_ev_mask events; /** * * Failure detection component @@ -455,6 +469,17 @@ struct swim { * as long as the event TTD is non-zero. */ struct rlist dissemination_queue; + /** + * Queue of updated, new, and dropped members to deliver + * the events to triggers. Dropped members are also kept + * here until they are handled by a trigger. + */ + struct stailq update_queue; + /** + * List of triggers to call on each new, dropped, and + * updated member. + */ + struct rlist on_member_update; /** * Members to which a message should be sent next during * this round. @@ -472,6 +497,13 @@ struct swim { * the beginning of each round. */ struct swim_member **shuffled; + /** + * Worker fiber to serve member update triggers. This task + * is being done in a separate fiber, because user + * triggers can yield and libev callbacks, processing + * member updates, are not allowed to yield. + */ + struct fiber *worker; /** * Single round step task. It is impossible to have * multiple round steps in the same SWIM instance at the @@ -549,10 +581,41 @@ swim_register_event(struct swim *swim, struct swim_member *member) * change of its status, or incarnation, or both. */ static void -swim_on_member_update(struct swim *swim, struct swim_member *member) +swim_on_member_update(struct swim *swim, struct swim_member *member, + enum swim_ev_mask events) { member->unacknowledged_pings = 0; swim_register_event(swim, member); + /* + * Member update should be delivered to triggers only + * if there is at least one trigger. + */ + if (! rlist_empty(&swim->on_member_update)) { + /* + * Member is referenced and added to a queue only + * once. That moment can be detected when a first + * event happens. + */ + if (member->events == 0 && events != 0) { + swim_member_ref(member); + stailq_add_tail_entry(&swim->update_queue, member, + in_update_queue); + fiber_wakeup(swim->worker); + } + member->events |= events; + } +} + +struct rlist * +swim_trigger_list_on_member_update(struct swim *swim) +{ + return &swim->on_member_update; +} + +bool +swim_has_pending_events(struct swim *swim) +{ + return ! stailq_empty(&swim->update_queue); } /** @@ -577,13 +640,17 @@ swim_update_member_inc_status(struct swim *swim, struct swim_member *member, */ assert(member != swim->self); if (member->incarnation < incarnation) { - member->status = new_status; + enum swim_ev_mask events = SWIM_EV_NEW_INCARNATION; + if (new_status != member->status) { + events |= SWIM_EV_NEW_STATUS; + member->status = new_status; + } member->incarnation = incarnation; - swim_on_member_update(swim, member); + swim_on_member_update(swim, member, events); } else if (member->incarnation == incarnation && member->status < new_status) { member->status = new_status; - swim_on_member_update(swim, member); + swim_on_member_update(swim, member, SWIM_EV_NEW_STATUS); } } @@ -626,7 +693,7 @@ swim_update_member_payload(struct swim *swim, struct swim_member *member, member->payload_size = payload_size; member->payload_ttd = mh_size(swim->members); member->is_payload_up_to_date = true; - swim_on_member_update(swim, member); + swim_on_member_update(swim, member, SWIM_EV_NEW_PAYLOAD); return 0; } @@ -734,7 +801,6 @@ swim_delete_member(struct swim *swim, struct swim_member *member) mh_int_t rc = mh_swim_table_find(swim->members, key, NULL); assert(rc != mh_end(swim->members)); mh_swim_table_del(swim->members, rc, NULL); - swim_cached_round_msg_invalidate(swim); rlist_del_entry(member, in_round_queue); /* Failure detection component. */ @@ -742,6 +808,7 @@ swim_delete_member(struct swim *swim, struct swim_member *member) wait_ack_heap_delete(&swim->wait_ack_heap, member); /* Dissemination component. */ + swim_on_member_update(swim, member, SWIM_EV_DROP); rlist_del_entry(member, in_dissemination_queue); swim_member_delete(member); @@ -805,7 +872,7 @@ swim_new_member(struct swim *swim, const struct sockaddr_in *addr, swim_ev_timer_again(swim_loop(), &swim->round_tick); /* Dissemination component. */ - swim_on_member_update(swim, member); + swim_on_member_update(swim, member, SWIM_EV_NEW); if (payload_size >= 0 && swim_update_member_payload(swim, member, payload, payload_size) != 0) { @@ -1299,14 +1366,15 @@ swim_check_acks(struct ev_loop *loop, struct ev_timer *t, int events) if (m->unacknowledged_pings < NO_ACKS_TO_SUSPECT) break; m->status = MEMBER_SUSPECTED; - swim_on_member_update(swim, m); + swim_on_member_update(swim, m, SWIM_EV_NEW_STATUS); if (swim_send_indirect_pings(swim, m) != 0) diag_log(); break; case MEMBER_SUSPECTED: if (m->unacknowledged_pings >= NO_ACKS_TO_DEAD) { m->status = MEMBER_DEAD; - swim_on_member_update(swim, m); + swim_on_member_update(swim, m, + SWIM_EV_NEW_STATUS); } break; case MEMBER_DEAD: @@ -1332,7 +1400,7 @@ swim_update_member_addr(struct swim *swim, struct swim_member *member, { assert(! swim_inaddr_eq(&member->addr, addr)); member->addr = *addr; - swim_on_member_update(swim, member); + swim_on_member_update(swim, member, SWIM_EV_NEW_URI); } /** @@ -1354,12 +1422,10 @@ swim_update_member(struct swim *swim, const struct swim_member_def *def, if (def->incarnation > member->incarnation) { if (! swim_inaddr_eq(&def->addr, &member->addr)) swim_update_member_addr(swim, member, &def->addr); - if (def->payload_size >= 0) { + if (def->payload_size >= 0) update_payload = true; - } else if (member->is_payload_up_to_date) { + else if (member->is_payload_up_to_date) member->is_payload_up_to_date = false; - swim_on_member_update(swim, member); - } } else if (! member->is_payload_up_to_date && def->payload_size >= 0) { update_payload = true; } @@ -1430,7 +1496,7 @@ swim_upsert_member(struct swim *swim, const struct swim_member_def *def, */ if (self->incarnation < def->incarnation) { self->incarnation = def->incarnation; - swim_on_member_update(swim, self); + swim_on_member_update(swim, self, SWIM_EV_NEW_INCARNATION); } if (def->status != MEMBER_ALIVE && def->incarnation == self->incarnation) { @@ -1440,7 +1506,7 @@ swim_upsert_member(struct swim *swim, const struct swim_member_def *def, * with a bigger incarnation. */ self->incarnation++; - swim_on_member_update(swim, self); + swim_on_member_update(swim, self, SWIM_EV_NEW_INCARNATION); } return 0; skip: @@ -1532,7 +1598,7 @@ swim_process_failure_detection(struct swim *swim, const char **pos, if (def.incarnation == member->incarnation && member->status != MEMBER_ALIVE) { member->status = MEMBER_ALIVE; - swim_on_member_update(swim, member); + swim_on_member_update(swim, member, SWIM_EV_NEW_STATUS); } switch (def.type) { @@ -1604,7 +1670,7 @@ swim_process_quit(struct swim *swim, const char **pos, const char *end, swim_update_member_inc_status(swim, m, MEMBER_LEFT, tmp); } else if (tmp >= m->incarnation) { m->incarnation = tmp + 1; - swim_on_member_update(swim, m); + swim_on_member_update(swim, m, SWIM_EV_NEW_INCARNATION); } return 0; } @@ -1668,6 +1734,66 @@ error: diag_log(); } +/** + * Do actual deletion of a SWIM instance. This destructor is + * called only after worker fiber has been stopped, and triggers + * do not work. + */ +static void +swim_do_delete(struct swim *swim); + +/** + * Worker fiber. At this moment its only task is dispatching + * member updates to user defined triggers. Generally, because + * SWIM is fully IO driven, that fiber should be used only for + * yielding tasks not related to SWIM core logic. For all the + * other tasks libev callbacks are ok. Unfortunately, yields are + * not allowed directly in libev callbacks, because they are + * invoked by a cord scheduler fiber prohibited for manual yields. + */ +static int +swim_worker_f(va_list va) +{ + struct swim *s = va_arg(va, struct swim *); + struct swim_on_member_update_ctx ctx; + while (! fiber_is_cancelled()) { + while (stailq_empty(&s->update_queue)) { + fiber_yield(); + if (fiber_is_cancelled()) + goto end; + } + /* + * Can't be empty. SWIM deletes members from + * update queue only on SWIM deletion, but then + * the fiber would be stopped already. + */ + assert(! stailq_empty(&s->update_queue)); + struct swim_member *m = + stailq_shift_entry(&s->update_queue, struct swim_member, + in_update_queue); + /* + * It is possible, that a member was added and + * removed before firing a trigger. It happens, + * if a previous event was being handled too + * long, for example. There is a convention not to + * show such easy riders. + */ + if ((m->events & SWIM_EV_NEW) == 0 || + (m->events & SWIM_EV_DROP) == 0) { + ctx.member = m; + ctx.events = m->events; + m->events = 0; + if (trigger_run(&s->on_member_update, (void *) &ctx)) + diag_log(); + } + swim_member_unref(m); + } +end: + swim_do_delete(s); + return 0; +} + + struct swim * swim_new(void) { @@ -1676,8 +1802,14 @@ swim_new(void) diag_set(OutOfMemory, sizeof(*swim), "calloc", "swim"); return NULL; } + swim->worker = fiber_new("SWIM worker", swim_worker_f); + if (swim->worker == NULL) { + free(swim); + return NULL; + } swim->members = mh_swim_table_new(); if (swim->members == NULL) { + fiber_cancel(swim->worker); free(swim); diag_set(OutOfMemory, sizeof(*swim->members), "mh_swim_table_new", "members"); @@ -1700,7 +1832,10 @@ swim_new(void) /* Dissemination component. */ rlist_create(&swim->dissemination_queue); + rlist_create(&swim->on_member_update); + stailq_create(&swim->update_queue); + fiber_start(swim->worker, swim); return swim; } @@ -1809,6 +1944,8 @@ swim_cfg(struct swim *swim, const char *uri, double heartbeat_rate, * specified. */ addr = swim->scheduler.transport.addr; + fiber_set_name(swim->worker, tt_sprintf("SWIM worker %d", + swim_fd(swim))); } else { addr = swim->self->addr; } @@ -1828,7 +1965,7 @@ swim_cfg(struct swim *swim, const char *uri, double heartbeat_rate, if (new_self != NULL) { swim->self->status = MEMBER_LEFT; - swim_on_member_update(swim, swim->self); + swim_on_member_update(swim, swim->self, SWIM_EV_NEW_STATUS); swim->self = new_self; } if (! swim_inaddr_eq(&addr, &swim->self->addr)) { @@ -1866,7 +2003,7 @@ swim_set_payload(struct swim *swim, const char *payload, int payload_size) if (swim_update_member_payload(swim, self, payload, payload_size) != 0) return -1; self->incarnation++; - swim_on_member_update(swim, self); + swim_on_member_update(swim, self, SWIM_EV_NEW_INCARNATION); return 0; } @@ -1951,17 +2088,19 @@ swim_size(const struct swim *swim) return mh_size(swim->members); } -void -swim_delete(struct swim *swim) +static void +swim_do_delete(struct swim *swim) { struct ev_loop *l = swim_loop(); swim_scheduler_destroy(&swim->scheduler); swim_ev_timer_stop(l, &swim->round_tick); swim_ev_timer_stop(l, &swim->wait_ack_tick); + struct swim_member *m, *tmp; + stailq_foreach_entry_safe(m, tmp, &swim->update_queue, in_update_queue) + swim_member_unref(m); mh_int_t node; mh_foreach(swim->members, node) { - struct swim_member *m = - *mh_swim_table_node(swim->members, node); + m = *mh_swim_table_node(swim->members, node); rlist_del_entry(m, in_round_queue); if (! heap_node_is_stray(&m->in_wait_ack_heap)) wait_ack_heap_delete(&swim->wait_ack_heap, m); @@ -1975,10 +2114,19 @@ swim_delete(struct swim *swim) swim_task_destroy(&swim->round_step_task); wait_ack_heap_destroy(&swim->wait_ack_heap); mh_swim_table_delete(swim->members); + trigger_destroy(&swim->on_member_update); free(swim->shuffled); free(swim); } +void +swim_delete(struct swim *swim) +{ + fiber_wakeup(swim->worker); + fiber_cancel(swim->worker); + fiber_sleep(0); +} + /** * Quit message is broadcasted in the same way as round messages, * step by step, with the only difference that quit round steps @@ -1991,7 +2139,8 @@ swim_quit_step_complete(struct swim_task *task, (void) rc; struct swim *swim = swim_by_scheduler(scheduler); if (rlist_empty(&swim->round_queue)) { - swim_delete(swim); + fiber_wakeup(swim->worker); + fiber_cancel(swim->worker); return; } struct swim_member *m = diff --git a/src/lib/swim/swim.h b/src/lib/swim/swim.h index 6db12ba9e..8a5bc9522 100644 --- a/src/lib/swim/swim.h +++ b/src/lib/swim/swim.h @@ -40,6 +40,7 @@ extern "C" { #endif struct swim; +struct rlist; struct tt_uuid; struct swim_iterator; struct swim_member; @@ -63,7 +64,8 @@ enum swim_gc_mode { /** * Create a new SWIM instance. Do not bind to a port or set any - * parameters. Allocation and initialization only. + * parameters. Allocation and initialization only. The function + * yields. */ struct swim * swim_new(void); @@ -124,7 +126,8 @@ swim_set_codec(struct swim *swim, enum crypto_algo algo, enum crypto_mode mode, /** * Stop listening and broadcasting messages, cleanup all internal - * structures, free memory. + * structures, free memory. The function yields. Actual deletion + * happens after currently working triggers are done. */ void swim_delete(struct swim *swim); @@ -272,6 +275,46 @@ swim_member_unref(struct swim_member *member); bool swim_member_is_dropped(const struct swim_member *member); +enum swim_ev_mask { + SWIM_EV_NEW = 0b00000001, + SWIM_EV_NEW_STATUS = 0b00000010, + SWIM_EV_NEW_URI = 0b00000100, + SWIM_EV_NEW_INCARNATION = 0b00001000, + SWIM_EV_NEW_PAYLOAD = 0b00010000, + /* Shortcut to check for any update. */ + SWIM_EV_UPDATE = 0b00011110, + SWIM_EV_DROP = 0b00100000, +}; + +/** On member update trigger context. */ +struct swim_on_member_update_ctx { + /** New, dropped, or updated member. */ + struct swim_member *member; + /** Mask of happened events. */ + enum swim_ev_mask events; +}; + +/** + * A list of member update triggers. A couple of words about such + * a strange API, instead of something like "add_trigger", + * "drop_trigger". A main motivation is that some functions need + * a whole trigger list. For example, a function adding Lua + * functions as triggers. At the time of this writing it was + * lbox_trigger_reset. There is a convention, that any Tarantool + * trigger exposed to Lua should provide a way to add one, drop + * one, replace one, return all - lbox_trigger_reset does all of + * this. + */ +struct rlist * +swim_trigger_list_on_member_update(struct swim *swim); + +/** + * Check if a SWIM instance has pending update events. Is not a + * public one, used by tests. + */ +bool +swim_has_pending_events(struct swim *swim); + #if defined(__cplusplus) } #endif diff --git a/test/unit/swim.c b/test/unit/swim.c index 2ba9820d8..3f2d156c5 100644 --- a/test/unit/swim.c +++ b/test/unit/swim.c @@ -41,6 +41,7 @@ #include "swim_test_transport.h" #include "swim_test_ev.h" #include "swim_test_utils.h" +#include "trigger.h" #include /** @@ -950,10 +951,141 @@ swim_test_slow_net(void) swim_finish_test(); } +struct trigger_ctx { + int counter; + bool is_deleted; + bool need_sleep; + struct fiber *f; + struct swim_on_member_update_ctx ctx; +}; + +static void +swim_on_member_update_save_event(struct trigger *t, void *event) +{ + struct trigger_ctx *c = (struct trigger_ctx *) t->data; + ++c->counter; + if (c->ctx.member != NULL) + swim_member_unref(c->ctx.member); + c->ctx = *((struct swim_on_member_update_ctx *) event); + swim_member_ref(c->ctx.member); +} + +static void +swim_on_member_update_yield(struct trigger *t, void *event) +{ + struct trigger_ctx *c = (struct trigger_ctx *) t->data; + ++c->counter; + c->f = fiber(); + while (c->need_sleep) + fiber_yield(); +} + +static void +swim_trigger_destroy_cb(struct trigger *t) +{ + ((struct trigger_ctx *) t->data)->is_deleted = true; +} + +static void +swim_test_triggers(void) +{ + swim_start_test(21); + struct swim_cluster *cluster = swim_cluster_new(2); + swim_cluster_set_ack_timeout(cluster, 1); + struct trigger_ctx tctx, tctx2; + memset(&tctx, 0, sizeof(tctx)); + memset(&tctx2, 0, sizeof(tctx2)); + struct trigger *t1 = (struct trigger *) malloc(sizeof(*t1)); + assert(t1 != NULL); + trigger_create(t1, swim_on_member_update_save_event, (void *) &tctx, + swim_trigger_destroy_cb); + + /* Skip 'new self' events. */ + swim_cluster_run_triggers(cluster); + + struct swim *s1 = swim_cluster_member(cluster, 0); + trigger_add(swim_trigger_list_on_member_update(s1), t1); + swim_cluster_interconnect(cluster, 0, 1); + swim_cluster_run_triggers(cluster); + + is(tctx.counter, 1, "trigger is fired"); + ok(! tctx.is_deleted, "is not deleted"); + is(tctx.ctx.member, swim_cluster_member_view(cluster, 0, 1), + "ctx.member is set"); + is(tctx.ctx.events, SWIM_EV_NEW, "ctx.events is set"); + + swim_run_for(1); + swim_cluster_run_triggers(cluster); + is(tctx.counter, 2, "payload is delivered, trigger caught that"); + is(tctx.ctx.member, swim_cluster_member_view(cluster, 0, 1), + "S1 got S2' payload"); + is(tctx.ctx.events, SWIM_EV_NEW_PAYLOAD, "mask says that"); + + swim_cluster_member_set_payload(cluster, 0, "123", 3); + swim_cluster_run_triggers(cluster); + is(tctx.counter, 3, "self payload is updated"); + is(tctx.ctx.member, swim_self(s1), "self is set as a member"); + is(tctx.ctx.events, SWIM_EV_NEW_PAYLOAD | SWIM_EV_NEW_INCARNATION, + "both incarnation and payload events are presented"); + + swim_cluster_set_drop(cluster, 1, 100); + fail_if(swim_cluster_wait_status(cluster, 0, 1, + MEMBER_SUSPECTED, 3) != 0); + swim_cluster_run_triggers(cluster); + is(tctx.counter, 4, "suspicion fired a trigger"); + is(tctx.ctx.events, SWIM_EV_NEW_STATUS, "status suspected"); + + fail_if(swim_cluster_wait_status(cluster, 0, 1, MEMBER_DEAD, 3) != 0); + swim_cluster_run_triggers(cluster); + is(tctx.counter, 5, "death fired a trigger"); + is(tctx.ctx.events, SWIM_EV_NEW_STATUS, "status dead"); + + fail_if(swim_cluster_wait_status(cluster, 0, 1, + swim_member_status_MAX, 2) != 0); + swim_cluster_run_triggers(cluster); + is(tctx.counter, 6, "drop fired a trigger"); + is(tctx.ctx.events, SWIM_EV_DROP, "status dropped"); + is(swim_cluster_member_view(cluster, 0, 1), NULL, + "dropped member is not presented in the member table"); + isnt(tctx.ctx.member, NULL, "but is in the event context"); + /* + * There is a complication about yields. If a trigger + * yields, other triggers wait for its finish. And all + * the triggers should be ready to SWIM deletion in the + * middle of an update processing. SWIM object should not + * be deleted, until all the triggers are done. + */ + struct trigger *t2 = (struct trigger *) malloc(sizeof(*t2)); + assert(t2 != NULL); + tctx2.need_sleep = true; + trigger_create(t2, swim_on_member_update_yield, (void *) &tctx2, NULL); + trigger_add(swim_trigger_list_on_member_update(s1), t2); + swim_cluster_add_link(cluster, 0, 1); + swim_cluster_run_triggers(cluster); + is(tctx2.counter, 1, "yielding trigger is fired"); + is(tctx.counter, 6, "non-yielding still is not"); + + swim_cluster_delete(cluster); + ok(! tctx.is_deleted, "trigger is not deleted until all currently "\ + "sleeping triggers are finished"); + tctx2.need_sleep = false; + fiber_wakeup(tctx2.f); + while (! tctx.is_deleted) + fiber_sleep(0); + note("now all the triggers are done and deleted"); + + free(t1); + free(t2); + if (tctx.ctx.member != NULL) + swim_member_unref(tctx.ctx.member); + + swim_finish_test(); +} + static int main_f(va_list ap) { - swim_start_test(20); + swim_start_test(21); (void) ap; swim_test_ev_init(); @@ -979,6 +1111,7 @@ main_f(va_list ap) swim_test_indirect_ping(); swim_test_encryption(); swim_test_slow_net(); + swim_test_triggers(); swim_test_transport_free(); swim_test_ev_free(); diff --git a/test/unit/swim.result b/test/unit/swim.result index 25fdb8833..3ebfe7ea0 100644 --- a/test/unit/swim.result +++ b/test/unit/swim.result @@ -1,5 +1,5 @@ *** main_f *** -1..20 +1..21 *** swim_test_one_link *** 1..6 ok 1 - no rounds - no fullmesh @@ -200,4 +200,30 @@ ok 19 - subtests # slow network leads to idle round steps, they should not produce a new message ok 20 - subtests *** swim_test_slow_net: done *** + *** swim_test_triggers *** + 1..21 + ok 1 - trigger is fired + ok 2 - is not deleted + ok 3 - ctx.member is set + ok 4 - ctx.events is set + ok 5 - payload is delivered, trigger caught that + ok 6 - S1 got S2' payload + ok 7 - mask says that + ok 8 - self payload is updated + ok 9 - self is set as a member + ok 10 - both incarnation and payload events are presented + ok 11 - suspicion fired a trigger + ok 12 - status suspected + ok 13 - death fired a trigger + ok 14 - status dead + ok 15 - drop fired a trigger + ok 16 - status dropped + ok 17 - dropped member is not presented in the member table + ok 18 - but is in the event context + ok 19 - yielding trigger is fired + ok 20 - non-yielding still is not + ok 21 - trigger is not deleted until all currently sleeping triggers are finished + # now all the triggers are done and deleted +ok 21 - subtests + *** swim_test_triggers: done *** *** main_f: done *** diff --git a/test/unit/swim_test_utils.c b/test/unit/swim_test_utils.c index f72fa2450..0b4743b2d 100644 --- a/test/unit/swim_test_utils.c +++ b/test/unit/swim_test_utils.c @@ -36,6 +36,7 @@ #include "trivia/util.h" #include "fiber.h" #include "msgpuck.h" +#include "trigger.h" /** * Drop rate packet filter to drop packets with a certain @@ -191,12 +192,32 @@ swim_cluster_id_to_uri(char *buffer, int id) sprintf(buffer, "127.0.0.1:%d", id + 1); } +/** + * A trigger to check correctness of event context, and ability + * to yield. + */ +void +swim_test_event_cb(struct trigger *trigger, void *event) +{ + (void) trigger; + struct swim_on_member_update_ctx *ctx = + (struct swim_on_member_update_ctx *) event; + assert(ctx->events != 0); + assert((ctx->events & SWIM_EV_NEW) == 0 || + (ctx->events & SWIM_EV_DROP) == 0); + fiber_sleep(0); +} + /** Create a SWIM cluster node @a n with a 0-based @a id. */ static inline void swim_node_create(struct swim_node *n, int id) { n->swim = swim_new(); assert(n->swim != NULL); + struct trigger *t = (struct trigger *) malloc(sizeof(*t)); + trigger_create(t, swim_test_event_cb, NULL, (trigger_f0) free); + trigger_add(swim_trigger_list_on_member_update(n->swim), t); + char uri[128]; swim_cluster_id_to_uri(uri, id); n->uuid = uuid_nil; @@ -303,7 +324,7 @@ swim_cluster_add_link(struct swim_cluster *cluster, int to_id, int from_id) swim_member_uri(from), swim_member_uuid(from)); } -static const struct swim_member * +const struct swim_member * swim_cluster_member_view(struct swim_cluster *cluster, int node_id, int member_id) { @@ -615,6 +636,8 @@ swim_wait_timeout(double timeout, struct swim_cluster *cluster, * whatsoever. */ swim_test_transport_do_loop_step(loop); + if (cluster != NULL) + swim_cluster_run_triggers(cluster); while (! check(cluster, data)) { if (swim_time() >= deadline) return -1; @@ -625,6 +648,8 @@ swim_wait_timeout(double timeout, struct swim_cluster *cluster, * too. */ swim_test_transport_do_loop_step(loop); + if (cluster != NULL) + swim_cluster_run_triggers(cluster); } return 0; } @@ -868,6 +893,23 @@ swim_cluster_wait_payload_everywhere(struct swim_cluster *cluster, swim_loop_check_member_everywhere, &t); } +void +swim_cluster_run_triggers(struct swim_cluster *cluster) +{ + bool has_events; + do { + has_events = false; + struct swim_node *n = cluster->node; + for (int i = 0; i < cluster->size; ++i, ++n) { + if (n->swim != NULL && + swim_has_pending_events(n->swim)) { + has_events = true; + fiber_sleep(0); + } + } + } while (has_events); +} + bool swim_error_check_match(const char *msg) { diff --git a/test/unit/swim_test_utils.h b/test/unit/swim_test_utils.h index 0e6f29d80..fde84e39b 100644 --- a/test/unit/swim_test_utils.h +++ b/test/unit/swim_test_utils.h @@ -168,6 +168,14 @@ int swim_cluster_member_set_payload(struct swim_cluster *cluster, int i, const char *payload, int size); +/** + * Get a member object stored in a SWIM node @a node_id and + * showing a known state of a SWIM node @a member_id. + */ +const struct swim_member * +swim_cluster_member_view(struct swim_cluster *cluster, int node_id, + int member_id); + /** * Check if in the cluster every instance knowns the about other * instances. @@ -229,6 +237,10 @@ swim_cluster_wait_payload_everywhere(struct swim_cluster *cluster, int member_id, const char *payload, int payload_size, double timeout); +/** Run all pending triggers in the cluster. */ +void +swim_cluster_run_triggers(struct swim_cluster *cluster); + /** Process SWIM events for @a duration fake seconds. */ void swim_run_for(double duration); -- 2.20.1 (Apple Git-117)