From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from localhost (localhost [127.0.0.1]) by turing.freelists.org (Avenir Technologies Mail Multiplex) with ESMTP id 5AB802FC0A for ; Sat, 8 Jun 2019 06:31:23 -0400 (EDT) Received: from turing.freelists.org ([127.0.0.1]) by localhost (turing.freelists.org [127.0.0.1]) (amavisd-new, port 10024) with ESMTP id XXH_MknZxF9D for ; Sat, 8 Jun 2019 06:31:23 -0400 (EDT) Received: from smtp57.i.mail.ru (smtp57.i.mail.ru [217.69.128.37]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by turing.freelists.org (Avenir Technologies Mail Multiplex) with ESMTPS id E09962FBCB for ; Sat, 8 Jun 2019 06:31:22 -0400 (EDT) From: Vladislav Shpilevoy Subject: [tarantool-patches] [PATCH v2 3/5] swim: allow to set triggers on member updates Date: Sat, 8 Jun 2019 12:31:19 +0200 Message-Id: <5db4b05b0f5708c4bbc9ac0f39bc12c2cdad43bd.1559989748.git.v.shpilevoy@tarantool.org> In-Reply-To: References: MIME-Version: 1.0 Content-Transfer-Encoding: 8bit Sender: tarantool-patches-bounce@freelists.org Errors-to: tarantool-patches-bounce@freelists.org Reply-To: tarantool-patches@freelists.org List-Help: List-Unsubscribe: List-software: Ecartis version 1.0.0 List-Id: tarantool-patches List-Subscribe: List-Owner: List-post: List-Archive: To: tarantool-patches@freelists.org Cc: kostja@tarantool.org SWIM is asynchronous by design. It means, that there are two ways of getting updates of a member table and individual members: polling and triggers. Polling is terrible - this is why libev with select(), poll(), epoll(), kevent() have appeared. The only acceptable solution is triggers. This commit allows to set triggers on member updates. Part of #4250 --- src/lib/swim/swim.c | 211 ++++++++++++++++++++++++++++++++---- src/lib/swim/swim.h | 47 +++++++- test/unit/swim.c | 146 ++++++++++++++++++++++++- test/unit/swim.result | 28 ++++- test/unit/swim_test_utils.c | 44 +++++++- test/unit/swim_test_utils.h | 12 ++ 6 files changed, 460 insertions(+), 28 deletions(-) diff --git a/src/lib/swim/swim.c b/src/lib/swim/swim.c index 46b76731d..00234d1df 100644 --- a/src/lib/swim/swim.c +++ b/src/lib/swim/swim.c @@ -37,6 +37,7 @@ #include "msgpuck.h" #include "assoc.h" #include "sio.h" +#include "trigger.h" #define HEAP_FORWARD_DECLARATION #include "salad/heap.h" @@ -334,6 +335,19 @@ struct swim_member { * time. */ struct rlist in_dissemination_queue; + /** + * Each time a member is updated, or created, or dropped, + * it is added to an event queue. Members from this queue + * are dispatched into user defined triggers. + */ + struct stailq_entry in_event_queue; + /** + * Mask of events happened with this member since a + * previous trigger invocation. Once the events are + * delivered into a trigger, the mask is nullified and + * starts collecting new events. + */ + enum swim_ev_mask events; /** * * Failure detection component @@ -455,6 +469,17 @@ struct swim { * as long as the event TTD is non-zero. */ struct rlist dissemination_queue; + /** + * Queue of updated, new, and dropped members to deliver + * the events to triggers. Dropped members are also kept + * here until they are handled by a trigger. + */ + struct stailq event_queue; + /** + * List of triggers to call on each new, dropped, and + * updated member. + */ + struct rlist on_member_event; /** * Members to which a message should be sent next during * this round. @@ -472,6 +497,13 @@ struct swim { * the beginning of each round. */ struct swim_member **shuffled; + /** + * Fiber to serve member event triggers. This task is + * being done in a separate fiber, because user triggers + * can yield and libev callbacks, processing member + * events, are not allowed to yield. + */ + struct fiber *event_handler; /** * Single round step task. It is impossible to have * multiple round steps in the same SWIM instance at the @@ -549,10 +581,41 @@ swim_register_event(struct swim *swim, struct swim_member *member) * change of its status, or incarnation, or both. */ static void -swim_on_member_update(struct swim *swim, struct swim_member *member) +swim_on_member_update(struct swim *swim, struct swim_member *member, + enum swim_ev_mask events) { member->unacknowledged_pings = 0; swim_register_event(swim, member); + /* + * Member event should be delivered to triggers only if + * there is at least one trigger. + */ + if (! rlist_empty(&swim->on_member_event)) { + /* + * Member is referenced and added to a queue only + * once. That moment can be detected when a first + * event happens. + */ + if (member->events == 0 && events != 0) { + swim_member_ref(member); + stailq_add_tail_entry(&swim->event_queue, member, + in_event_queue); + fiber_wakeup(swim->event_handler); + } + member->events |= events; + } +} + +struct rlist * +swim_trigger_list_on_member_event(struct swim *swim) +{ + return &swim->on_member_event; +} + +bool +swim_has_pending_events(struct swim *swim) +{ + return ! stailq_empty(&swim->event_queue); } /** @@ -577,13 +640,17 @@ swim_update_member_inc_status(struct swim *swim, struct swim_member *member, */ assert(member != swim->self); if (member->incarnation < incarnation) { - member->status = new_status; + enum swim_ev_mask events = SWIM_EV_NEW_INCARNATION; + if (new_status != member->status) { + events |= SWIM_EV_NEW_STATUS; + member->status = new_status; + } member->incarnation = incarnation; - swim_on_member_update(swim, member); + swim_on_member_update(swim, member, events); } else if (member->incarnation == incarnation && member->status < new_status) { member->status = new_status; - swim_on_member_update(swim, member); + swim_on_member_update(swim, member, SWIM_EV_NEW_STATUS); } } @@ -626,7 +693,7 @@ swim_update_member_payload(struct swim *swim, struct swim_member *member, member->payload_size = payload_size; member->payload_ttd = mh_size(swim->members); member->is_payload_up_to_date = true; - swim_on_member_update(swim, member); + swim_on_member_update(swim, member, SWIM_EV_NEW_PAYLOAD); return 0; } @@ -734,7 +801,6 @@ swim_delete_member(struct swim *swim, struct swim_member *member) mh_int_t rc = mh_swim_table_find(swim->members, key, NULL); assert(rc != mh_end(swim->members)); mh_swim_table_del(swim->members, rc, NULL); - swim_cached_round_msg_invalidate(swim); rlist_del_entry(member, in_round_queue); /* Failure detection component. */ @@ -742,6 +808,7 @@ swim_delete_member(struct swim *swim, struct swim_member *member) wait_ack_heap_delete(&swim->wait_ack_heap, member); /* Dissemination component. */ + swim_on_member_update(swim, member, SWIM_EV_DROP); rlist_del_entry(member, in_dissemination_queue); swim_member_delete(member); @@ -805,7 +872,7 @@ swim_new_member(struct swim *swim, const struct sockaddr_in *addr, swim_ev_timer_again(swim_loop(), &swim->round_tick); /* Dissemination component. */ - swim_on_member_update(swim, member); + swim_on_member_update(swim, member, SWIM_EV_NEW); if (payload_size >= 0 && swim_update_member_payload(swim, member, payload, payload_size) != 0) { @@ -1299,14 +1366,15 @@ swim_check_acks(struct ev_loop *loop, struct ev_timer *t, int events) if (m->unacknowledged_pings < NO_ACKS_TO_SUSPECT) break; m->status = MEMBER_SUSPECTED; - swim_on_member_update(swim, m); + swim_on_member_update(swim, m, SWIM_EV_NEW_STATUS); if (swim_send_indirect_pings(swim, m) != 0) diag_log(); break; case MEMBER_SUSPECTED: if (m->unacknowledged_pings >= NO_ACKS_TO_DEAD) { m->status = MEMBER_DEAD; - swim_on_member_update(swim, m); + swim_on_member_update(swim, m, + SWIM_EV_NEW_STATUS); } break; case MEMBER_DEAD: @@ -1332,7 +1400,7 @@ swim_update_member_addr(struct swim *swim, struct swim_member *member, { assert(! swim_inaddr_eq(&member->addr, addr)); member->addr = *addr; - swim_on_member_update(swim, member); + swim_on_member_update(swim, member, SWIM_EV_NEW_URI); } /** @@ -1354,12 +1422,10 @@ swim_update_member(struct swim *swim, const struct swim_member_def *def, if (def->incarnation > member->incarnation) { if (! swim_inaddr_eq(&def->addr, &member->addr)) swim_update_member_addr(swim, member, &def->addr); - if (def->payload_size >= 0) { + if (def->payload_size >= 0) update_payload = true; - } else if (member->is_payload_up_to_date) { + else if (member->is_payload_up_to_date) member->is_payload_up_to_date = false; - swim_on_member_update(swim, member); - } } else if (! member->is_payload_up_to_date && def->payload_size >= 0) { update_payload = true; } @@ -1430,7 +1496,7 @@ swim_upsert_member(struct swim *swim, const struct swim_member_def *def, */ if (self->incarnation < def->incarnation) { self->incarnation = def->incarnation; - swim_on_member_update(swim, self); + swim_on_member_update(swim, self, SWIM_EV_NEW_INCARNATION); } if (def->status != MEMBER_ALIVE && def->incarnation == self->incarnation) { @@ -1440,7 +1506,7 @@ swim_upsert_member(struct swim *swim, const struct swim_member_def *def, * with a bigger incarnation. */ self->incarnation++; - swim_on_member_update(swim, self); + swim_on_member_update(swim, self, SWIM_EV_NEW_INCARNATION); } return 0; skip: @@ -1532,7 +1598,7 @@ swim_process_failure_detection(struct swim *swim, const char **pos, if (def.incarnation == member->incarnation && member->status != MEMBER_ALIVE) { member->status = MEMBER_ALIVE; - swim_on_member_update(swim, member); + swim_on_member_update(swim, member, SWIM_EV_NEW_STATUS); } switch (def.type) { @@ -1604,7 +1670,7 @@ swim_process_quit(struct swim *swim, const char **pos, const char *end, swim_update_member_inc_status(swim, m, MEMBER_LEFT, tmp); } else if (tmp >= m->incarnation) { m->incarnation = tmp + 1; - swim_on_member_update(swim, m); + swim_on_member_update(swim, m, SWIM_EV_NEW_INCARNATION); } return 0; } @@ -1668,6 +1734,55 @@ error: diag_log(); } +/** + * Event handler. At this moment its only task is dispatching + * member events to user defined triggers. Generally, because + * SWIM is fully IO driven, that fiber should be used only for + * yielding tasks not related to SWIM core logic. For all the + * other tasks libev callbacks are ok. Unfortunately, yields are + * not allowed directly in libev callbacks, because they are + * invoked by a cord scheduler fiber prohibited for manual yields. + */ +static int +swim_event_handler_f(va_list va) +{ + struct swim *s = va_arg(va, struct swim *); + struct swim_on_member_event_ctx ctx; + while (! fiber_is_cancelled()) { + if (stailq_empty(&s->event_queue)) { + fiber_yield(); + continue; + } + /* + * Can't be empty. SWIM deletes members from + * event queue only on SWIM deletion, but then + * the fiber would be stopped already. + */ + assert(! stailq_empty(&s->event_queue)); + struct swim_member *m = + stailq_shift_entry(&s->event_queue, struct swim_member, + in_event_queue); + /* + * It is possible, that a member was added and + * removed before firing a trigger. It happens, + * if a previous event was being handled too + * long, for example. There is a convention not to + * show such easy riders. + */ + if ((m->events & SWIM_EV_NEW) == 0 || + (m->events & SWIM_EV_DROP) == 0) { + ctx.member = m; + ctx.events = m->events; + m->events = 0; + if (trigger_run(&s->on_member_event, (void *) &ctx)) + diag_log(); + } + swim_member_unref(m); + } + return 0; +} + + struct swim * swim_new(void) { @@ -1700,7 +1815,16 @@ swim_new(void) /* Dissemination component. */ rlist_create(&swim->dissemination_queue); - + rlist_create(&swim->on_member_event); + stailq_create(&swim->event_queue); + swim->event_handler = fiber_new("SWIM event handler", + swim_event_handler_f); + if (swim->event_handler == NULL) { + swim_delete(swim); + return NULL; + } + fiber_set_joinable(swim->event_handler, true); + fiber_start(swim->event_handler, swim); return swim; } @@ -1809,6 +1933,9 @@ swim_cfg(struct swim *swim, const char *uri, double heartbeat_rate, * specified. */ addr = swim->scheduler.transport.addr; + fiber_set_name(swim->event_handler, + tt_sprintf("SWIM event handler %d", + swim_fd(swim))); } else { addr = swim->self->addr; } @@ -1828,7 +1955,7 @@ swim_cfg(struct swim *swim, const char *uri, double heartbeat_rate, if (new_self != NULL) { swim->self->status = MEMBER_LEFT; - swim_on_member_update(swim, swim->self); + swim_on_member_update(swim, swim->self, SWIM_EV_NEW_STATUS); swim->self = new_self; } if (! swim_inaddr_eq(&addr, &swim->self->addr)) { @@ -1866,7 +1993,7 @@ swim_set_payload(struct swim *swim, const char *payload, int payload_size) if (swim_update_member_payload(swim, self, payload, payload_size) != 0) return -1; self->incarnation++; - swim_on_member_update(swim, self); + swim_on_member_update(swim, self, SWIM_EV_NEW_INCARNATION); return 0; } @@ -1951,17 +2078,44 @@ swim_size(const struct swim *swim) return mh_size(swim->members); } +/** + * Cancel and wait finish of an event handler fiber. That + * operation is not inlined in the SWIM destructor, because there + * is one more place, when the handler should be stopped even + * before SWIM deletion - quit. Quit deletes the instance only + * when all the 'I left' messages are sent, but it happens in a + * libev callback in the scheduler fiber where it is impossible + * to yield. So to that moment the handler should be dead already. + */ +static inline void +swim_kill_event_handler(struct swim *swim) +{ + struct fiber *f = swim->event_handler; + /* + * Nullify so as not to keep pointer at a fiber when it is + * reused. + */ + swim->event_handler = NULL; + fiber_wakeup(f); + fiber_cancel(f); + fiber_join(f); +} + void swim_delete(struct swim *swim) { + if (swim->event_handler != NULL) + swim_kill_event_handler(swim); struct ev_loop *l = swim_loop(); swim_scheduler_destroy(&swim->scheduler); swim_ev_timer_stop(l, &swim->round_tick); swim_ev_timer_stop(l, &swim->wait_ack_tick); + struct swim_member *m, *tmp; + stailq_foreach_entry_safe(m, tmp, &swim->event_queue, in_event_queue) + swim_member_unref(m); mh_int_t node; mh_foreach(swim->members, node) { - struct swim_member *m = - *mh_swim_table_node(swim->members, node); + m = *mh_swim_table_node(swim->members, node); rlist_del_entry(m, in_round_queue); if (! heap_node_is_stray(&m->in_wait_ack_heap)) wait_ack_heap_delete(&swim->wait_ack_heap, m); @@ -1975,6 +2129,7 @@ swim_delete(struct swim *swim) swim_task_destroy(&swim->round_step_task); wait_ack_heap_destroy(&swim->wait_ack_heap); mh_swim_table_delete(swim->members); + trigger_destroy(&swim->on_member_event); free(swim->shuffled); free(swim); } @@ -1991,6 +2146,11 @@ swim_quit_step_complete(struct swim_task *task, (void) rc; struct swim *swim = swim_by_scheduler(scheduler); if (rlist_empty(&swim->round_queue)) { + /* + * The handler should be dead - can't yield here, + * it is the scheduler fiber. + */ + assert(swim->event_handler == NULL); swim_delete(swim); return; } @@ -2020,6 +2180,11 @@ void swim_quit(struct swim *swim) { assert(swim_is_configured(swim)); + /* + * Kill the handler now. Later it will be impossible to do + * from the scheduler fiber. + */ + swim_kill_event_handler(swim); struct ev_loop *l = swim_loop(); swim_ev_timer_stop(l, &swim->round_tick); swim_ev_timer_stop(l, &swim->wait_ack_tick); diff --git a/src/lib/swim/swim.h b/src/lib/swim/swim.h index 6db12ba9e..a42ace7c6 100644 --- a/src/lib/swim/swim.h +++ b/src/lib/swim/swim.h @@ -40,6 +40,7 @@ extern "C" { #endif struct swim; +struct rlist; struct tt_uuid; struct swim_iterator; struct swim_member; @@ -63,7 +64,8 @@ enum swim_gc_mode { /** * Create a new SWIM instance. Do not bind to a port or set any - * parameters. Allocation and initialization only. + * parameters. Allocation and initialization only. The function + * yields. */ struct swim * swim_new(void); @@ -124,7 +126,8 @@ swim_set_codec(struct swim *swim, enum crypto_algo algo, enum crypto_mode mode, /** * Stop listening and broadcasting messages, cleanup all internal - * structures, free memory. + * structures, free memory. The function yields. Actual deletion + * happens after currently working triggers are done. */ void swim_delete(struct swim *swim); @@ -272,6 +275,46 @@ swim_member_unref(struct swim_member *member); bool swim_member_is_dropped(const struct swim_member *member); +enum swim_ev_mask { + SWIM_EV_NEW = 0b00000001, + SWIM_EV_NEW_STATUS = 0b00000010, + SWIM_EV_NEW_URI = 0b00000100, + SWIM_EV_NEW_INCARNATION = 0b00001000, + SWIM_EV_NEW_PAYLOAD = 0b00010000, + /* Shortcut to check for any update. */ + SWIM_EV_UPDATE = 0b00011110, + SWIM_EV_DROP = 0b00100000, +}; + +/** On member event trigger context. */ +struct swim_on_member_event_ctx { + /** New, dropped, or updated member. */ + struct swim_member *member; + /** Mask of happened events. */ + enum swim_ev_mask events; +}; + +/** + * A list of member event processing triggers. A couple of words + * about such a strange API, instead of something like + * "add_trigger", "drop_trigger". A main motivation is that some + * functions need a whole trigger list. For example, a function + * adding Lua functions as triggers. At the time of this writing + * it was lbox_trigger_reset. There is a convention, that any + * Tarantool trigger exposed to Lua should provide a way to add + * one, drop one, replace one, return all - lbox_trigger_reset + * does all of this. + */ +struct rlist * +swim_trigger_list_on_member_event(struct swim *swim); + +/** + * Check if a SWIM instance has pending events. Is not a public + * one, used by tests. + */ +bool +swim_has_pending_events(struct swim *swim); + #if defined(__cplusplus) } #endif diff --git a/test/unit/swim.c b/test/unit/swim.c index 2ba9820d8..0e33d691c 100644 --- a/test/unit/swim.c +++ b/test/unit/swim.c @@ -41,6 +41,7 @@ #include "swim_test_transport.h" #include "swim_test_ev.h" #include "swim_test_utils.h" +#include "trigger.h" #include /** @@ -950,10 +951,152 @@ swim_test_slow_net(void) swim_finish_test(); } +struct trigger_ctx { + int counter; + bool is_deleted; + bool need_sleep; + struct fiber *f; + struct swim_on_member_event_ctx ctx; +}; + +static void +swim_on_member_event_save(struct trigger *t, void *event) +{ + struct trigger_ctx *c = (struct trigger_ctx *) t->data; + ++c->counter; + if (c->ctx.member != NULL) + swim_member_unref(c->ctx.member); + c->ctx = *((struct swim_on_member_event_ctx *) event); + swim_member_ref(c->ctx.member); +} + +static void +swim_on_member_event_yield(struct trigger *t, void *event) +{ + struct trigger_ctx *c = (struct trigger_ctx *) t->data; + ++c->counter; + c->f = fiber(); + while (c->need_sleep) + fiber_yield(); +} + +static void +swim_trigger_destroy_cb(struct trigger *t) +{ + ((struct trigger_ctx *) t->data)->is_deleted = true; +} + +static int +swim_cluster_delete_f(va_list ap) +{ + struct swim_cluster *c = (struct swim_cluster *) + va_arg(ap, struct swim_cluster *); + swim_cluster_delete(c); + return 0; +} + +static void +swim_test_triggers(void) +{ + swim_start_test(21); + struct swim_cluster *cluster = swim_cluster_new(2); + swim_cluster_set_ack_timeout(cluster, 1); + struct trigger_ctx tctx, tctx2; + memset(&tctx, 0, sizeof(tctx)); + memset(&tctx2, 0, sizeof(tctx2)); + struct trigger *t1 = (struct trigger *) malloc(sizeof(*t1)); + assert(t1 != NULL); + trigger_create(t1, swim_on_member_event_save, (void *) &tctx, + swim_trigger_destroy_cb); + + /* Skip 'new self' events. */ + swim_cluster_run_triggers(cluster); + + struct swim *s1 = swim_cluster_member(cluster, 0); + trigger_add(swim_trigger_list_on_member_event(s1), t1); + swim_cluster_interconnect(cluster, 0, 1); + swim_cluster_run_triggers(cluster); + + is(tctx.counter, 1, "trigger is fired"); + ok(! tctx.is_deleted, "is not deleted"); + is(tctx.ctx.member, swim_cluster_member_view(cluster, 0, 1), + "ctx.member is set"); + is(tctx.ctx.events, SWIM_EV_NEW, "ctx.events is set"); + + swim_run_for(1); + swim_cluster_run_triggers(cluster); + is(tctx.counter, 2, "payload is delivered, trigger caught that"); + is(tctx.ctx.member, swim_cluster_member_view(cluster, 0, 1), + "S1 got S2' payload"); + is(tctx.ctx.events, SWIM_EV_NEW_PAYLOAD, "mask says that"); + + swim_cluster_member_set_payload(cluster, 0, "123", 3); + swim_cluster_run_triggers(cluster); + is(tctx.counter, 3, "self payload is updated"); + is(tctx.ctx.member, swim_self(s1), "self is set as a member"); + is(tctx.ctx.events, SWIM_EV_NEW_PAYLOAD | SWIM_EV_NEW_INCARNATION, + "both incarnation and payload events are presented"); + + swim_cluster_set_drop(cluster, 1, 100); + fail_if(swim_cluster_wait_status(cluster, 0, 1, + MEMBER_SUSPECTED, 3) != 0); + swim_cluster_run_triggers(cluster); + is(tctx.counter, 4, "suspicion fired a trigger"); + is(tctx.ctx.events, SWIM_EV_NEW_STATUS, "status suspected"); + + fail_if(swim_cluster_wait_status(cluster, 0, 1, MEMBER_DEAD, 3) != 0); + swim_cluster_run_triggers(cluster); + is(tctx.counter, 5, "death fired a trigger"); + is(tctx.ctx.events, SWIM_EV_NEW_STATUS, "status dead"); + + fail_if(swim_cluster_wait_status(cluster, 0, 1, + swim_member_status_MAX, 2) != 0); + swim_cluster_run_triggers(cluster); + is(tctx.counter, 6, "drop fired a trigger"); + is(tctx.ctx.events, SWIM_EV_DROP, "status dropped"); + is(swim_cluster_member_view(cluster, 0, 1), NULL, + "dropped member is not presented in the member table"); + isnt(tctx.ctx.member, NULL, "but is in the event context"); + /* + * There is a complication about yields. If a trigger + * yields, other triggers wait for its finish. And all + * the triggers should be ready to SWIM deletion in the + * middle of an event processing. SWIM object should not + * be deleted, until all the triggers are done. + */ + struct trigger *t2 = (struct trigger *) malloc(sizeof(*t2)); + assert(t2 != NULL); + tctx2.need_sleep = true; + trigger_create(t2, swim_on_member_event_yield, (void *) &tctx2, NULL); + trigger_add(swim_trigger_list_on_member_event(s1), t2); + swim_cluster_add_link(cluster, 0, 1); + swim_cluster_run_triggers(cluster); + is(tctx2.counter, 1, "yielding trigger is fired"); + is(tctx.counter, 6, "non-yielding still is not"); + + struct fiber *async_delete_fiber = + fiber_new("async delete", swim_cluster_delete_f); + fiber_start(async_delete_fiber, cluster); + ok(! tctx.is_deleted, "trigger is not deleted until all currently "\ + "sleeping triggers are finished"); + tctx2.need_sleep = false; + fiber_wakeup(tctx2.f); + while (! tctx.is_deleted) + fiber_sleep(0); + note("now all the triggers are done and deleted"); + + free(t1); + free(t2); + if (tctx.ctx.member != NULL) + swim_member_unref(tctx.ctx.member); + + swim_finish_test(); +} + static int main_f(va_list ap) { - swim_start_test(20); + swim_start_test(21); (void) ap; swim_test_ev_init(); @@ -979,6 +1122,7 @@ main_f(va_list ap) swim_test_indirect_ping(); swim_test_encryption(); swim_test_slow_net(); + swim_test_triggers(); swim_test_transport_free(); swim_test_ev_free(); diff --git a/test/unit/swim.result b/test/unit/swim.result index 25fdb8833..3ebfe7ea0 100644 --- a/test/unit/swim.result +++ b/test/unit/swim.result @@ -1,5 +1,5 @@ *** main_f *** -1..20 +1..21 *** swim_test_one_link *** 1..6 ok 1 - no rounds - no fullmesh @@ -200,4 +200,30 @@ ok 19 - subtests # slow network leads to idle round steps, they should not produce a new message ok 20 - subtests *** swim_test_slow_net: done *** + *** swim_test_triggers *** + 1..21 + ok 1 - trigger is fired + ok 2 - is not deleted + ok 3 - ctx.member is set + ok 4 - ctx.events is set + ok 5 - payload is delivered, trigger caught that + ok 6 - S1 got S2' payload + ok 7 - mask says that + ok 8 - self payload is updated + ok 9 - self is set as a member + ok 10 - both incarnation and payload events are presented + ok 11 - suspicion fired a trigger + ok 12 - status suspected + ok 13 - death fired a trigger + ok 14 - status dead + ok 15 - drop fired a trigger + ok 16 - status dropped + ok 17 - dropped member is not presented in the member table + ok 18 - but is in the event context + ok 19 - yielding trigger is fired + ok 20 - non-yielding still is not + ok 21 - trigger is not deleted until all currently sleeping triggers are finished + # now all the triggers are done and deleted +ok 21 - subtests + *** swim_test_triggers: done *** *** main_f: done *** diff --git a/test/unit/swim_test_utils.c b/test/unit/swim_test_utils.c index f72fa2450..463c62390 100644 --- a/test/unit/swim_test_utils.c +++ b/test/unit/swim_test_utils.c @@ -36,6 +36,7 @@ #include "trivia/util.h" #include "fiber.h" #include "msgpuck.h" +#include "trigger.h" /** * Drop rate packet filter to drop packets with a certain @@ -191,12 +192,32 @@ swim_cluster_id_to_uri(char *buffer, int id) sprintf(buffer, "127.0.0.1:%d", id + 1); } +/** + * A trigger to check correctness of event context, and ability + * to yield. + */ +void +swim_test_event_cb(struct trigger *trigger, void *event) +{ + (void) trigger; + struct swim_on_member_event_ctx *ctx = + (struct swim_on_member_event_ctx *) event; + assert(ctx->events != 0); + assert((ctx->events & SWIM_EV_NEW) == 0 || + (ctx->events & SWIM_EV_DROP) == 0); + fiber_sleep(0); +} + /** Create a SWIM cluster node @a n with a 0-based @a id. */ static inline void swim_node_create(struct swim_node *n, int id) { n->swim = swim_new(); assert(n->swim != NULL); + struct trigger *t = (struct trigger *) malloc(sizeof(*t)); + trigger_create(t, swim_test_event_cb, NULL, (trigger_f0) free); + trigger_add(swim_trigger_list_on_member_event(n->swim), t); + char uri[128]; swim_cluster_id_to_uri(uri, id); n->uuid = uuid_nil; @@ -303,7 +324,7 @@ swim_cluster_add_link(struct swim_cluster *cluster, int to_id, int from_id) swim_member_uri(from), swim_member_uuid(from)); } -static const struct swim_member * +const struct swim_member * swim_cluster_member_view(struct swim_cluster *cluster, int node_id, int member_id) { @@ -615,6 +636,8 @@ swim_wait_timeout(double timeout, struct swim_cluster *cluster, * whatsoever. */ swim_test_transport_do_loop_step(loop); + if (cluster != NULL) + swim_cluster_run_triggers(cluster); while (! check(cluster, data)) { if (swim_time() >= deadline) return -1; @@ -625,6 +648,8 @@ swim_wait_timeout(double timeout, struct swim_cluster *cluster, * too. */ swim_test_transport_do_loop_step(loop); + if (cluster != NULL) + swim_cluster_run_triggers(cluster); } return 0; } @@ -868,6 +893,23 @@ swim_cluster_wait_payload_everywhere(struct swim_cluster *cluster, swim_loop_check_member_everywhere, &t); } +void +swim_cluster_run_triggers(struct swim_cluster *cluster) +{ + bool has_events; + do { + has_events = false; + struct swim_node *n = cluster->node; + for (int i = 0; i < cluster->size; ++i, ++n) { + if (n->swim != NULL && + swim_has_pending_events(n->swim)) { + has_events = true; + fiber_sleep(0); + } + } + } while (has_events); +} + bool swim_error_check_match(const char *msg) { diff --git a/test/unit/swim_test_utils.h b/test/unit/swim_test_utils.h index 0e6f29d80..fde84e39b 100644 --- a/test/unit/swim_test_utils.h +++ b/test/unit/swim_test_utils.h @@ -168,6 +168,14 @@ int swim_cluster_member_set_payload(struct swim_cluster *cluster, int i, const char *payload, int size); +/** + * Get a member object stored in a SWIM node @a node_id and + * showing a known state of a SWIM node @a member_id. + */ +const struct swim_member * +swim_cluster_member_view(struct swim_cluster *cluster, int node_id, + int member_id); + /** * Check if in the cluster every instance knowns the about other * instances. @@ -229,6 +237,10 @@ swim_cluster_wait_payload_everywhere(struct swim_cluster *cluster, int member_id, const char *payload, int payload_size, double timeout); +/** Run all pending triggers in the cluster. */ +void +swim_cluster_run_triggers(struct swim_cluster *cluster); + /** Process SWIM events for @a duration fake seconds. */ void swim_run_for(double duration); -- 2.20.1 (Apple Git-117)