From mboxrd@z Thu Jan 1 00:00:00 1970
Return-Path:
From: Vladislav Shpilevoy
Subject: [PATCH v3 1/6] [RAW] swim: introduce SWIM's anti-entropy component
Date: Sat, 29 Dec 2018 13:14:10 +0300
Message-Id: <68930a7f6647aaa3f161223470e33a52012a3569.1546077015.git.v.shpilevoy@tarantool.org>
In-Reply-To:
References:
To: tarantool-patches@freelists.org
Cc: vdavydov.dev@gmail.com, kostja@tarantool.org
List-ID:

SWIM - Scalable Weakly-consistent Infection-style Process Group Membership Protocol. It consists of 2 components: event dissemination and failure detection, and stores in memory a table of known remote hosts - members. Some SWIM implementations also have an additional component: anti-entropy - a periodic broadcast of a random subset of the members table.

Each SWIM component differs from the others in both message structure and goal, and they could even be sent in separate messages. But SWIM describes piggybacking of messages: a ping message can piggyback a dissemination one. SWIM has a main operating cycle during which it randomly chooses members from the members table and sends them events + ping. Answers are processed outside of the main cycle, asynchronously.

Random selection provides an even network load of about 1 message per member per protocol step regardless of the cluster size. Without randomness a member could get a network load of N messages per protocol step, where N is the cluster size, since all other members would choose the same member on each step.

SWIM also describes a kind of fairness: when selecting the next member to ping, the protocol prefers the least recently used (LRU) members. In code that would be too complicated, so Tarantool's implementation is slightly different and simpler.

Tarantool splits protocol operation into rounds. At the beginning of a round all members are randomly reordered and linked into a list. At each round step a member is popped from the list head, a message is sent to it, and it waits for the next round.
In such an implementation all random selection of the original SWIM is executed once per round. The round is effectively 'planned' in advance. A list is used instead of an array since new members can be added to its tail without realloc, and dead members can be removed just as easily.

Tarantool also implements the third component - anti-entropy. Why is it needed and even vital? Consider an example: two SWIM nodes, both alive. Nothing happens, so the events list is empty and only pings are sent periodically. Then a third node appears. It knows about one of the existing nodes. How should it learn about the other one? The cluster is stable and there are no new events, so without anti-entropy its only chance is to wait until some server stops and an event about that is broadcast. Anti-entropy is an extremely simple component: it just piggybacks a random part of the members table on each regular ping. In the example above the new node will learn about the node it does not know yet via anti-entropy messages of the node it already knows.

This commit introduces anti-entropy as the first implemented component. With this component a member can discover other members, but can not detect who is already dead. Detecting dead members is a part of the next commit.
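For illustration only, here is a minimal standalone sketch of the round planning and of the anti-entropy payload selection described above. It is not the code of this patch: `toy_swim`, `toy_rand_upto`, `toy_new_round`, `toy_next_target` and `toy_anti_entropy` are hypothetical names, members are plain integer ids instead of `struct swim_member`, and fixed arrays stand in for the rlist queue and the realloc'ed shuffled table.

```c
#include <assert.h>
#include <stdlib.h>

enum { TOY_MAX_MEMBERS = 16 };

/* Hypothetical toy model, not the patch's struct swim. */
struct toy_swim {
	/* Known members as plain integer ids, self included. */
	int members[TOY_MAX_MEMBERS];
	int member_count;
	/* Id of this node. It is never put into the round queue. */
	int self;
	/* Members table shuffled at the beginning of each round. */
	int shuffled[TOY_MAX_MEMBERS];
	/* Targets still to be pinged in the current round. */
	int queue[TOY_MAX_MEMBERS];
	int queue_head;
	int queue_size;
};

/* Random integer in [0, max], max >= 1, scaled instead of '%'. */
static int
toy_rand_upto(int max)
{
	assert(max >= 1);
	return rand() / (RAND_MAX / (max + 1) + 1);
}

/* Plan a round: shuffle all members, queue everyone except self. */
static void
toy_new_round(struct toy_swim *s)
{
	for (int i = 0; i < s->member_count; ++i)
		s->shuffled[i] = s->members[i];
	/* Fisher-Yates shuffle of the copy. */
	for (int i = s->member_count - 1; i > 0; --i) {
		int j = toy_rand_upto(i);
		int tmp = s->shuffled[i];
		s->shuffled[i] = s->shuffled[j];
		s->shuffled[j] = tmp;
	}
	s->queue_head = 0;
	s->queue_size = 0;
	for (int i = 0; i < s->member_count; ++i) {
		if (s->shuffled[i] != s->self)
			s->queue[s->queue_size++] = s->shuffled[i];
	}
}

/*
 * One round step: pop the next target. A new round is planned
 * when the queue is exhausted. Returns -1 if there is nobody to
 * ping.
 */
static int
toy_next_target(struct toy_swim *s)
{
	if (s->queue_head == s->queue_size)
		toy_new_round(s);
	if (s->queue_head == s->queue_size)
		return -1;
	return s->queue[s->queue_head++];
}

/*
 * Anti-entropy payload: the head of the shuffled table, as many
 * records as fit. Returns the number of ids written to out.
 */
static int
toy_anti_entropy(const struct toy_swim *s, int *out, int capacity)
{
	int n = s->member_count < capacity ? s->member_count : capacity;
	for (int i = 0; i < n; ++i)
		out[i] = s->shuffled[i];
	return n;
}
```

With a zero-initialized `struct toy_swim` holding four members, three consecutive `toy_next_target()` calls return each non-self member exactly once, and the fourth call plans a new round. The payload is only meaningful after the first round has been planned, because the shuffled table is filled there.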
Part of #3234 --- src/CMakeLists.txt | 3 +- src/evio.c | 3 +- src/evio.h | 4 + src/lib/CMakeLists.txt | 1 + src/lib/swim/CMakeLists.txt | 7 + src/lib/swim/swim.c | 823 ++++++++++++++++++++++++++++++++++ src/lib/swim/swim.h | 95 ++++ src/lib/swim/swim_io.c | 163 +++++++ src/lib/swim/swim_io.h | 299 ++++++++++++ src/lib/swim/swim_transport.h | 64 +++ src/lua/init.c | 2 + src/lua/swim.c | 244 ++++++++++ src/lua/swim.h | 47 ++ 13 files changed, 1752 insertions(+), 3 deletions(-) create mode 100644 src/lib/swim/CMakeLists.txt create mode 100644 src/lib/swim/swim.c create mode 100644 src/lib/swim/swim.h create mode 100644 src/lib/swim/swim_io.c create mode 100644 src/lib/swim/swim_io.h create mode 100644 src/lib/swim/swim_transport.h create mode 100644 src/lua/swim.c create mode 100644 src/lua/swim.h diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 04de5ad04..785051966 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -182,6 +182,7 @@ set (server_sources lua/crypto.c lua/httpc.c lua/utf8.c + lua/swim.c lua/info.c ${lua_sources} ${PROJECT_SOURCE_DIR}/third_party/lua-yaml/lyaml.cc @@ -228,7 +229,7 @@ endif() set_source_files_compile_flags(${server_sources}) add_library(server STATIC ${server_sources}) -target_link_libraries(server core bit uri uuid ${ICU_LIBRARIES}) +target_link_libraries(server core bit uri uuid swim ${ICU_LIBRARIES}) # Rule of thumb: if exporting a symbol from a static library, list the # library here. diff --git a/src/evio.c b/src/evio.c index 9ca14c45c..8610dbbe7 100644 --- a/src/evio.c +++ b/src/evio.c @@ -129,8 +129,7 @@ evio_setsockopt_client(int fd, int family, int type) return 0; } -/** Set options for server sockets. 
*/ -static int +int evio_setsockopt_server(int fd, int family, int type) { int on = 1; diff --git a/src/evio.h b/src/evio.h index 69d641a60..872a21ab6 100644 --- a/src/evio.h +++ b/src/evio.h @@ -157,6 +157,10 @@ evio_timeout_update(ev_loop *loop, ev_tstamp start, ev_tstamp *delay) int evio_setsockopt_client(int fd, int family, int type); +/** Set options for server sockets. */ +int +evio_setsockopt_server(int fd, int family, int type); + #if defined(__cplusplus) } /* extern "C" */ #endif /* defined(__cplusplus) */ diff --git a/src/lib/CMakeLists.txt b/src/lib/CMakeLists.txt index 98ff19b60..4e21e7da8 100644 --- a/src/lib/CMakeLists.txt +++ b/src/lib/CMakeLists.txt @@ -5,6 +5,7 @@ add_subdirectory(small) add_subdirectory(salad) add_subdirectory(csv) add_subdirectory(json) +add_subdirectory(swim) if(ENABLE_BUNDLED_MSGPUCK) add_subdirectory(msgpuck EXCLUDE_FROM_ALL) endif() diff --git a/src/lib/swim/CMakeLists.txt b/src/lib/swim/CMakeLists.txt new file mode 100644 index 000000000..0fe8574ce --- /dev/null +++ b/src/lib/swim/CMakeLists.txt @@ -0,0 +1,7 @@ +set(lib_sources + swim.c + swim_io.c +) + +set_source_files_compile_flags(${lib_sources}) +add_library(swim STATIC ${lib_sources}) diff --git a/src/lib/swim/swim.c b/src/lib/swim/swim.c new file mode 100644 index 000000000..08c377374 --- /dev/null +++ b/src/lib/swim/swim.c @@ -0,0 +1,823 @@ +/* + * Copyright 2010-2018, Tarantool AUTHORS, please see AUTHORS file. + * + * Redistribution and use in source and binary forms, with or + * without modification, are permitted provided that the following + * conditions are met: + * + * 1. Redistributions of source code must retain the above + * copyright notice, this list of conditions and the + * following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following + * disclaimer in the documentation and/or other materials + * provided with the distribution. 
+ * + * THIS SOFTWARE IS PROVIDED BY ``AS IS'' AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED + * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL + * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR + * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF + * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF + * SUCH DAMAGE. + */ +#include "swim.h" +#include "swim_io.h" +#include "uri.h" +#include "assoc.h" +#include "fiber.h" +#include "msgpuck.h" +#include "info.h" + +/** + * SWIM - Scalable Weakly-consistent Infection-style Process Group + * Membership Protocol. It consists of 2 components: events + * dissemination and failure detection, and stores in memory a + * table of known remote hosts - members. Also some SWIM + * implementations have an additional component: anti-entropy - + * periodical broadcast of a random subset of members table. + * + * Each SWIM component is different from others in both message + * structures and goals, they even could be sent in different + * messages. But SWIM describes piggybacking of messages: a ping + * message can piggyback a dissemination's one. SWIM has a main + * operating cycle during which it randomly chooses members from a + * member table and sends them events + ping. Answers are + * processed out of the main cycle asynchronously. + * + * Random selection provides even network load about ~1 message to + * each member regardless of the cluster size. 
Without randomness + * a member could get a network load of N messages per protocol + * step, where N is the cluster size, since all other members + * would choose the same member on each step. + * + * Also SWIM describes a kind of fairness: when selecting a next + * member to ping, the protocol prefers LRU members. In code it + * would be too complicated, so Tarantool's implementation is + * slightly different, simpler. + * + * Tarantool splits protocol operation into rounds. At the + * beginning of a round all members are randomly reordered and + * linked into a list. At each round step a member is popped from + * the list head, a message is sent to it, and it waits for the + * next round. In such an implementation all random selection of + * the original SWIM is executed once per round. The round is + * 'planned' in advance. A list is used instead of an array since + * new members can be added to its tail without realloc, and dead + * members can be removed just as easily. + * + * Also Tarantool implements a third component - anti-entropy. Why + * is it needed and even vital? Consider an example: two SWIM + * nodes, both are alive. Nothing happens, so the events list is + * empty, only pings are being sent periodically. Then a third + * node appears. It knows about one of the existing nodes. How + * should it learn about the other one? Sure, its known peer can + * try to notify the other one, but it is UDP, so the event can be + * lost. Anti-entropy is an extremely simple component: it just + * piggybacks a random part of the members table on each regular + * ping. In the example above the new node learns about the other + * one via anti-entropy messages of its known peer, eventually. + */ + +enum { + /** How often to send membership messages and pings. */ + HEARTBEAT_RATE_DEFAULT = 1, +}; + +/** + * Take a random number not by blindly calculating a modulo, but + * by scaling a random number down to the given borders to keep + * the distribution. The result belongs to the range [start, end]. 
+ */ +static inline int +swim_scaled_rand(int start, int end) +{ + assert(end >= start); + return start + (int) (rand() / (RAND_MAX / (end - start + 1) + 1.0)); +} + +enum swim_member_status { + /** + * The instance is ok, it responds to requests, sends its + * members table. + */ + MEMBER_ALIVE = 0, + swim_member_status_MAX, +}; + +static const char *swim_member_status_strs[] = { + "alive", +}; + +/** + * A cluster member description. This structure describes the + * last known state of an instance, which is updated periodically + * via UDP according to the SWIM protocol. + */ +struct swim_member { + /** + * Member status. Since the communication goes via UDP, + * actual status can be different, as well as different on + * other SWIM nodes. But SWIM guarantees that each member + * will eventually learn the real status of an instance. + */ + enum swim_member_status status; + /** + * Address of the instance to which UDP packets are sent. + * Unique identifier of the member. + */ + struct sockaddr_in addr; + /** + * Position in a queue of members in the current round. + */ + struct rlist in_queue_round; +}; + +/** + * SWIM instance. Each instance uses its own UDP port. Tarantool + * can have multiple SWIMs. + */ +struct swim { + /** + * Global hash of all known members of the cluster. Hash + * key is a bitwise combination of ip and port, value is a + * struct swim_member describing a remote instance. The only + * purpose of such a strange hash function is to be able to + * reuse mh_i64ptr_t instead of introducing one more + * implementation of mhash. + * + * Discovered members live here until they are + * unavailable - in such a case they are removed from the + * hash. But a subset of members is pinned - the ones + * added explicitly via API. When a member is pinned, it + * can not be removed from the hash, and the module will + * ping it constantly. + */ + struct mh_i64ptr_t *members; + /** + * This node. Used to avoid sending messages to self, it's + * meaningless. 
+ */ + struct swim_member *self; + /** + * Members to which a message should be sent next during + * this round. + */ + struct rlist queue_round; + /** Generator of round step events. */ + struct ev_periodic round_tick; + /** + * Single round step task. It is impossible to have + * multiple round steps at the same time, so it is single + * and preallocated per SWIM instance. + */ + struct swim_task round_step_task; + /** Transport to send/receive data. */ + const struct swim_transport *transport; + /** Scheduler of output requests. */ + struct swim_scheduler scheduler; + /** + * An array of members shuffled on each round. Its head it + * sent to each member during one round as an + * anti-entropy message. + */ + struct swim_member **shuffled_members; +}; + +static inline uint64_t +sockaddr_in_hash(const struct sockaddr_in *a) +{ + return ((uint64_t) a->sin_addr.s_addr << 16) | a->sin_port; +} + +/** + * Main round messages can carry merged failure detection + * messages and anti-entropy. With these keys the components can + * be distinguished from each other. + */ +enum swim_component_type { + SWIM_ANTI_ENTROPY = 0, +}; + +/** {{{ Anti-entropy component */ + +/** + * Attributes of each record of a broadcasted member table. Just + * the same as some of struct swim_member attributes. + */ +enum swim_member_key { + SWIM_MEMBER_STATUS = 0, + /** + * Now can only be IP. But in future UNIX sockets can be + * added. + */ + SWIM_MEMBER_ADDRESS, + SWIM_MEMBER_PORT, + swim_member_key_MAX, +}; + +/** SWIM anti-entropy MsgPack header template. 
*/ +struct PACKED swim_anti_entropy_header_bin { + /** mp_encode_uint(SWIM_ANTI_ENTROPY) */ + uint8_t k_anti_entropy; + /** mp_encode_array() */ + uint8_t m_anti_entropy; + uint32_t v_anti_entropy; +}; + +static inline void +swim_anti_entropy_header_bin_create(struct swim_anti_entropy_header_bin *header, + int batch_size) +{ + header->k_anti_entropy = SWIM_ANTI_ENTROPY; + header->m_anti_entropy = 0xdd; + header->v_anti_entropy = mp_bswap_u32(batch_size); +} + +/** SWIM member MsgPack template. */ +struct PACKED swim_member_bin { + /** mp_encode_map(3) */ + uint8_t m_header; + + /** mp_encode_uint(SWIM_MEMBER_STATUS) */ + uint8_t k_status; + /** mp_encode_uint(enum member_status) */ + uint8_t v_status; + + /** mp_encode_uint(SWIM_MEMBER_ADDRESS) */ + uint8_t k_addr; + /** mp_encode_uint(addr.sin_addr.s_addr) */ + uint8_t m_addr; + uint32_t v_addr; + + /** mp_encode_uint(SWIM_MEMBER_PORT) */ + uint8_t k_port; + /** mp_encode_uint(addr.sin_port) */ + uint8_t m_port; + uint16_t v_port; +}; + +static inline void +swim_member_bin_reset(struct swim_member_bin *header, + struct swim_member *member) +{ + header->v_status = member->status; + header->v_addr = mp_bswap_u32(member->addr.sin_addr.s_addr); + header->v_port = mp_bswap_u16(member->addr.sin_port); +} + +static inline void +swim_member_bin_create(struct swim_member_bin *header) +{ + header->m_header = 0x83; + header->k_status = SWIM_MEMBER_STATUS; + header->k_addr = SWIM_MEMBER_ADDRESS; + header->m_addr = 0xce; + header->k_port = SWIM_MEMBER_PORT; + header->m_port = 0xcd; +} + +/** }}} Anti-entropy component */ + +/** + * SWIM message structure: + * { + * SWIM_ANTI_ENTROPY: [ + * { + * SWIM_MEMBER_STATUS: uint, enum member_status, + * SWIM_MEMBER_ADDRESS: uint, ip, + * SWIM_MEMBER_PORT: uint, port + * }, + * ... + * ], + * } + */ + +/** + * Remove the member from all queues, hashes, destroy it and free + * the memory. 
+ */ +static void +swim_member_delete(struct swim *swim, struct swim_member *member) +{ + uint64_t key = sockaddr_in_hash(&member->addr); + mh_int_t rc = mh_i64ptr_find(swim->members, key, NULL); + assert(rc != mh_end(swim->members)); + mh_i64ptr_del(swim->members, rc, NULL); + rlist_del_entry(member, in_queue_round); + + free(member); +} + +/** + * Register a new member with a specified status. Here it is + * added to the hash, to the 'next' queue. + */ +static struct swim_member * +swim_member_new(struct swim *swim, const struct sockaddr_in *addr, + enum swim_member_status status) +{ + struct swim_member *member = + (struct swim_member *) calloc(1, sizeof(*member)); + if (member == NULL) { + diag_set(OutOfMemory, sizeof(*member), "calloc", "member"); + return NULL; + } + member->status = status; + member->addr = *addr; + struct mh_i64ptr_node_t node; + node.key = sockaddr_in_hash(addr); + node.val = member; + mh_int_t rc = mh_i64ptr_put(swim->members, &node, NULL, NULL); + if (rc == mh_end(swim->members)) { + free(member); + diag_set(OutOfMemory, sizeof(mh_int_t), "malloc", "node"); + return NULL; + } + rlist_add_entry(&swim->queue_round, member, in_queue_round); + + return member; +} + +static inline struct swim_member * +swim_find_member(struct swim *swim, const struct sockaddr_in *addr) +{ + uint64_t hash = sockaddr_in_hash(addr); + mh_int_t node = mh_i64ptr_find(swim->members, hash, NULL); + if (node == mh_end(swim->members)) + return NULL; + return (struct swim_member *) mh_i64ptr_node(swim->members, node)->val; +} + +/** At the end of each round members table is shuffled. 
*/ +static int +swim_shuffle_members(struct swim *swim) +{ + struct mh_i64ptr_t *members = swim->members; + struct swim_member **shuffled = swim->shuffled_members; + int new_size = mh_size(members); + int bsize = sizeof(shuffled[0]) * new_size; + struct swim_member **new_shuffled = + (struct swim_member **) realloc(shuffled, bsize); + if (new_shuffled == NULL) { + diag_set(OutOfMemory, bsize, "realloc", "new_shuffled"); + return -1; + } + shuffled = new_shuffled; + swim->shuffled_members = new_shuffled; + int i = 0; + for (mh_int_t node = mh_first(members), end = mh_end(members); + node != end; node = mh_next(members, node), ++i) { + shuffled[i] = (struct swim_member *) + mh_i64ptr_node(members, node)->val; + int j = swim_scaled_rand(0, i); + SWAP(shuffled[i], shuffled[j]); + } + return 0; +} + +/** + * Shuffle, filter members. Build randomly ordered queue of + * addressees. In other words, do all round preparation work. + */ +static int +swim_new_round(struct swim *swim) +{ + say_verbose("SWIM: start a new round"); + if (swim_shuffle_members(swim) != 0) + return -1; + rlist_create(&swim->queue_round); + int size = mh_size(swim->members); + for (int i = 0; i < size; ++i) { + if (swim->shuffled_members[i] != swim->self) { + rlist_add_entry(&swim->queue_round, + swim->shuffled_members[i], + in_queue_round); + } + } + return 0; +} + +/** + * Encode anti-entropy header and members data as many as + * possible to the end of a last packet. + * @retval -1 Error. + * @retval 0 Not error, but nothing is encoded. + * @retval 1 Something is encoded. 
+ */ +static int +swim_encode_anti_entropy(struct swim *swim, struct swim_msg *msg) +{ + struct swim_anti_entropy_header_bin ae_header_bin; + struct swim_member_bin member_bin; + struct swim_packet *packet = swim_msg_last_packet(msg); + if (packet == NULL) + return -1; + char *header = swim_packet_alloc(packet, sizeof(ae_header_bin)); + if (header == NULL) + return 0; + int i = 0; + + swim_member_bin_create(&member_bin); + for (; i < (int) mh_size(swim->members); ++i) { + char *pos = swim_packet_alloc(packet, sizeof(member_bin)); + if (pos == NULL) + break; + struct swim_member *member = swim->shuffled_members[i]; + swim_member_bin_reset(&member_bin, member); + memcpy(pos, &member_bin, sizeof(member_bin)); + } + if (i == 0) + return 0; + swim_anti_entropy_header_bin_create(&ae_header_bin, i); + memcpy(header, &ae_header_bin, sizeof(ae_header_bin)); + swim_packet_flush(packet); + return 1; +} + +/** Encode SWIM components into a sequence of UDP packets. */ +static int +swim_encode_round_msg(struct swim *swim, struct swim_msg *msg) +{ + swim_msg_create(msg); + struct swim_packet *packet = swim_msg_reserve(msg, 1); + if (packet == NULL) + return -1; + char *header = swim_packet_alloc(packet, 1); + int rc, map_size = 0; + + rc = swim_encode_anti_entropy(swim, msg); + if (rc < 0) + goto error; + map_size += rc; + + assert(mp_sizeof_map(map_size) == 1); + mp_encode_map(header, map_size); + return 0; +error: + swim_msg_destroy(msg); + return -1; +} + +/** Once per specified timeout trigger a next broadcast step. */ +static void +swim_round_step_begin(struct ev_loop *loop, struct ev_periodic *p, int events) +{ + assert((events & EV_PERIODIC) != 0); + (void) events; + struct swim *swim = (struct swim *) p->data; + if ((swim->shuffled_members == NULL || + rlist_empty(&swim->queue_round)) && swim_new_round(swim) != 0) { + diag_log(); + return; + } + /* + * Possibly empty, if no members but self are specified. 
+ */ + if (rlist_empty(&swim->queue_round)) + return; + + struct swim_msg *msg = &swim->round_step_task.msg; + if (swim_encode_round_msg(swim, msg) != 0) { + diag_log(); + return; + } + struct swim_member *m = + rlist_shift_entry(&swim->queue_round, struct swim_member, + in_queue_round); + swim_task_schedule(&swim->round_step_task, + swim->transport->send_round_msg, &m->addr); + ev_periodic_stop(loop, p); +} + +static void +swim_round_step_complete(struct swim_task *task) +{ + struct swim *swim = container_of(task, struct swim, round_step_task); + swim_msg_reset(&task->msg); + ev_periodic_start(loop(), &swim->round_tick); +} + +/** + * SWIM member attributes from anti-entropy and dissemination + * messages. + */ +struct swim_member_def { + struct sockaddr_in addr; + enum swim_member_status status; +}; + +static inline void +swim_member_def_create(struct swim_member_def *def) +{ + memset(def, 0, sizeof(*def)); + def->status = MEMBER_ALIVE; +} + +static void +swim_process_member_update(struct swim *swim, struct swim_member_def *def) +{ + struct swim_member *member = swim_find_member(swim, &def->addr); + /* + * Trivial processing of a new member - just add it to the + * members table. 
+ */ + if (member == NULL) { + member = swim_member_new(swim, &def->addr, def->status); + if (member == NULL) + diag_log(); + } +} + +static int +swim_process_member_key(enum swim_member_key key, const char **pos, + const char *end, const char *msg_pref, + struct swim_member_def *def) +{ + switch(key) { + case SWIM_MEMBER_STATUS: + if (mp_typeof(**pos) != MP_UINT || + mp_check_uint(*pos, end) > 0) { + say_error("%s member status should be uint", msg_pref); + return -1; + } + key = mp_decode_uint(pos); + if (key >= swim_member_status_MAX) { + say_error("%s unknown member status", msg_pref); + return -1; + } + def->status = (enum swim_member_status) key; + break; + case SWIM_MEMBER_ADDRESS: + if (mp_typeof(**pos) != MP_UINT || + mp_check_uint(*pos, end) > 0) { + say_error("%s member address should be uint", msg_pref); + return -1; + } + def->addr.sin_addr.s_addr = mp_decode_uint(pos); + break; + case SWIM_MEMBER_PORT: + if (mp_typeof(**pos) != MP_UINT || + mp_check_uint(*pos, end) > 0) { + say_error("%s member port should be uint", msg_pref); + return -1; + } + uint64_t port = mp_decode_uint(pos); + if (port > UINT16_MAX) { + say_error("%s member port is invalid", msg_pref); + return -1; + } + def->addr.sin_port = port; + break; + default: + unreachable(); + } + return 0; +} + +/** Decode an anti-entropy message, update members table. 
*/ +static int +swim_process_anti_entropy(struct swim *swim, const char **pos, const char *end) +{ + const char *msg_pref = "Invalid SWIM anti-entropy message:"; + if (mp_typeof(**pos) != MP_ARRAY || mp_check_array(*pos, end) > 0) { + say_error("%s message should be an array", msg_pref); + return -1; + } + uint64_t size = mp_decode_array(pos); + for (uint64_t i = 0; i < size; ++i) { + if (mp_typeof(**pos) != MP_MAP || + mp_check_map(*pos, end) > 0) { + say_error("%s member should be map", msg_pref); + return -1; + } + uint64_t map_size = mp_decode_map(pos); + struct swim_member_def def; + swim_member_def_create(&def); + for (uint64_t j = 0; j < map_size; ++j) { + if (mp_typeof(**pos) != MP_UINT || + mp_check_uint(*pos, end) > 0) { + say_error("%s member key should be uint", + msg_pref); + return -1; + } + uint64_t key = mp_decode_uint(pos); + if (key >= swim_member_key_MAX) { + say_error("%s unknown member key", msg_pref); + return -1; + } + if (swim_process_member_key(key, pos, end, msg_pref, + &def) != 0) + return -1; + } + if (def.addr.sin_port == 0 || def.addr.sin_addr.s_addr == 0) { + say_error("%s member address should be specified", + msg_pref); + return -1; + } + swim_process_member_update(swim, &def); + } + return 0; +} + +/** Receive and process a new message. 
*/ +static void +swim_on_input(struct swim_scheduler *scheduler, + const struct swim_packet *packet, const struct sockaddr_in *src) +{ + (void) src; + const char *msg_pref = "Invalid SWIM message:"; + struct swim *swim = container_of(scheduler, struct swim, scheduler); + const char *pos = packet->body; + const char *end = packet->pos; + if (mp_typeof(*pos) != MP_MAP || mp_check_map(pos, end) > 0) { + say_error("%s expected map header", msg_pref); + return; + } + uint64_t map_size = mp_decode_map(&pos); + for (uint64_t i = 0; i < map_size; ++i) { + if (mp_typeof(*pos) != MP_UINT || mp_check_uint(pos, end) > 0) { + say_error("%s header should contain uint keys", + msg_pref); + return; + } + uint64_t key = mp_decode_uint(&pos); + switch(key) { + case SWIM_ANTI_ENTROPY: + say_verbose("SWIM: process anti-entropy"); + if (swim_process_anti_entropy(swim, &pos, end) != 0) + return; + break; + default: + say_error("%s unknown component type, only "\ + "anti-entropy is supported", msg_pref); + return; + } + } +} + +/** + * Convert a string URI like "ip:port" to sockaddr_in structure. 
+ */ +static int +uri_to_addr(const char *str, struct sockaddr_in *addr) +{ + struct uri uri; + if (uri_parse(&uri, str) != 0 || uri.service == NULL) + goto invalid_uri; + in_addr_t iaddr; + if (uri.host_len == strlen(URI_HOST_UNIX) && + memcmp(uri.host, URI_HOST_UNIX, uri.host_len) == 0) { + diag_set(IllegalParams, "Unix sockets are not supported"); + return -1; + } + if (uri.host_len == 0) { + iaddr = htonl(INADDR_ANY); + } else if (uri.host_len == 9 && memcmp("localhost", uri.host, 9) == 0) { + iaddr = htonl(INADDR_LOOPBACK); + } else { + iaddr = inet_addr(tt_cstr(uri.host, uri.host_len)); + if (iaddr == (in_addr_t) -1) + goto invalid_uri; + } + int port = htons(atoi(uri.service)); + memset(addr, 0, sizeof(*addr)); + addr->sin_family = AF_INET; + addr->sin_addr.s_addr = iaddr; + addr->sin_port = port; + return 0; + +invalid_uri: + diag_set(SocketError, sio_socketname(-1), "invalid uri \"%s\"", str); + return -1; +} + +/** + * Initialize the module. By default, the module is turned off and + * does nothing. To start SWIM swim_cfg is used. 
+ */ +struct swim * +swim_new(void) +{ + struct swim *swim = (struct swim *) calloc(1, sizeof(*swim)); + if (swim == NULL) { + diag_set(OutOfMemory, sizeof(*swim), "calloc", "swim"); + return NULL; + } + swim->members = mh_i64ptr_new(); + if (swim->members == NULL) { + free(swim); + diag_set(OutOfMemory, sizeof(*swim->members), "malloc", + "members"); + return NULL; + } + rlist_create(&swim->queue_round); + ev_init(&swim->round_tick, swim_round_step_begin); + ev_periodic_set(&swim->round_tick, 0, HEARTBEAT_RATE_DEFAULT, NULL); + swim->round_tick.data = (void *) swim; + swim_task_create(&swim->round_step_task, &swim->scheduler, + swim_round_step_complete); + swim->transport = &swim_udp_transport; + swim_scheduler_create(&swim->scheduler, swim_on_input, swim->transport); + return swim; +} + +int +swim_cfg(struct swim *swim, const char *uri, double heartbeat_rate, + const struct swim_transport *new_transport) +{ + struct sockaddr_in addr; + if (uri_to_addr(uri, &addr) != 0) + return -1; + struct swim_member *new_self = NULL; + if (swim_find_member(swim, &addr) == NULL) { + new_self = swim_member_new(swim, &addr, MEMBER_ALIVE); + if (new_self == NULL) + return -1; + } + if (swim_scheduler_bind(&swim->scheduler, &addr) != 0) { + swim_member_delete(swim, new_self); + return -1; + } + ev_periodic_start(loop(), &swim->round_tick); + + if (swim->round_tick.interval != heartbeat_rate && heartbeat_rate > 0) + ev_periodic_set(&swim->round_tick, 0, heartbeat_rate, NULL); + + swim->self = new_self; + if (new_transport != NULL) { + swim->transport = new_transport; + swim_scheduler_set_transport(&swim->scheduler, new_transport); + } + return 0; +} + +int +swim_add_member(struct swim *swim, const char *uri) +{ + struct sockaddr_in addr; + if (uri_to_addr(uri, &addr) != 0) + return -1; + struct swim_member *member = swim_find_member(swim, &addr); + if (member == NULL) { + member = swim_member_new(swim, &addr, MEMBER_ALIVE); + if (member == NULL) + return -1; + } + return 0; +} + 
+int +swim_remove_member(struct swim *swim, const char *uri) +{ + struct sockaddr_in addr; + if (uri_to_addr(uri, &addr) != 0) + return -1; + struct swim_member *member = swim_find_member(swim, &addr); + if (member != NULL) + swim_member_delete(swim, member); + return 0; +} + +void +swim_info(struct swim *swim, struct info_handler *info) +{ + info_begin(info); + for (mh_int_t node = mh_first(swim->members), + end = mh_end(swim->members); node != end; + node = mh_next(swim->members, node)) { + struct swim_member *member = (struct swim_member *) + mh_i64ptr_node(swim->members, node)->val; + info_table_begin(info, + sio_strfaddr((struct sockaddr *) &member->addr, + sizeof(member->addr))); + info_append_str(info, "status", + swim_member_status_strs[member->status]); + info_table_end(info); + } + info_end(info); +} + +void +swim_delete(struct swim *swim) +{ + swim_scheduler_destroy(&swim->scheduler); + ev_periodic_stop(loop(), &swim->round_tick); + swim_task_destroy(&swim->round_step_task); + mh_int_t node = mh_first(swim->members); + while (node != mh_end(swim->members)) { + struct swim_member *m = (struct swim_member *) + mh_i64ptr_node(swim->members, node)->val; + swim_member_delete(swim, m); + node = mh_first(swim->members); + } + mh_i64ptr_delete(swim->members); + free(swim->shuffled_members); +} diff --git a/src/lib/swim/swim.h b/src/lib/swim/swim.h new file mode 100644 index 000000000..350fa0cee --- /dev/null +++ b/src/lib/swim/swim.h @@ -0,0 +1,95 @@ +#ifndef TARANTOOL_SWIM_H_INCLUDED +#define TARANTOOL_SWIM_H_INCLUDED +/* + * Copyright 2010-2018, Tarantool AUTHORS, please see AUTHORS file. + * + * Redistribution and use in source and binary forms, with or + * without modification, are permitted provided that the following + * conditions are met: + * + * 1. Redistributions of source code must retain the above + * copyright notice, this list of conditions and the + * following disclaimer. + * + * 2. 
Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following + * disclaimer in the documentation and/or other materials + * provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY ``AS IS'' AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED + * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL + * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR + * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF + * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF + * SUCH DAMAGE. + */ +#if defined(__cplusplus) +extern "C" { +#endif + +struct info_handler; +struct swim; +struct swim_transport; + +/** + * Create new SWIM instance. Just creation without binding, + * setting any parameters or something. Allocation and + * initialization only. + */ +struct swim * +swim_new(void); + +/** + * Configure or reconfigure a SWIM instance. + * + * @param swim SWIM instance to configure. + * @param uri URI in the format "ip:port". + * @param heartbeat_rate Rate of sending round messages. It does + * mean that each member will be checked each + * @heartbeat_rate seconds. It is rather the protocol + * speed. Protocol period depends on member count and + * @heartbeat_rate. + * @param new_transport Transport API to send/receive messages. + * + * @retval 0 Success. + * @retval -1 Error. 
+ */ +int +swim_cfg(struct swim *swim, const char *uri, double heartbeat_rate, + const struct swim_transport *new_transport); + +/** + * Stop listening and broadcasting messages, cleanup all internal + * structures, free memory. + */ +void +swim_delete(struct swim *swim); + +/** + * Add a new member. It is added to the members table and pinned. + * SWIM will ping the member, but never will delete him, even if + * pings fail. + */ +int +swim_add_member(struct swim *swim, const char *uri); + +/** Silently remove a member from members table. */ +int +swim_remove_member(struct swim *swim, const char *uri); + +/** Dump member statuses into @a info. */ +void +swim_info(struct swim *swim, struct info_handler *info); + +#if defined(__cplusplus) +} +#endif + +#endif /* TARANTOOL_SWIM_H_INCLUDED */ diff --git a/src/lib/swim/swim_io.c b/src/lib/swim/swim_io.c new file mode 100644 index 000000000..8a1eca819 --- /dev/null +++ b/src/lib/swim/swim_io.c @@ -0,0 +1,163 @@ +#include "swim_io.h" +#include "fiber.h" + +static ssize_t +swim_udp_send_msg(int fd, const void *data, size_t size, + const struct sockaddr *addr, socklen_t addr_size) +{ + ssize_t ret = sio_sendto(fd, data, size, 0, addr, addr_size); + if (ret == -1 && sio_wouldblock(errno)) + return 0; + return ret; +} + +static ssize_t +swim_udp_recv_msg(int fd, void *buffer, size_t size, struct sockaddr *addr, + socklen_t *addr_size) +{ + ssize_t ret = sio_recvfrom(fd, buffer, size, 0, addr, addr_size); + if (ret == -1 && sio_wouldblock(errno)) + return 0; + return ret; +} + +struct swim_transport swim_udp_transport = { + /* .send_round_msg = */ swim_udp_send_msg, + /* .recv_msg = */ swim_udp_recv_msg, +}; + +struct swim_packet * +swim_packet_new(struct swim_msg *msg) +{ + struct swim_packet *res = + (struct swim_packet *) malloc(sizeof(*res)); + if (res == NULL) { + diag_set(OutOfMemory, sizeof(*res), "malloc", "res"); + return NULL; + } + swim_packet_create(res, msg); + return res; +} + +void +swim_task_schedule(struct 
swim_task *task, swim_transport_send_f send, + const struct sockaddr_in *dst) +{ + assert(! swim_task_is_active(task)); + task->send = send; + task->dst = *dst; + rlist_add_tail_entry(&task->scheduler->queue_output, task, + in_queue_output); + ev_io_start(loop(), &task->scheduler->output); +} + +static void +swim_scheduler_on_output(struct ev_loop *loop, struct ev_io *io, int events); + +static void +swim_scheduler_on_input(struct ev_loop *loop, struct ev_io *io, int events); + +void +swim_scheduler_create(struct swim_scheduler *scheduler, + swim_scheduler_on_input_f on_input, + const struct swim_transport *transport) +{ + ev_init(&scheduler->output, swim_scheduler_on_output); + scheduler->output.data = (void *) scheduler; + ev_init(&scheduler->input, swim_scheduler_on_input); + scheduler->input.data = (void *) scheduler; + rlist_create(&scheduler->queue_output); + scheduler->on_input = on_input; + swim_scheduler_set_transport(scheduler, transport); +} + +int +swim_scheduler_bind(struct swim_scheduler *scheduler, struct sockaddr_in *addr) +{ + struct sockaddr_in cur_addr; + socklen_t addrlen = sizeof(cur_addr); + int old_fd = scheduler->input.fd; + + if (old_fd != -1 && + getsockname(old_fd, (struct sockaddr *) &cur_addr, &addrlen) == 0 && + addr->sin_addr.s_addr == cur_addr.sin_addr.s_addr && + addr->sin_port == cur_addr.sin_port) + return 0; + + int fd = sio_socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); + if (fd < 0) + return -1; + if (sio_bind(fd, (struct sockaddr *) addr, sizeof(*addr)) != 0 || + evio_setsockopt_server(fd, AF_INET, SOCK_DGRAM) != 0) { + if (errno == EADDRINUSE) + diag_set(SocketError, sio_socketname(fd), "bind"); + close(fd); + return -1; + } + close(old_fd); + ev_io_set(&scheduler->input, fd, EV_READ); + ev_io_set(&scheduler->output, fd, EV_WRITE); + return 0; +} + +void +swim_scheduler_destroy(struct swim_scheduler *scheduler) +{ + close(scheduler->input.fd); + ev_io_stop(loop(), &scheduler->output); + ev_io_stop(loop(), &scheduler->input); +} + 
+static void +swim_scheduler_on_output(struct ev_loop *loop, struct ev_io *io, int events) +{ + assert((events & EV_WRITE) != 0); + (void) events; + struct swim_scheduler *scheduler = (struct swim_scheduler *) io->data; + if (rlist_empty(&scheduler->queue_output)) { + ev_io_stop(loop, io); + return; + } + struct swim_task *task = + rlist_shift_entry(&scheduler->queue_output, struct swim_task, + in_queue_output); + say_verbose("SWIM: send to %s", + sio_strfaddr((struct sockaddr *) &task->dst, + sizeof(task->dst))); + for (struct swim_packet *packet = swim_msg_first_packet(&task->msg); + packet != NULL; packet = swim_packet_next(packet)) { + if (task->send(io->fd, packet->body, packet->pos - packet->body, + (struct sockaddr *) &task->dst, + sizeof(task->dst)) == -1) + diag_log(); + } + task->complete(task); +} + +static void +swim_scheduler_on_input(struct ev_loop *loop, struct ev_io *io, int events) +{ + assert((events & EV_READ) != 0); + (void) events; + (void) loop; + struct swim_scheduler *scheduler = (struct swim_scheduler *) io->data; + struct sockaddr_in addr; + socklen_t len = sizeof(addr); + struct swim_packet packet; + struct swim_msg msg; + swim_msg_create(&msg); + swim_packet_create(&packet, &msg); + swim_transport_recv_f recv = scheduler->transport->recv_msg; + ssize_t size = recv(io->fd, packet.body, packet.end - packet.body, + (struct sockaddr *) &addr, &len); + if (size <= 0) { + if (size < 0) + diag_log(); + return; + } + swim_packet_advance(&packet, size); + swim_packet_flush(&packet); + say_verbose("SWIM: received from %s", + sio_strfaddr((struct sockaddr *) &addr, len)); + scheduler->on_input(scheduler, &packet, &addr); +} diff --git a/src/lib/swim/swim_io.h b/src/lib/swim/swim_io.h new file mode 100644 index 000000000..fc44fd0a7 --- /dev/null +++ b/src/lib/swim/swim_io.h @@ -0,0 +1,299 @@ +#ifndef TARANTOOL_SWIM_IO_H_INCLUDED +#define TARANTOOL_SWIM_IO_H_INCLUDED +/* + * Copyright 2010-2018, Tarantool AUTHORS, please see AUTHORS file. 
+ * + * Redistribution and use in source and binary forms, with or + * without modification, are permitted provided that the following + * conditions are met: + * + * 1. Redistributions of source code must retain the above + * copyright notice, this list of conditions and the + * following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following + * disclaimer in the documentation and/or other materials + * provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY ``AS IS'' AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED + * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL + * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR + * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF + * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF + * SUCH DAMAGE. + */ +#include "trivia/util.h" +#include "small/rlist.h" +#include "salad/stailq.h" +#include "swim_transport.h" +#include "evio.h" +#include +#include + +#if defined(__cplusplus) +extern "C" { +#endif + +struct swim_task; +struct swim_scheduler; + +/** UDP sendto/recvfrom implementation of swim_transport. */ +extern struct swim_transport swim_udp_transport; + +enum { + /** + * Default MTU is 1500. MTU (when IPv4 is used) consists + * of IPv4 header, UDP header, Data. IPv4 has 20 bytes + * header, UDP - 8 bytes. So Data = 1500 - 20 - 8 = 1472. + * TODO: adapt to other MTUs which can be reduced in some + * networks by their admins. 
+ */ + UDP_PACKET_SIZE = 1472, +}; + +/** + * UDP body size is limited by definition. To be able to send a + * big message it should be split into multiple packets. Each + * packet is a self-sufficient piece of data, which can withstand + * loss of other packets and be processed independently. + */ +struct swim_packet { + /** Place in the whole big message, struct swim_msg. */ + struct stailq_entry in_msg; + /** Last valid position in the body. */ + char *pos; + /** Position beyond pos, contains unfinished data. */ + char *next_pos; + /** Packet body. */ + char body[UDP_PACKET_SIZE]; + /** + * Pointer to the end of the body. Just syntactic sugar to + * avoid writing 'body + sizeof(body)' each time. + */ + char end[0]; +}; + +struct swim_msg { + struct stailq packets; +}; + +static inline bool +swim_packet_is_last(struct swim_packet *packet) +{ + return stailq_next(&packet->in_msg) == NULL; +} + +static inline char * +swim_packet_reserve(struct swim_packet *packet, int size) +{ + return packet->next_pos + size > packet->end ? 
NULL : packet->next_pos; +} + +static inline void +swim_packet_advance(struct swim_packet *packet, int size) +{ + assert(packet->next_pos + size <= packet->end); + packet->next_pos += size; +} + +static inline char * +swim_packet_alloc(struct swim_packet *packet, int size) +{ + char *res = swim_packet_reserve(packet, size); + if (res == NULL) + return NULL; + swim_packet_advance(packet, size); + return res; +} + +static inline void +swim_packet_flush(struct swim_packet *packet) +{ + assert(packet->next_pos >= packet->pos); + packet->pos = packet->next_pos; +} + +static inline struct swim_packet * +swim_packet_next(struct swim_packet *packet) +{ + if (swim_packet_is_last(packet)) + return NULL; + return stailq_next_entry(packet, in_msg); +} + +static inline void +swim_packet_delete(struct swim_packet *packet) +{ + free(packet); +} + +static inline void +swim_packet_create(struct swim_packet *packet, struct swim_msg *msg) +{ + stailq_add_tail_entry(&msg->packets, packet, in_msg); + packet->pos = packet->body; + packet->next_pos = packet->body; +} + +struct swim_packet * +swim_packet_new(struct swim_msg *msg); + +static inline bool +swim_msg_is_empty(struct swim_msg *msg) +{ + return stailq_empty(&msg->packets); +} + +static inline struct swim_packet * +swim_msg_first_packet(struct swim_msg *msg) +{ + if (swim_msg_is_empty(msg)) + return NULL; + return stailq_first_entry(&msg->packets, struct swim_packet, in_msg); +} + +static inline struct swim_packet * +swim_msg_last_packet(struct swim_msg *msg) +{ + if (swim_msg_is_empty(msg)) + return NULL; + return stailq_last_entry(&msg->packets, struct swim_packet, in_msg); +} + +static inline void +swim_msg_create(struct swim_msg *msg) +{ + stailq_create(&msg->packets); +} + +static inline void +swim_msg_destroy(struct swim_msg *msg) +{ + struct swim_packet *packet, *tmp; + stailq_foreach_entry_safe(packet, tmp, &msg->packets, in_msg) + swim_packet_delete(packet); +} + +static inline void +swim_msg_reset(struct swim_msg *msg) 
+{ + swim_msg_destroy(msg); + swim_msg_create(msg); +} + +static inline struct swim_packet * +swim_msg_reserve(struct swim_msg *msg, int size) +{ + struct swim_packet *packet = swim_msg_last_packet(msg); + assert(size <= (int) sizeof(packet->body)); + if (packet == NULL || swim_packet_reserve(packet, size) == NULL) + return swim_packet_new(msg); + return packet; +} + +typedef void (*swim_scheduler_on_input_f)(struct swim_scheduler *scheduler, + const struct swim_packet *packet, + const struct sockaddr_in *src); + +struct swim_scheduler { + /** Transport used to receive packets. */ + const struct swim_transport *transport; + /** Function called when a packet is received. */ + swim_scheduler_on_input_f on_input; + /** + * Event dispatcher of incoming messages. Takes them from + * network. + */ + struct ev_io input; + /** + * Event dispatcher of outgoing messages. Takes tasks + * from queue_output. + */ + struct ev_io output; + /** Queue of output tasks ready to write now. */ + struct rlist queue_output; +}; + +void +swim_scheduler_create(struct swim_scheduler *scheduler, + swim_scheduler_on_input_f on_input, + const struct swim_transport *transport); + +int +swim_scheduler_bind(struct swim_scheduler *scheduler, struct sockaddr_in *addr); + +static inline void +swim_scheduler_set_transport(struct swim_scheduler *scheduler, + const struct swim_transport *transport) +{ + scheduler->transport = transport; +} + +void +swim_scheduler_destroy(struct swim_scheduler *scheduler); + +/** + * Each SWIM component in a common case independently may want to + * push some data into the network. Dissemination sends events, + * failure detection sends pings, acks. Anti-entropy sends member + * tables. The intention to send data is called an IO task and is + * stored in a queue that is dispatched when output is possible. + */ +typedef void (*swim_task_f)(struct swim_task *); + +struct swim_task { + /** Function to send each packet. 
*/ + swim_transport_send_f send; + /** Function called when the task has completed. */ + swim_task_f complete; + /** Message to send. */ + struct swim_msg msg; + /** Destination address. */ + struct sockaddr_in dst; + /** Place in a queue of tasks. */ + struct rlist in_queue_output; + /** SWIM scheduler managing the task. */ + struct swim_scheduler *scheduler; +}; + +void +swim_task_schedule(struct swim_task *task, swim_transport_send_f send, + const struct sockaddr_in *dst); + +static inline void +swim_task_create(struct swim_task *task, struct swim_scheduler *scheduler, + swim_task_f complete) +{ + memset(task, 0, sizeof(*task)); + task->complete = complete; + swim_msg_create(&task->msg); + rlist_create(&task->in_queue_output); + task->scheduler = scheduler; +} + +static inline bool +swim_task_is_active(struct swim_task *task) +{ + return ! rlist_empty(&task->in_queue_output); +} + +static inline void +swim_task_destroy(struct swim_task *task) +{ + rlist_del_entry(task, in_queue_output); + swim_msg_destroy(&task->msg); +} + +#if defined(__cplusplus) +} +#endif + +#endif /* TARANTOOL_SWIM_IO_H_INCLUDED */ \ No newline at end of file diff --git a/src/lib/swim/swim_transport.h b/src/lib/swim/swim_transport.h new file mode 100644 index 000000000..d629526ac --- /dev/null +++ b/src/lib/swim/swim_transport.h @@ -0,0 +1,64 @@ +#ifndef TARANTOOL_SWIM_TRANSPORT_H_INCLUDED +#define TARANTOOL_SWIM_TRANSPORT_H_INCLUDED +/* + * Copyright 2010-2018, Tarantool AUTHORS, please see AUTHORS file. + * + * Redistribution and use in source and binary forms, with or + * without modification, are permitted provided that the following + * conditions are met: + * + * 1. Redistributions of source code must retain the above + * copyright notice, this list of conditions and the + * following disclaimer. + * + * 2. 
Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following + * disclaimer in the documentation and/or other materials + * provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY ``AS IS'' AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED + * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL + * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR + * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF + * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF + * SUCH DAMAGE. + */ +#include + +typedef ssize_t (*swim_transport_send_f)(int fd, const void *data, size_t size, + const struct sockaddr *addr, + socklen_t addr_size); + +typedef ssize_t (*swim_transport_recv_f)(int fd, void *buffer, size_t size, + struct sockaddr *addr, + socklen_t *addr_size); + +/** + * Virtual methods of SWIM protocol steps. Usual implementation - + * just sendto/recvfrom for all methods. But for testing via this + * interface errors could be simulated. + */ +struct swim_transport { + /** + * Send regular round message containing dissemination, + * failure detection and anti-entropy sections. Parameters + * are like sendto(). + */ + swim_transport_send_f send_round_msg; + + /** + * Receive a message. Not necessary round or failure + * detection. Before message is received, its type is + * unknown. Parameters are like recvfrom(). 
+ */ + swim_transport_recv_f recv_msg; +}; + +#endif /* TARANTOOL_SWIM_TRANSPORT_H_INCLUDED */ diff --git a/src/lua/init.c b/src/lua/init.c index ca4b47f3a..5b47aa3e3 100644 --- a/src/lua/init.c +++ b/src/lua/init.c @@ -58,6 +58,7 @@ #include "lua/fio.h" #include "lua/httpc.h" #include "lua/utf8.h" +#include "lua/swim.h" #include "digest.h" #include @@ -450,6 +451,7 @@ tarantool_lua_init(const char *tarantool_bin, int argc, char **argv) tarantool_lua_socket_init(L); tarantool_lua_pickle_init(L); tarantool_lua_digest_init(L); + tarantool_lua_swim_init(L); luaopen_http_client_driver(L); lua_pop(L, 1); luaopen_msgpack(L); diff --git a/src/lua/swim.c b/src/lua/swim.c new file mode 100644 index 000000000..4748ef139 --- /dev/null +++ b/src/lua/swim.c @@ -0,0 +1,244 @@ +/* + * Copyright 2010-2018, Tarantool AUTHORS, please see AUTHORS file. + * + * Redistribution and use in source and binary forms, with or + * without modification, are permitted provided that the following + * conditions are met: + * + * 1. Redistributions of source code must retain the above + * copyright notice, this list of conditions and the + * following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following + * disclaimer in the documentation and/or other materials + * provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY ``AS IS'' AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED + * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. 
IN NO EVENT SHALL + * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR + * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF + * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF + * SUCH DAMAGE. + */ + +#include "utils.h" +#include "diag.h" +#include "swim/swim.h" +#include "small/ibuf.h" +#include "lua/info.h" +#include + +/** SWIM instances are pushed as cdata with this id. */ +uint32_t CTID_STRUCT_SWIM_PTR; + +/** + * Get @a n-th value from a Lua stack as a struct swim pointer. + * @param L Lua state. + * @param n Where pointer is stored on Lua stack. + * + * @retval NULL The stack position does not exist or it is not a + * struct swim pointer. + * @retval not NULL Valid SWIM pointer. + */ +static inline struct swim * +lua_swim_ptr(struct lua_State *L, int n) +{ + uint32_t ctypeid; + if (lua_type(L, n) != LUA_TCDATA) + return NULL; + void *swim = luaL_checkcdata(L, n, &ctypeid); + if (ctypeid != CTID_STRUCT_SWIM_PTR) + return NULL; + return *(struct swim **) swim; +} + +/** + * Delete SWIM instance passed via first Lua stack position. Used + * by Lua GC. + */ +static int +lua_swim_gc(struct lua_State *L) +{ + struct swim *swim = lua_swim_ptr(L, 1); + if (swim == NULL) + return luaL_error(L, "SWIM gc expected struct swim *"); + swim_delete(swim); + return 0; +} + +/** + * Configure @a swim instance using a table stored in @a ncfg-th + * position on Lua stack. + * @param L Lua state. + * @param ncfg Where configuration is stored on Lua stack. + * @param swim SWIM instance to configure. + * @param funcname Caller function name to use in error messages. + * + * @retval 0 Success. + * @retval -1 Error, stored in diagnostics area. 
Critical errors + * like OOM or incorrect usage can throw. + */ +static int +lua_swim_cfg_impl(struct lua_State *L, int ncfg, struct swim *swim, + const char *funcname) +{ + if (! lua_istable(L, ncfg)) { + return luaL_error(L, "swim.%s: expected table config", + funcname); + } + + const char *server_uri; + lua_getfield(L, ncfg, "server"); + if (lua_isstring(L, -1)) { + server_uri = lua_tostring(L, -1); + } else { + return luaL_error(L, "swim.%s: server should be string URI", + funcname); + } + lua_pop(L, 1); + + double heartbeat_rate; + lua_getfield(L, ncfg, "heartbeat"); + if (lua_isnumber(L, -1)) { + heartbeat_rate = lua_tonumber(L, -1); + if (heartbeat_rate <= 0) { + return luaL_error(L, "swim.%s: heartbeat should be "\ + "positive number", funcname); + } + } else if (! lua_isnil(L, -1)) { + return luaL_error(L, "swim.%s: heartbeat should be positive "\ + "number", funcname); + } else { + heartbeat_rate = -1; + } + lua_pop(L, 1); + + return swim_cfg(swim, server_uri, heartbeat_rate, NULL); +} + +static int +lua_swim_new(struct lua_State *L) +{ + int top = lua_gettop(L); + if (top > 1) + return luaL_error(L, "Usage: swim.new([{}])"); + struct swim *swim = swim_new(); + if (swim != NULL) { + *(struct swim **)luaL_pushcdata(L, CTID_STRUCT_SWIM_PTR) = swim; + lua_pushcfunction(L, lua_swim_gc); + luaL_setcdatagc(L, -2); + if (top == 0 || lua_swim_cfg_impl(L, 1, swim, "new") == 0) + return 1; + lua_pop(L, 1); + } + lua_pushnil(L); + luaT_pusherror(L, diag_last_error(diag_get())); + return 2; +} + +static int +lua_swim_cfg(struct lua_State *L) +{ + struct swim *swim = lua_swim_ptr(L, 1); + if (swim == NULL) + return luaL_error(L, "Usage: swim:cfg({})"); + if (lua_swim_cfg_impl(L, 2, swim, "cfg") != 0) { + lua_pushnil(L); + luaT_pusherror(L, diag_last_error(diag_get())); + return 2; + } + lua_pushboolean(L, true); + return 1; +} + +static inline int +lua_swim_add_remove_member(struct lua_State *L, const char *funcname, + int (*action)(struct swim *, const char *)) +{ + 
struct swim *swim = lua_swim_ptr(L, 1); + if (lua_gettop(L) != 2 || swim == NULL) + return luaL_error(L, "Usage: swim:%s(uri)", funcname); + const char *member_uri; + if (lua_isstring(L, -1)) { + member_uri = lua_tostring(L, -1); + } else { + return luaL_error(L, "swim.%s: member URI should be string", + funcname); + } + + if (action(swim, member_uri) != 0) { + lua_pushnil(L); + luaT_pusherror(L, diag_last_error(diag_get())); + return 2; + } + lua_pushboolean(L, true); + return 1; +} + +static int +lua_swim_add_member(struct lua_State *L) +{ + return lua_swim_add_remove_member(L, "add_member", swim_add_member); +} + +static int +lua_swim_remove_member(struct lua_State *L) +{ + return lua_swim_add_remove_member(L, "remove_member", + swim_remove_member); +} + +static int +lua_swim_delete(struct lua_State *L) +{ + struct swim *swim = lua_swim_ptr(L, 1); + if (swim == NULL) + return luaL_error(L, "Usage: swim:delete()"); + swim_delete(swim); + uint32_t ctypeid; + struct swim **cdata = (struct swim **) luaL_checkcdata(L, 1, &ctypeid); + assert(ctypeid == CTID_STRUCT_SWIM_PTR); + *cdata = NULL; + return 0; +} + +static int +lua_swim_info(struct lua_State *L) +{ + struct swim *swim = lua_swim_ptr(L, 1); + if (swim == NULL) + return luaL_error(L, "Usage: swim:info()"); + struct info_handler info; + luaT_info_handler_create(&info, L); + swim_info(swim, &info); + return 1; +} + +void +tarantool_lua_swim_init(struct lua_State *L) +{ + static const struct luaL_Reg lua_swim_methods [] = { + {"new", lua_swim_new}, + {"cfg", lua_swim_cfg}, + {"add_member", lua_swim_add_member}, + {"remove_member", lua_swim_remove_member}, + {"delete", lua_swim_delete}, + {"info", lua_swim_info}, + {NULL, NULL} + }; + luaL_register_module(L, "swim", lua_swim_methods); + lua_pop(L, 1); + int rc = luaL_cdef(L, "struct swim;"); + assert(rc == 0); + (void) rc; + CTID_STRUCT_SWIM_PTR = luaL_ctypeid(L, "struct swim *"); + assert(CTID_STRUCT_SWIM_PTR != 0); +} diff --git a/src/lua/swim.h b/src/lua/swim.h 
new file mode 100644 index 000000000..989cf62b3 --- /dev/null +++ b/src/lua/swim.h @@ -0,0 +1,47 @@ +#ifndef INCLUDES_TARANTOOL_LUA_SWIM_H +#define INCLUDES_TARANTOOL_LUA_SWIM_H +/* + * Copyright 2010-2015, Tarantool AUTHORS, please see AUTHORS file. + * + * Redistribution and use in source and binary forms, with or + * without modification, are permitted provided that the following + * conditions are met: + * + * 1. Redistributions of source code must retain the above + * copyright notice, this list of conditions and the + * following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following + * disclaimer in the documentation and/or other materials + * provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY ``AS IS'' AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED + * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL + * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR + * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF + * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF + * SUCH DAMAGE. + */ + +#if defined(__cplusplus) +extern "C" { +#endif + +struct lua_State; + +void +tarantool_lua_swim_init(struct lua_State *L); + +#if defined(__cplusplus) +} +#endif + +#endif /* INCLUDES_TARANTOOL_LUA_SWIM_H */ -- 2.17.2 (Apple Git-113)
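
A note on the packet buffer API in swim_io.h: the two-cursor scheme (pos for committed data, next_pos for data written but not yet committed) may be easier to follow in isolation. Below is a minimal standalone sketch of it. All sketch_* names are hypothetical and not part of the patch, the stailq linkage is dropped, and the bounds check compares lengths instead of adding to a pointer, which is a choice of the sketch and not the patch's code.

```c
#include <assert.h>
#include <stddef.h>

/* Same arithmetic as UDP_PACKET_SIZE: MTU 1500 - IPv4 header 20 - UDP header 8. */
enum { SKETCH_PACKET_SIZE = 1500 - 20 - 8 };

/* Hypothetical stand-in for struct swim_packet, without the list link. */
struct sketch_packet {
	/* End of committed data: [body, pos) is ready to be sent. */
	char *pos;
	/* End of allocated data: [pos, next_pos) is not committed yet. */
	char *next_pos;
	char body[SKETCH_PACKET_SIZE];
};

static void
sketch_packet_create(struct sketch_packet *p)
{
	p->pos = p->body;
	p->next_pos = p->body;
}

/* Take 'size' bytes after next_pos, or return NULL if they do not fit. */
static char *
sketch_packet_alloc(struct sketch_packet *p, int size)
{
	ptrdiff_t left = p->body + sizeof(p->body) - p->next_pos;
	if (size < 0 || size > left)
		return NULL;
	char *res = p->next_pos;
	p->next_pos += size;
	return res;
}

/* Commit everything allocated so far. */
static void
sketch_packet_flush(struct sketch_packet *p)
{
	p->pos = p->next_pos;
}

/* Number of bytes a sender would put on the wire: pos - body. */
static size_t
sketch_packet_size(const struct sketch_packet *p)
{
	return (size_t)(p->pos - p->body);
}
```

Under these assumptions, data allocated but not flushed does not count towards the size handed to the send callback, which mirrors how swim_scheduler_on_output() sends packet->pos - packet->body bytes.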