From: Vladislav Shpilevoy <v.shpilevoy@tarantool.org>
To: tarantool-patches@freelists.org
Cc: kostja@tarantool.org, vdavydov.dev@gmail.com
Subject: [PATCH v4 04/12] [RAW] swim: introduce SWIM's anti-entropy component
Date: Thu, 31 Jan 2019 00:28:33 +0300
Message-ID: <d52c1002cdeb5f8a85d43d6d5dc9a02a0726f08f.1548883137.git.v.shpilevoy@tarantool.org>
In-Reply-To: <cover.1548883137.git.v.shpilevoy@tarantool.org>

SWIM - Scalable Weakly-consistent Infection-style Process Group Membership Protocol. It consists of 2 components: events dissemination and failure detection, and stores in memory a table of known remote hosts - members. Some SWIM implementations also have an additional component: anti-entropy - a periodic broadcast of a random subset of the members table.

The dissemination component spreads changes that happened to members over the cluster. Failure detection constantly searches for failed members. Anti-entropy just sends all known information about a member at once, so as to synchronize it among all other members in case some events were not disseminated (UDP problems).

Anti-entropy is the most vital component, since it can work without dissemination and failure detection, while they can not work properly without anti-entropy. Consider the example: two SWIM nodes, both are alive. Nothing happens, so the events list is empty, only pings are being sent periodically. Then a third node appears. It knows about one of the existing nodes. How should it learn about the other one? Sure, its known counterpart can try to notify the other one, but it is UDP, so this event can be lost.

Anti-entropy is an extremely simple component: it just piggybacks a random part of the members table on each regular round message. In the example above the new node will sooner or later learn about the other existing node via anti-entropy messages of the node it already knows.
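The anti-entropy selection described above can be illustrated with a minimal standalone sketch. It is not the code from this patch: struct member, scaled_rand() and pick_anti_entropy() are hypothetical names, and a plain array stands in for the members hash table. The sketch starts at a random position, copies as many members as the packet can hold, and wraps around the table end.

```c
#include <assert.h>
#include <stdlib.h>

/* Hypothetical member record, a stand-in for struct swim_member. */
struct member {
	int id;
};

/*
 * Random index in [0, count - 1], scaled down from rand() instead
 * of taking a modulo. Assumes 0 < count <= RAND_MAX.
 */
static int
scaled_rand(int count)
{
	assert(count > 0 && count <= RAND_MAX);
	return rand() / (RAND_MAX / count + 1);
}

/*
 * Copy up to 'capacity' members into 'out', starting from a
 * random position and wrapping around the table end. Returns the
 * number of copied members; 0 means nothing fits or the table is
 * empty.
 */
static int
pick_anti_entropy(const struct member *table, int count, int capacity,
		  struct member *out)
{
	if (count <= 0 || capacity <= 0)
		return 0;
	int start = scaled_rand(count);
	int n = count < capacity ? count : capacity;
	for (int i = 0; i < n; ++i)
		out[i] = table[(start + i) % count];
	return n;
}
```

With a table of 5 members and room for 3, a call such as pick_anti_entropy(table, 5, 3, out) returns 3 consecutive members from a random offset. Over many rounds every member ends up in some packet, which is what lets a new node discover the rest of the cluster even when individual events are lost.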
Part of #3234 --- src/CMakeLists.txt | 3 +- src/diag.h | 2 + src/exception.cc | 23 + src/exception.h | 7 + src/lib/CMakeLists.txt | 1 + src/lib/swim/CMakeLists.txt | 6 + src/lib/swim/swim.c | 828 ++++++++++++++++++++++++++++++++ src/lib/swim/swim.h | 91 ++++ src/lib/swim/swim_io.c | 204 ++++++++ src/lib/swim/swim_io.h | 225 +++++++++ src/lib/swim/swim_proto.c | 327 +++++++++++++ src/lib/swim/swim_proto.h | 320 ++++++++++++ src/lib/swim/swim_transport.h | 73 +++ src/lua/init.c | 2 + src/lua/swim.c | 370 ++++++++++++++ src/lua/swim.h | 47 ++ test/unit/CMakeLists.txt | 3 + test/unit/swim.c | 34 ++ test/unit/swim_test_transport.c | 78 +++ 19 files changed, 2643 insertions(+), 1 deletion(-) create mode 100644 src/lib/swim/CMakeLists.txt create mode 100644 src/lib/swim/swim.c create mode 100644 src/lib/swim/swim.h create mode 100644 src/lib/swim/swim_io.c create mode 100644 src/lib/swim/swim_io.h create mode 100644 src/lib/swim/swim_proto.c create mode 100644 src/lib/swim/swim_proto.h create mode 100644 src/lib/swim/swim_transport.h create mode 100644 src/lua/swim.c create mode 100644 src/lua/swim.h create mode 100644 test/unit/swim.c create mode 100644 test/unit/swim_test_transport.c diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 38bd576e6..d2d33043b 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -182,6 +182,7 @@ set (server_sources lua/crypto.c lua/httpc.c lua/utf8.c + lua/swim.c lua/info.c ${lua_sources} ${PROJECT_SOURCE_DIR}/third_party/lua-yaml/lyaml.cc @@ -228,7 +229,7 @@ endif() set_source_files_compile_flags(${server_sources}) add_library(server STATIC ${server_sources}) -target_link_libraries(server core bit uri uuid ${ICU_LIBRARIES}) +target_link_libraries(server core bit uri uuid swim swim_udp ${ICU_LIBRARIES}) # Rule of thumb: if exporting a symbol from a static library, list the # library here. 
diff --git a/src/diag.h b/src/diag.h index a0b71f049..f4189cd67 100644 --- a/src/diag.h +++ b/src/diag.h @@ -249,6 +249,8 @@ struct error * BuildSystemError(const char *file, unsigned line, const char *format, ...); struct error * BuildCollationError(const char *file, unsigned line, const char *format, ...); +struct error * +BuildSwimError(const char *file, unsigned line, const char *format, ...); struct index_def; diff --git a/src/exception.cc b/src/exception.cc index 8fdb24ad9..6124c70d0 100644 --- a/src/exception.cc +++ b/src/exception.cc @@ -269,6 +269,17 @@ CollationError::CollationError(const char *file, unsigned line, va_end(ap); } +const struct type_info type_SwimError = make_type("SwimError", &type_Exception); + +SwimError::SwimError(const char *file, unsigned line, const char *format, ...) + : Exception(&type_SwimError, file, line) +{ + va_list ap; + va_start(ap, format); + error_vformat_msg(this, format, ap); + va_end(ap); +} + #define BuildAlloc(type) \ void *p = malloc(sizeof(type)); \ if (p == NULL) \ @@ -348,6 +359,18 @@ BuildCollationError(const char *file, unsigned line, const char *format, ...) return e; } +struct error * +BuildSwimError(const char *file, unsigned line, const char *format, ...) +{ + BuildAlloc(SwimError); + SwimError *e = new (p) SwimError(file, line, ""); + va_list ap; + va_start(ap, format); + error_vformat_msg(e, format, ap); + va_end(ap); + return e; +} + struct error * BuildSocketError(const char *file, unsigned line, const char *socketname, const char *format, ...) 
diff --git a/src/exception.h b/src/exception.h index f08d946b5..2ec8a74ee 100644 --- a/src/exception.h +++ b/src/exception.h @@ -50,6 +50,7 @@ extern const struct type_info type_LuajitError; extern const struct type_info type_IllegalParams; extern const struct type_info type_SystemError; extern const struct type_info type_CollationError; +extern const struct type_info type_SwimError; const char * exception_get_string(struct error *e, const struct method_info *method); @@ -159,6 +160,12 @@ public: virtual void raise() { throw this; } }; +class SwimError: public Exception { +public: + SwimError(const char *file, unsigned line, const char *format, ...); + virtual void raise() { throw this; } +}; + /** * Initialize the exception subsystem. */ diff --git a/src/lib/CMakeLists.txt b/src/lib/CMakeLists.txt index 98ff19b60..4e21e7da8 100644 --- a/src/lib/CMakeLists.txt +++ b/src/lib/CMakeLists.txt @@ -5,6 +5,7 @@ add_subdirectory(small) add_subdirectory(salad) add_subdirectory(csv) add_subdirectory(json) +add_subdirectory(swim) if(ENABLE_BUNDLED_MSGPUCK) add_subdirectory(msgpuck EXCLUDE_FROM_ALL) endif() diff --git a/src/lib/swim/CMakeLists.txt b/src/lib/swim/CMakeLists.txt new file mode 100644 index 000000000..7af8aeda5 --- /dev/null +++ b/src/lib/swim/CMakeLists.txt @@ -0,0 +1,6 @@ +set(lib_swim_sources swim.c swim_io.c swim_proto.c) +set(lib_swim_udp_sources swim_udp_transport.c) + +set_source_files_compile_flags(${lib_swim_sources} ${lib_swim_udp_sources}) +add_library(swim STATIC ${lib_swim_sources}) +add_library(swim_udp STATIC ${lib_swim_udp_sources}) diff --git a/src/lib/swim/swim.c b/src/lib/swim/swim.c new file mode 100644 index 000000000..adf01cfad --- /dev/null +++ b/src/lib/swim/swim.c @@ -0,0 +1,828 @@ +/* + * Copyright 2010-2019, Tarantool AUTHORS, please see AUTHORS file. + * + * Redistribution and use in source and binary forms, with or + * without modification, are permitted provided that the following + * conditions are met: + * + * 1. 
Redistributions of source code must retain the above + * copyright notice, this list of conditions and the + * following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following + * disclaimer in the documentation and/or other materials + * provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED + * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL + * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR + * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF + * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF + * SUCH DAMAGE. + */ +#include "swim.h" +#include "swim_io.h" +#include "swim_proto.h" +#include "uri.h" +#include "fiber.h" +#include "msgpuck.h" +#include "info.h" +#include "assoc.h" +#include "sio.h" + +/** + * SWIM - Scalable Weakly-consistent Infection-style Process Group + * Membership Protocol. It consists of 2 components: events + * dissemination and failure detection, and stores in memory a + * table of known remote hosts - members. Also some SWIM + * implementations have an additional component: anti-entropy - + * periodical broadcast of a random subset of members table. + * + * Each SWIM component is different from others in both message + * structures and goals, they even could be sent in different + * messages. But SWIM describes piggybacking of messages: a ping + * message can piggyback a dissemination's one. 
SWIM has a main + * operating cycle during which it randomly chooses members from a + * member table and sends to them events + ping. Replies are + * processed out of the main cycle asynchronously. + * + * Random selection provides an even network load of about 1 message to + * each member per protocol step regardless of the cluster size. + * Without randomness a member would get a network load of N + * messages each protocol step, since all other members will + * choose the same member on each step where N is the cluster size. + * + * Also SWIM describes a kind of fairness: when selecting a next + * member to ping, the protocol prefers LRU members. In code it + * would be too complicated, so Tarantool's implementation is + * slightly different, easier. + * + * Tarantool splits protocol operation into rounds. At the + * beginning of a round all members are randomly reordered and + * linked into a list. At each round step a member is popped from + * the list head, a message is sent to it, and it waits for the + * next round. In such an implementation all random selection of the + * original SWIM is executed once per round. The round is + * 'planned' actually. A list is used instead of an array since + * new members can be added to its tail without realloc, and dead + * members can be removed just as easily. + * + * Also Tarantool implements a third component - anti-entropy. Why + * is it needed and even vital? Consider the example: two SWIM + * nodes, both are alive. Nothing happens, so the events list is + * empty, only pings are being sent periodically. Then a third + * node appears. It knows about one of the existing nodes. How should + * it learn about the other one? Sure, its known counterpart can try + * to notify the other one, but it is UDP, so this event can be lost. + * Anti-entropy is an extremely simple component, it just piggybacks + * a random part of the members table with each regular round message. 
+ * In the example above the new node will learn about the other + * existing node via anti-entropy messages of its known counterpart sooner or later. + * + * Surprisingly, original SWIM does not describe any addressing, + * how to uniquely identify a member. IP/port fallaciously could + * be considered as a good unique identifier, but some arguments + * below demolish this belief: + * + * - if instances work in separate containers, they can have + * the same IP/port inside a container NATed to a unique + * IP/port outside the container; + * + * - IP/port are likely to change during instance lifecycle. + * Once IP/port are changed, a ghost of the old member's + * configuration still lives for a while until it is + * suspected, dead and GC-ed. Taking into account that ACK + * timeout can be tens of seconds, 'Dead Souls' can exist + * unpleasantly long. + * + * Tarantool SWIM implementation uses UUIDs as unique identifiers. + * UUID is much less likely to change than IP/port. But even if + * that happens, dissemination component for a while gossips the + * new UUID together with the old one. + * + * SWIM implementation is split into 3 parts: protocol logic, + * transport level, protocol structure. + * + * - protocol logic covers how to react to various events, + * failure detection pings/acks, how often to send messages, + * and handles the logic of the three components (failure detection, + * anti-entropy, dissemination); + * + * - transport level handles routing, transport headers, + * packet forwarding; + * + * - protocol structure describes how a packet looks in + * MessagePack, which section and header follows which + * other one. + */ + +enum { + /** + * How often to send membership messages and pings in + * seconds. Nothing special in this concrete default + * value. + */ + HEARTBEAT_RATE_DEFAULT = 1, +}; + +/** + * Take a random number not blindly calculating a modulo, but + * scaling the random number down to the given boundaries to preserve the + * original distribution. 
The result belongs to the range + * [start, end]. + */ +static inline int +swim_scaled_rand(int start, int end) +{ + assert(end > start); + /* + * RAND_MAX is likely to be INT_MAX - SWIM will hardly + * ever be used in such a huge cluster. + */ + assert(end - start < RAND_MAX); + return start + rand() / (RAND_MAX / (end - start + 1) + 1); +} + +/** Calculate UUID hash to use as a members table key. */ +static inline uint32_t +swim_uuid_hash(const struct tt_uuid *uuid) +{ + return mh_strn_hash((const char *) uuid, UUID_LEN); +} + +/** + * Helper to avoid calling tt_static_buf() in all places where a + * string UUID is needed. + */ +static inline const char * +swim_uuid_str(const struct tt_uuid *uuid) +{ + char *buf = tt_static_buf(); + tt_uuid_to_string(uuid, buf); + return buf; +} + +/** + * A cluster member description. This structure describes the + * last known state of an instance, that is updated periodically + * via UDP according to SWIM protocol. + */ +struct swim_member { + /** + * Member status. Since the communication goes via UDP, + * actual status can be different, as well as different on + * other SWIM nodes. But SWIM guarantees that each member + * will learn a real status of an instance sometime. + */ + enum swim_member_status status; + /** Address of the instance to which to send UDP packets. */ + struct sockaddr_in addr; + /** Unique identifier of the member. Members table key. */ + struct tt_uuid uuid; + /** Cached hash of the uuid for members table lookups. */ + uint32_t hash; + /** + * Position in a queue of members in the current round. 
+ */ + struct rlist in_queue_round; +}; + +#define mh_name _swim_table +struct mh_swim_table_key { + uint32_t hash; + const struct tt_uuid *uuid; +}; +#define mh_key_t struct mh_swim_table_key +#define mh_node_t struct swim_member * +#define mh_arg_t void * +#define mh_hash(a, arg) ((*a)->hash) +#define mh_hash_key(a, arg) (a.hash) +#define mh_cmp(a, b, arg) (tt_uuid_compare(&(*a)->uuid, &(*b)->uuid)) +#define mh_cmp_key(a, b, arg) (tt_uuid_compare(a.uuid, &(*b)->uuid)) +#define MH_SOURCE 1 +#include "salad/mhash.h" + +/** + * SWIM instance. Stores configuration, manages periodical tasks, + * rounds. Each member has an object of this type on its host, + * while on others it is represented as a struct swim_member + * object. + */ +struct swim { + /** + * Global hash of all known members of the cluster. Hash + * key is UUID, value is a struct swim_member, describing a + * remote instance. Discovered members live here until + * they are detected as dead - in such a case they are + * removed from the hash after a while. + */ + struct mh_swim_table_t *members; + /** + * This node. Is used to avoid sending messages to self, + * it's meaningless. Also to refute false gossips about + * self status. + */ + struct swim_member *self; + /** + * Members to which a message should be sent next during + * this round. + */ + struct rlist queue_round; + /** Generator of round step events. */ + struct ev_periodic round_tick; + /** + * Single round step task. It is impossible to have + * multiple round steps in the same SWIM instance at the + * same time, so it is single and preallocated per SWIM + * instance. + */ + struct swim_task round_step_task; + /** + * Scheduler of output requests, receiver of incoming + * ones. + */ + struct swim_scheduler scheduler; +}; + +/** + * A helper to get a pointer to a SWIM instance having only a + * pointer to its scheduler. It is used by task complete functions. 
+ */ +static inline struct swim * +swim_by_scheduler(struct swim_scheduler *scheduler) +{ + return container_of(scheduler, struct swim, scheduler); +} + +/** + * Remove the member from all queues, hashes, destroy it and free + * the memory. + */ +static void +swim_member_delete(struct swim *swim, struct swim_member *member) +{ + say_verbose("SWIM: member %s is deleted", swim_uuid_str(&member->uuid)); + struct mh_swim_table_key key = {member->hash, &member->uuid}; + mh_int_t rc = mh_swim_table_find(swim->members, key, NULL); + assert(rc != mh_end(swim->members)); + mh_swim_table_del(swim->members, rc, NULL); + rlist_del_entry(member, in_queue_round); + + free(member); +} + +/** Find a member by UUID. */ +static inline struct swim_member * +swim_find_member(struct swim *swim, const struct tt_uuid *uuid) +{ + struct mh_swim_table_key key = {swim_uuid_hash(uuid), uuid}; + mh_int_t node = mh_swim_table_find(swim->members, key, NULL); + if (node == mh_end(swim->members)) + return NULL; + return *mh_swim_table_node(swim->members, node); +} + +/** + * Register a new member with a specified status. Here it is + * added to the hash, to the 'next' queue. 
+ */ +static struct swim_member * +swim_member_new(struct swim *swim, const struct sockaddr_in *addr, + const struct tt_uuid *uuid, enum swim_member_status status) +{ + struct swim_member *member = + (struct swim_member *) calloc(1, sizeof(*member)); + if (member == NULL) { + diag_set(OutOfMemory, sizeof(*member), "calloc", "member"); + return NULL; + } + member->status = status; + member->addr = *addr; + member->uuid = *uuid; + member->hash = swim_uuid_hash(uuid); + mh_int_t rc = mh_swim_table_put(swim->members, + (const struct swim_member **) &member, + NULL, NULL); + if (rc == mh_end(swim->members)) { + free(member); + diag_set(OutOfMemory, sizeof(mh_int_t), "malloc", "node"); + return NULL; + } + rlist_add_entry(&swim->queue_round, member, in_queue_round); + + say_verbose("SWIM: member %s is added", swim_uuid_str(uuid)); + return member; +} + +/** + * Take all the members from the table and shuffle them randomly. + * Is used for forthcoming round planning. + */ +static struct swim_member ** +swim_shuffle_members(struct swim *swim) +{ + struct mh_swim_table_t *members = swim->members; + struct swim_member **shuffled; + int bsize = sizeof(shuffled[0]) * mh_size(members); + shuffled = (struct swim_member **) malloc(bsize); + if (shuffled == NULL) { + diag_set(OutOfMemory, bsize, "malloc", "shuffled"); + return NULL; + } + int i = 0; + /* + * This shuffling preserves even distribution of a random + * sequence, that is proved by testing. + */ + for (mh_int_t node = mh_first(members), end = mh_end(members); + node != end; node = mh_next(members, node), ++i) { + shuffled[i] = *mh_swim_table_node(members, node); + int j = swim_scaled_rand(0, i); + SWAP(shuffled[i], shuffled[j]); + } + return shuffled; +} + +/** + * Shuffle members, build randomly ordered queue of addressees. In + * other words, do all round preparation work. 
+ */ +static int +swim_new_round(struct swim *swim) +{ + int size = mh_size(swim->members); + say_verbose("SWIM: start a new round with %d members", size); + struct swim_member **shuffled = swim_shuffle_members(swim); + if (shuffled == NULL) + return -1; + rlist_create(&swim->queue_round); + for (int i = 0; i < size; ++i) { + if (shuffled[i] != swim->self) { + rlist_add_entry(&swim->queue_round, shuffled[i], + in_queue_round); + } + } + free(shuffled); + return 0; +} + +/** + * Encode anti-entropy header and as many random members as + * possible to the end of the packet. + * @retval 0 Not an error, but nothing is encoded. + * @retval 1 Something is encoded. + */ +static int +swim_encode_anti_entropy(struct swim *swim, struct swim_packet *packet) +{ + struct swim_anti_entropy_header_bin ae_header_bin; + struct swim_member_bin member_bin; + int size = sizeof(ae_header_bin); + char *header = swim_packet_reserve(packet, size); + if (header == NULL) + return 0; + swim_member_bin_create(&member_bin); + struct mh_swim_table_t *t = swim->members; + int i = 0, member_count = mh_size(t); + int rnd = swim_scaled_rand(0, member_count - 1); + for (mh_int_t rc = mh_swim_table_random(t, rnd), end = mh_end(t); + i < member_count; ++i) { + struct swim_member *m = *mh_swim_table_node(t, rc); + int new_size = size + sizeof(member_bin); + char *pos = swim_packet_reserve(packet, new_size); + if (pos == NULL) + break; + size = new_size; + swim_member_bin_fill(&member_bin, &m->addr, &m->uuid, + m->status); + memcpy(pos, &member_bin, sizeof(member_bin)); + /* + * First random member could be chosen too close + * to the hash end. Here the cycle is wrapped, if + * a packet still has free memory, but the + * iterator has already reached the hash end. 
+ */ + rc = mh_next(t, rc); + if (rc == end) + rc = mh_first(t); + } + if (i == 0) + return 0; + swim_packet_advance(packet, size); + swim_anti_entropy_header_bin_create(&ae_header_bin, i); + memcpy(header, &ae_header_bin, sizeof(ae_header_bin)); + return 1; +} + +/** + * Encode source UUID. + * @retval 0 Not an error, but nothing is encoded. + * @retval 1 Something is encoded. + */ +static inline int +swim_encode_src_uuid(struct swim *swim, struct swim_packet *packet) +{ + struct swim_src_uuid_bin uuid_bin; + char *pos = swim_packet_alloc(packet, sizeof(uuid_bin)); + if (pos == NULL) + return 0; + swim_src_uuid_bin_create(&uuid_bin, &swim->self->uuid); + memcpy(pos, &uuid_bin, sizeof(uuid_bin)); + return 1; +} + +/** Encode SWIM components into a UDP packet. */ +static void +swim_encode_round_msg(struct swim *swim, struct swim_packet *packet) +{ + swim_packet_create(packet); + char *header = swim_packet_alloc(packet, 1); + int map_size = 0; + map_size += swim_encode_src_uuid(swim, packet); + map_size += swim_encode_anti_entropy(swim, packet); + + assert(mp_sizeof_map(map_size) == 1 && map_size == 2); + mp_encode_map(header, map_size); +} + +/** + * Once per specified timeout trigger a next round step. In round + * step a next member is taken from the round queue and a round + * message is sent to it. One member per step. + */ +static void +swim_round_step_begin(struct ev_loop *loop, struct ev_periodic *p, int events) +{ + assert((events & EV_PERIODIC) != 0); + (void) events; + struct swim *swim = (struct swim *) p->data; + if (rlist_empty(&swim->queue_round) && swim_new_round(swim) != 0) { + diag_log(); + return; + } + /* + * Possibly empty, if no members but self are specified. 
+ */ + if (rlist_empty(&swim->queue_round)) + return; + + swim_encode_round_msg(swim, &swim->round_step_task.packet); + struct swim_member *m = + rlist_first_entry(&swim->queue_round, struct swim_member, + in_queue_round); + swim_task_send(&swim->round_step_task, &m->addr, &swim->scheduler); + ev_periodic_stop(loop, p); +} + +/** + * After a round message is sent, the addressee can be popped from + * the queue, and the next step is scheduled. + */ +static void +swim_round_step_complete(struct swim_task *task, + struct swim_scheduler *scheduler, int rc) +{ + (void) rc; + (void) task; + struct swim *swim = swim_by_scheduler(scheduler); + ev_periodic_start(loop(), &swim->round_tick); + rlist_shift_entry(&swim->queue_round, struct swim_member, + in_queue_round); +} + +/** + * Update member's UUID if it is changed. On UUID change the + * member is reinserted into the members table with a new UUID. + * @retval 0 Success. + * @retval -1 Error. Out of memory or the new UUID is already in + * use. 
+ */ +static int +swim_member_update_uuid(struct swim_member *member, + const struct tt_uuid *new_uuid, struct swim *swim) +{ + if (tt_uuid_is_equal(new_uuid, &member->uuid)) + return 0; + if (swim_find_member(swim, new_uuid) != NULL) { + diag_set(SwimError, "duplicate UUID '%s'", + swim_uuid_str(new_uuid)); + return -1; + } + struct mh_swim_table_t *t = swim->members; + struct tt_uuid old_uuid = member->uuid; + member->uuid = *new_uuid; + if (mh_swim_table_put(t, (const struct swim_member **) &member, NULL, + NULL) == mh_end(t)) { + member->uuid = old_uuid; + diag_set(OutOfMemory, sizeof(mh_int_t), "malloc", "node"); + return -1; + } + struct mh_swim_table_key key = {member->hash, &old_uuid}; + mh_swim_table_del(t, mh_swim_table_find(t, key, NULL), NULL); + member->hash = swim_uuid_hash(new_uuid); + return 0; +} + +/** Update member's address.*/ +static inline void +swim_member_update_addr(struct swim_member *member, + const struct sockaddr_in *addr) +{ + member->addr = *addr; +} + +/** + * Update or create a member by its definition, received from a + * remote instance. + * @retval NULL Error. + * @retval New member, or updated old member. + */ +static struct swim_member * +swim_update_member(struct swim *swim, const struct swim_member_def *def) +{ + struct swim_member *member = swim_find_member(swim, &def->uuid); + if (member == NULL) { + member = swim_member_new(swim, &def->addr, &def->uuid, + def->status); + return member; + } + swim_member_update_addr(member, &def->addr); + return member; +} + +/** Decode an anti-entropy message, update members table. 
*/ +static int +swim_process_anti_entropy(struct swim *swim, const char **pos, const char *end) +{ + const char *msg_pref = "invalid anti-entropy message:"; + uint32_t size; + if (swim_decode_array(pos, end, &size, msg_pref, "root") != 0) + return -1; + for (uint64_t i = 0; i < size; ++i) { + struct swim_member_def def; + if (swim_member_def_decode(&def, pos, end, msg_pref) != 0) + return -1; + if (swim_update_member(swim, &def) == NULL) { + /* + * Not a critical error. Other members + * still can be updated. + */ + diag_log(); + } + } + return 0; +} + +/** Process a new message. */ +static void +swim_on_input(struct swim_scheduler *scheduler, const char *pos, + const char *end, const struct sockaddr_in *src) +{ + (void) src; + const char *msg_pref = "invalid message:"; + struct swim *swim = swim_by_scheduler(scheduler); + struct tt_uuid uuid; + uint32_t size; + if (swim_decode_map(&pos, end, &size, msg_pref, "root") != 0) + goto error; + if (size == 0) { + diag_set(SwimError, "%s body can not be empty", msg_pref); + goto error; + } + uint64_t key; + if (swim_decode_uint(&pos, end, &key, msg_pref, "a key") != 0) + goto error; + if (key != SWIM_SRC_UUID) { + diag_set(SwimError, "%s first key should be source UUID", + msg_pref); + goto error; + } + if (swim_decode_uuid(&uuid, &pos, end, msg_pref, "source uuid") != 0) + goto error; + --size; + for (uint32_t i = 0; i < size; ++i) { + if (swim_decode_uint(&pos, end, &key, msg_pref, "a key") != 0) + goto error; + switch(key) { + case SWIM_ANTI_ENTROPY: + say_verbose("SWIM: process anti-entropy"); + if (swim_process_anti_entropy(swim, &pos, end) != 0) + goto error; + break; + default: + diag_set(SwimError, "%s unexpected key", msg_pref); + goto error; + } + } + return; +error: + diag_log(); +} + +struct swim * +swim_new(void) +{ + struct swim *swim = (struct swim *) calloc(1, sizeof(*swim)); + if (swim == NULL) { + diag_set(OutOfMemory, sizeof(*swim), "calloc", "swim"); + return NULL; + } + swim->members = 
mh_swim_table_new(); + if (swim->members == NULL) { + free(swim); + diag_set(OutOfMemory, sizeof(*swim->members), + "mh_swim_table_new", "members"); + return NULL; + } + rlist_create(&swim->queue_round); + ev_init(&swim->round_tick, swim_round_step_begin); + ev_periodic_set(&swim->round_tick, 0, HEARTBEAT_RATE_DEFAULT, NULL); + swim->round_tick.data = (void *) swim; + swim_task_create(&swim->round_step_task, swim_round_step_complete, + NULL); + swim_scheduler_create(&swim->scheduler, swim_on_input); + return swim; +} + +/** + * Parse URI, filter out everything but IP addresses and ports, + * and fill a struct sockaddr_in. + */ +static inline int +swim_uri_to_addr(const char *uri, struct sockaddr_in *addr, + const char *msg_pref) +{ + struct sockaddr_storage storage; + if (sio_uri_to_addr(uri, (struct sockaddr *) &storage) != 0) + return -1; + if (storage.ss_family != AF_INET) { + diag_set(IllegalParams, "%s only IP sockets are supported", + msg_pref); + return -1; + } + *addr = *((struct sockaddr_in *) &storage); + return 0; +} + +int +swim_cfg(struct swim *swim, const char *uri, double heartbeat_rate, + const struct tt_uuid *uuid) +{ + const char *msg_pref = "swim.cfg:"; + if (heartbeat_rate < 0) { + diag_set(IllegalParams, "%s heartbeat_rate should be a "\ + "positive number", msg_pref); + return -1; + } + struct sockaddr_in addr; + if (uri != NULL && swim_uri_to_addr(uri, &addr, msg_pref) != 0) + return -1; + bool is_first_cfg = swim->self == NULL; + if (is_first_cfg) { + if (uuid == NULL || tt_uuid_is_nil(uuid) || uri == NULL) { + diag_set(SwimError, "%s UUID and URI are mandatory in "\ + "a first config", msg_pref); + return -1; + } + swim->self = swim_member_new(swim, &addr, uuid, MEMBER_ALIVE); + if (swim->self == NULL) + return -1; + } else if (uuid == NULL || tt_uuid_is_nil(uuid)) { + uuid = &swim->self->uuid; + } else if (! 
tt_uuid_is_equal(uuid, &swim->self->uuid)) { + if (swim_find_member(swim, uuid) != NULL) { + diag_set(SwimError, "%s a member with such UUID "\ + "already exists", msg_pref); + return -1; + } + /* + * Reserve one cell for reinsertion of self with a + * new UUID. Reserve is necessary for atomic + * reconfiguration. Without reservation it is + * possible that the instance is bound to a new + * URI, but failed to update UUID due to memory + * issues. + */ + if (mh_swim_table_reserve(swim->members, + mh_size(swim->members) + 1, + NULL) != 0) { + diag_set(OutOfMemory, sizeof(mh_int_t), "malloc", + "node"); + return -1; + } + + } + if (uri != NULL && swim_scheduler_bind(&swim->scheduler, &addr) != 0) { + if (is_first_cfg) { + swim_member_delete(swim, swim->self); + swim->self = NULL; + } + return -1; + } + if (swim->round_tick.interval != heartbeat_rate && heartbeat_rate > 0) + ev_periodic_set(&swim->round_tick, 0, heartbeat_rate, NULL); + + ev_periodic_start(loop(), &swim->round_tick); + + if (! is_first_cfg) { + swim_member_update_addr(swim->self, &addr); + int rc = swim_member_update_uuid(swim->self, uuid, swim); + /* Reserved above. */ + assert(rc == 0); + (void) rc; + } + return 0; +} + +/** + * Check if a SWIM instance is not configured, and if so - set an + * error in a diagnostics area. + */ +static inline int +swim_check_is_configured(const struct swim *swim, const char *msg_pref) +{ + if (swim->self != NULL) + return 0; + diag_set(SwimError, "%s the instance is not configured", msg_pref); + return -1; +} + +int +swim_add_member(struct swim *swim, const char *uri, const struct tt_uuid *uuid) +{ + const char *msg_pref = "swim.add_member:"; + if (swim_check_is_configured(swim, msg_pref) != 0) + return -1; + struct sockaddr_in addr; + if (swim_uri_to_addr(uri, &addr, msg_pref) != 0) + return -1; + struct swim_member *member = swim_find_member(swim, uuid); + if (member == NULL) { + member = swim_member_new(swim, &addr, uuid, MEMBER_ALIVE); + return member == NULL ? 
-1 : 0; + } + diag_set(SwimError, "%s a member with such UUID already exists", + msg_pref); + return -1; +} + +int +swim_remove_member(struct swim *swim, const struct tt_uuid *uuid) +{ + const char *msg_pref = "swim.remove_member:"; + if (swim_check_is_configured(swim, msg_pref) != 0) + return -1; + struct swim_member *member = swim_find_member(swim, uuid); + if (member == NULL) + return 0; + if (member == swim->self) { + diag_set(SwimError, "%s can not remove self", msg_pref); + return -1; + } + swim_member_delete(swim, member); + return 0; +} + +void +swim_info(struct swim *swim, struct info_handler *info) +{ + info_begin(info); + for (mh_int_t node = mh_first(swim->members), + end = mh_end(swim->members); node != end; + node = mh_next(swim->members, node)) { + struct swim_member *m = + *mh_swim_table_node(swim->members, node); + info_table_begin(info, + sio_strfaddr((struct sockaddr *) &m->addr, + sizeof(m->addr))); + info_append_str(info, "status", + swim_member_status_strs[m->status]); + info_append_str(info, "uuid", swim_uuid_str(&m->uuid)); + info_table_end(info); + } + info_end(info); +} + +void +swim_delete(struct swim *swim) +{ + swim_scheduler_destroy(&swim->scheduler); + ev_periodic_stop(loop(), &swim->round_tick); + swim_task_destroy(&swim->round_step_task); + mh_int_t node = mh_first(swim->members); + while (node != mh_end(swim->members)) { + struct swim_member *m = + *mh_swim_table_node(swim->members, node); + swim_member_delete(swim, m); + node = mh_first(swim->members); + } + mh_swim_table_delete(swim->members); +} diff --git a/src/lib/swim/swim.h b/src/lib/swim/swim.h new file mode 100644 index 000000000..a98decc86 --- /dev/null +++ b/src/lib/swim/swim.h @@ -0,0 +1,91 @@ +#ifndef TARANTOOL_SWIM_H_INCLUDED +#define TARANTOOL_SWIM_H_INCLUDED +/* + * Copyright 2010-2019, Tarantool AUTHORS, please see AUTHORS file. 
+ * + * Redistribution and use in source and binary forms, with or + * without modification, are permitted provided that the following + * conditions are met: + * + * 1. Redistributions of source code must retain the above + * copyright notice, this list of conditions and the + * following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following + * disclaimer in the documentation and/or other materials + * provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED + * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL + * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR + * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF + * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF + * SUCH DAMAGE. + */ +#if defined(__cplusplus) +extern "C" { +#endif + +struct info_handler; +struct swim; +struct tt_uuid; + +/** + * Create a new SWIM instance. Just creation without binding, + * setting any parameters. Allocation and initialization only. + */ +struct swim * +swim_new(void); + +/** + * Configure or reconfigure a SWIM instance. + * + * @param swim SWIM instance to configure. + * @param uri URI in the format "ip:port". + * @param heartbeat_rate Rate of sending round messages. It does + * not mean that each member will be checked each + * @heartbeat_rate seconds. It is rather the protocol + * speed. Protocol period depends on member count and + * @heartbeat_rate. 
+ * @param uuid UUID of this instance. Must be unique over the + * cluster. + * + * @retval 0 Success. + * @retval -1 Error. + */ +int +swim_cfg(struct swim *swim, const char *uri, double heartbeat_rate, + const struct tt_uuid *uuid); + +/** + * Stop listening and broadcasting messages, cleanup all internal + * structures, free memory. + */ +void +swim_delete(struct swim *swim); + +/** Add a new member. */ +int +swim_add_member(struct swim *swim, const char *uri, const struct tt_uuid *uuid); + +/** Silently remove a member from members table. */ +int +swim_remove_member(struct swim *swim, const struct tt_uuid *uuid); + +/** Dump member statuses into @a info. */ +void +swim_info(struct swim *swim, struct info_handler *info); + +#if defined(__cplusplus) +} +#endif + +#endif /* TARANTOOL_SWIM_H_INCLUDED */ diff --git a/src/lib/swim/swim_io.c b/src/lib/swim/swim_io.c new file mode 100644 index 000000000..20973acaf --- /dev/null +++ b/src/lib/swim/swim_io.c @@ -0,0 +1,204 @@ +/* + * Copyright 2010-2019, Tarantool AUTHORS, please see AUTHORS file. + * + * Redistribution and use in source and binary forms, with or + * without modification, are permitted provided that the following + * conditions are met: + * + * 1. Redistributions of source code must retain the above + * copyright notice, this list of conditions and the + * following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following + * disclaimer in the documentation and/or other materials + * provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED + * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. 
IN NO EVENT SHALL + * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR + * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF + * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF + * SUCH DAMAGE. + */ +#include "swim_io.h" +#include "swim_proto.h" +#include "fiber.h" +#include "sio.h" + +/** + * Allocate memory for meta. The same as mere alloc, but moves + * body pointer. + */ +static inline void +swim_packet_alloc_meta(struct swim_packet *packet, int size) +{ + char *tmp = swim_packet_alloc(packet, size); + assert(tmp != NULL); + (void) tmp; + packet->body = packet->pos; +} + +void +swim_packet_create(struct swim_packet *packet) +{ + packet->body = packet->buf; + packet->pos = packet->body; + swim_packet_alloc_meta(packet, sizeof(struct swim_meta_header_bin)); +} + +void +swim_task_create(struct swim_task *task, swim_task_f complete, + swim_task_f cancel) +{ + memset(task, 0, sizeof(*task)); + task->complete = complete; + task->cancel = cancel; + swim_packet_create(&task->packet); + rlist_create(&task->in_queue_output); +} + +/** Put the task into the queue of output tasks. */ +static inline void +swim_task_schedule(struct swim_task *task, struct swim_scheduler *scheduler) +{ + assert(rlist_empty(&task->in_queue_output)); + rlist_add_tail_entry(&scheduler->queue_output, task, in_queue_output); + ev_io_start(loop(), &scheduler->output); +} + +void +swim_task_send(struct swim_task *task, const struct sockaddr_in *dst, + struct swim_scheduler *scheduler) +{ + task->dst = *dst; + swim_task_schedule(task, scheduler); +} + +/** + * Dispatch a next output event. Build packet meta and send the + * packet. 
+ */ +static void +swim_scheduler_on_output(struct ev_loop *loop, struct ev_io *io, int events); + +/** + * Dispatch a next input event. Unpack meta, forward a packet or + * propagate further to protocol logic. + */ +static void +swim_scheduler_on_input(struct ev_loop *loop, struct ev_io *io, int events); + +void +swim_scheduler_create(struct swim_scheduler *scheduler, + swim_scheduler_on_input_f on_input) +{ + ev_init(&scheduler->output, swim_scheduler_on_output); + scheduler->output.data = (void *) scheduler; + ev_init(&scheduler->input, swim_scheduler_on_input); + scheduler->input.data = (void *) scheduler; + rlist_create(&scheduler->queue_output); + scheduler->on_input = on_input; + swim_transport_create(&scheduler->transport); +} + +int +swim_scheduler_bind(struct swim_scheduler *scheduler, + const struct sockaddr_in *addr) +{ + struct swim_transport *t = &scheduler->transport; + if (swim_transport_bind(t, (const struct sockaddr *) addr, + sizeof(*addr)) != 0) + return -1; + ev_io_set(&scheduler->input, t->fd, EV_READ); + ev_io_set(&scheduler->output, t->fd, EV_WRITE); + return 0; +} + +void +swim_scheduler_destroy(struct swim_scheduler *scheduler) +{ + struct swim_task *t, *tmp; + /* + * Use 'safe', because cancelation can delete the task + * from the queue, or even delete the task itself. + */ + rlist_foreach_entry_safe(t, &scheduler->queue_output, in_queue_output, + tmp) { + if (t->cancel != NULL) + t->cancel(t, scheduler, -1); + } + swim_transport_destroy(&scheduler->transport); + ev_io_stop(loop(), &scheduler->output); + ev_io_stop(loop(), &scheduler->input); +} + +static void +swim_scheduler_on_output(struct ev_loop *loop, struct ev_io *io, int events) +{ + assert((events & EV_WRITE) != 0); + (void) events; + struct swim_scheduler *scheduler = (struct swim_scheduler *) io->data; + if (rlist_empty(&scheduler->queue_output)) { + /* + * Possible, if a member pushed a task and then + * was deleted together with it. 
+ */ + ev_io_stop(loop, io); + return; + } + struct swim_task *task = + rlist_shift_entry(&scheduler->queue_output, struct swim_task, + in_queue_output); + say_verbose("SWIM: send to %s", + sio_strfaddr((struct sockaddr *) &task->dst, + sizeof(task->dst))); + struct swim_meta_header_bin header; + swim_meta_header_bin_create(&header, &scheduler->transport.addr); + memcpy(task->packet.meta, &header, sizeof(header)); + int rc = swim_transport_send(&scheduler->transport, task->packet.buf, + task->packet.pos - task->packet.buf, + (const struct sockaddr *) &task->dst, + sizeof(task->dst)); + if (rc != 0) + diag_log(); + if (task->complete != NULL) + task->complete(task, scheduler, rc); +} + +static void +swim_scheduler_on_input(struct ev_loop *loop, struct ev_io *io, int events) +{ + assert((events & EV_READ) != 0); + (void) events; + (void) loop; + struct swim_scheduler *scheduler = (struct swim_scheduler *) io->data; + struct sockaddr_in src; + socklen_t len = sizeof(src); + char buf[UDP_PACKET_SIZE]; + ssize_t size = swim_transport_recv(&scheduler->transport, buf, + sizeof(buf), + (struct sockaddr *) &src, &len); + if (size <= 0) { + if (size < 0) + goto error; + return; + } + say_verbose("SWIM: received from %s", + sio_strfaddr((struct sockaddr *) &src, len)); + struct swim_meta_def meta; + const char *pos = buf, *end = pos + size; + if (swim_meta_def_decode(&meta, &pos, end) < 0) + goto error; + scheduler->on_input(scheduler, pos, end, &meta.src); + return; +error: + diag_log(); +} diff --git a/src/lib/swim/swim_io.h b/src/lib/swim/swim_io.h new file mode 100644 index 000000000..68fb89818 --- /dev/null +++ b/src/lib/swim/swim_io.h @@ -0,0 +1,225 @@ +#ifndef TARANTOOL_SWIM_IO_H_INCLUDED +#define TARANTOOL_SWIM_IO_H_INCLUDED +/* + * Copyright 2010-2019, Tarantool AUTHORS, please see AUTHORS file. + * + * Redistribution and use in source and binary forms, with or + * without modification, are permitted provided that the following + * conditions are met: + * + * 1. 
Redistributions of source code must retain the above + * copyright notice, this list of conditions and the + * following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following + * disclaimer in the documentation and/or other materials + * provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED + * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL + * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR + * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF + * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF + * SUCH DAMAGE. + */ +#include "trivia/util.h" +#include "small/rlist.h" +#include "salad/stailq.h" +#include "swim_transport.h" +#include "tarantool_ev.h" +#include <stdbool.h> +#include <arpa/inet.h> + +/** + * SWIM protocol transport level. + */ + +struct swim_task; +struct swim_scheduler; + +enum { + /** + * Default MTU is 1500. MTU (when IPv4 is used) consists + * of IPv4 header, UDP header, Data. IPv4 has 20 bytes + * header, UDP - 8 bytes. So Data = 1500 - 20 - 8 = 1472. + * TODO: adapt to other MTUs which can be reduced in some + * networks by their admins. Or allow to specify MTU in + * configuration. + */ + UDP_PACKET_SIZE = 1472, +}; + +/** + * UDP packet. Works as an allocator, allowing to fill its body + * gradually, while preserving prefix for metadata. 
+ * + * < - - - -UDP_PACKET_SIZE- - - - -> + * +--------+-----------------------+ + * | meta | body | *free* | + * +--------+-----------------------+ + * ^ ^ ^ ^ + * meta body pos end + * buf + */ +struct swim_packet { + /** End of the body. */ + char *pos; + /** + * Starting position of body in the buffer. Not the same + * as buf, because the latter has metadata at the + * beginning. + */ + char *body; + /** + * Alias for swim_packet.buf. Just sugar for code working + * with meta. + */ + char meta[0]; + /** Packet body buffer. */ + char buf[UDP_PACKET_SIZE]; + /** + * Pointer to the end of the buffer. Just sugar to avoid + * writing 'buf + sizeof(buf)' each time. + */ + char end[0]; +}; + +/** + * Ensure that the packet can fit @a size bytes more. Multiple + * reserves of the same size will return the same pointer until + * advance is called. + */ +static inline char * +swim_packet_reserve(struct swim_packet *packet, int size) +{ + return packet->pos + size > packet->end ? NULL : packet->pos; +} + +/** + * Advance the body end pointer. This declares the next @a size + * bytes as occupied. + */ +static inline void +swim_packet_advance(struct swim_packet *packet, int size) +{ + assert(packet->pos + size <= packet->end); + packet->pos += size; +} + +/** Reserve + advance. */ +static inline char * +swim_packet_alloc(struct swim_packet *packet, int size) +{ + char *res = swim_packet_reserve(packet, size); + if (res == NULL) + return NULL; + swim_packet_advance(packet, size); + return res; +} + +/** Initialize @a packet, reserve some space for meta. */ +void +swim_packet_create(struct swim_packet *packet); + +typedef void (*swim_scheduler_on_input_f)(struct swim_scheduler *scheduler, + const char *buf, const char *end, + const struct sockaddr_in *src); + +/** Planner and executor of input and output operations. */ +struct swim_scheduler { + /** Transport to send/receive packets. */ + struct swim_transport transport; + /** + * Function called when a packet is received. 
It takes + * packet body, while meta is handled by transport level + * completely. + */ + swim_scheduler_on_input_f on_input; + /** + * Event dispatcher of incoming messages. Takes them from + * the network. + */ + struct ev_io input; + /** + * Event dispatcher of outgoing messages. Takes tasks + * from queue_output. + */ + struct ev_io output; + /** Queue of output tasks ready to write now. */ + struct rlist queue_output; +}; + +/** Initialize scheduler. */ +void +swim_scheduler_create(struct swim_scheduler *scheduler, + swim_scheduler_on_input_f on_input); + +/** + * Bind or rebind the scheduler to an address. In case of rebind + * the old socket is closed. + */ +int +swim_scheduler_bind(struct swim_scheduler *scheduler, + const struct sockaddr_in *addr); + +/** Destroy scheduler, its queues, close the socket. */ +void +swim_scheduler_destroy(struct swim_scheduler *scheduler); + +/** + * Each SWIM component in a common case independently may want to + * push some data into the network. Dissemination sends events, + * failure detection sends pings, acks. Anti-entropy sends member + * tables. The intention to send data is called an IO task and is + * stored in a queue that is dispatched when output is possible. + */ +typedef void (*swim_task_f)(struct swim_task *, + struct swim_scheduler *scheduler, int rc); + +struct swim_task { + /** + * Function called when the task has completed. An error + * code or 0 is passed as an argument. + */ + swim_task_f complete; + /** + * Function, called when a scheduler is under destruction, + * and it cancels all its tasks. + */ + swim_task_f cancel; + /** Packet to send. */ + struct swim_packet packet; + /** Destination address. */ + struct sockaddr_in dst; + /** Place in a queue of tasks. */ + struct rlist in_queue_output; +}; + +/** + * Put the task into a queue of tasks. Eventually it will be sent. 
+ */ +void +swim_task_send(struct swim_task *task, const struct sockaddr_in *dst, + struct swim_scheduler *scheduler); + +/** Initialize the task, without scheduling. */ +void +swim_task_create(struct swim_task *task, swim_task_f complete, + swim_task_f cancel); + +/** Destroy the task, pop from the queue. */ +static inline void +swim_task_destroy(struct swim_task *task) +{ + rlist_del_entry(task, in_queue_output); +} + +#endif /* TARANTOOL_SWIM_IO_H_INCLUDED */ \ No newline at end of file diff --git a/src/lib/swim/swim_proto.c b/src/lib/swim/swim_proto.c new file mode 100644 index 000000000..a273cb815 --- /dev/null +++ b/src/lib/swim/swim_proto.c @@ -0,0 +1,327 @@ +/* + * Copyright 2010-2019, Tarantool AUTHORS, please see AUTHORS file. + * + * Redistribution and use in source and binary forms, with or + * without modification, are permitted provided that the following + * conditions are met: + * + * 1. Redistributions of source code must retain the above + * copyright notice, this list of conditions and the + * following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following + * disclaimer in the documentation and/or other materials + * provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED + * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. 
IN NO EVENT SHALL + * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR + * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF + * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF + * SUCH DAMAGE. + */ +#include "swim_proto.h" +#include "msgpuck.h" +#include "say.h" +#include "version.h" +#include "diag.h" + +const char *swim_member_status_strs[] = { + "alive", +}; + +int +swim_decode_map(const char **pos, const char *end, uint32_t *size, + const char *msg_pref, const char *param_name) +{ + if (mp_typeof(**pos) != MP_MAP || mp_check_map(*pos, end) > 0) { + diag_set(SwimError, "%s %s should be a map", msg_pref, + param_name); + return -1; + } + *size = mp_decode_map(pos); + return 0; +} + +int +swim_decode_array(const char **pos, const char *end, uint32_t *size, + const char *msg_pref, const char *param_name) +{ + if (mp_typeof(**pos) != MP_ARRAY || mp_check_array(*pos, end) > 0) { + diag_set(SwimError, "%s %s should be an array", msg_pref, + param_name); + return -1; + } + *size = mp_decode_array(pos); + return 0; +} + +int +swim_decode_uint(const char **pos, const char *end, uint64_t *value, + const char *msg_pref, const char *param_name) +{ + if (mp_typeof(**pos) != MP_UINT || mp_check_uint(*pos, end) > 0) { + diag_set(SwimError, "%s %s should be a uint", msg_pref, + param_name); + return -1; + } + *value = mp_decode_uint(pos); + return 0; +} + +static inline int +swim_decode_ip(struct sockaddr_in *address, const char **pos, const char *end, + const char *msg_pref, const char *param_name) +{ + uint64_t ip; + if (swim_decode_uint(pos, end, &ip, msg_pref, param_name) != 0) + return -1; + if (ip > UINT32_MAX) { + 
diag_set(SwimError, "%s %s is an invalid IP address", msg_pref, + param_name); + return -1; + } + address->sin_addr.s_addr = ip; + return 0; +} + +static inline int +swim_decode_port(struct sockaddr_in *address, const char **pos, const char *end, + const char *msg_pref, const char *param_name) +{ + uint64_t port; + if (swim_decode_uint(pos, end, &port, msg_pref, param_name) != 0) + return -1; + if (port > UINT16_MAX) { + diag_set(SwimError, "%s %s is an invalid port", msg_pref, + param_name); + return -1; + } + address->sin_port = port; + return 0; +} + +int +swim_decode_uuid(struct tt_uuid *uuid, const char **pos, const char *end, + const char *msg_pref, const char *param_name) +{ + if (mp_typeof(**pos) != MP_BIN || mp_check_binl(*pos, end) > 0) { + diag_set(SwimError, "%s %s should be bin", msg_pref, + param_name); + return -1; + } + if (mp_decode_binl(pos) != UUID_LEN || *pos + UUID_LEN > end) { + diag_set(SwimError, "%s %s is invalid", msg_pref, param_name); + return -1; + } + memcpy(uuid, *pos, UUID_LEN); + *pos += UUID_LEN; + return 0; +} + +void +swim_member_def_create(struct swim_member_def *def) +{ + memset(def, 0, sizeof(*def)); + def->status = MEMBER_ALIVE; +} + +/** + * Decode a MessagePack value of @a key and store it in @a def. + * @param key Key to read value of. + * @param[in][out] pos Where a value is stored. + * @param end End of the buffer. + * @param msg_pref Error message prefix. + * @param[out] def Where to store the value. + * + * @retval 0 Success. + * @retval -1 Error. 
+ */ +static int +swim_decode_member_key(enum swim_member_key key, const char **pos, + const char *end, const char *msg_pref, + struct swim_member_def *def) +{ + uint64_t tmp; + switch (key) { + case SWIM_MEMBER_STATUS: + if (swim_decode_uint(pos, end, &tmp, msg_pref, + "member status") != 0) + return -1; + if (tmp >= swim_member_status_MAX) { + diag_set(SwimError, "%s unknown member status", + msg_pref); + return -1; + } + def->status = (enum swim_member_status) tmp; + break; + case SWIM_MEMBER_ADDRESS: + if (swim_decode_ip(&def->addr, pos, end, msg_pref, + "member address") != 0) + return -1; + break; + case SWIM_MEMBER_PORT: + if (swim_decode_port(&def->addr, pos, end, msg_pref, + "member port") != 0) + return -1; + break; + case SWIM_MEMBER_UUID: + if (swim_decode_uuid(&def->uuid, pos, end, msg_pref, + "member uuid") != 0) + return -1; + break; + default: + unreachable(); + } + return 0; +} + +int +swim_member_def_decode(struct swim_member_def *def, const char **pos, + const char *end, const char *msg_pref) +{ + uint32_t size; + if (swim_decode_map(pos, end, &size, msg_pref, "member") != 0) + return -1; + swim_member_def_create(def); + for (uint32_t j = 0; j < size; ++j) { + uint64_t key; + if (swim_decode_uint(pos, end, &key, msg_pref, + "member key") != 0) + return -1; + if (key >= swim_member_key_MAX) { + diag_set(SwimError, "%s unknown member key", msg_pref); + return -1; + } + if (swim_decode_member_key(key, pos, end, msg_pref, def) != 0) + return -1; + } + if (def->addr.sin_port == 0 || def->addr.sin_addr.s_addr == 0) { + diag_set(SwimError, "%s member address is mandatory", msg_pref); + return -1; + } + if (tt_uuid_is_nil(&def->uuid)) { + diag_set(SwimError, "%s member uuid is mandatory", msg_pref); + return -1; + } + return 0; +} + +void +swim_src_uuid_bin_create(struct swim_src_uuid_bin *header, + const struct tt_uuid *uuid) +{ + header->k_uuid = SWIM_SRC_UUID; + header->m_uuid = 0xc4; + header->m_uuid_len = UUID_LEN; + memcpy(header->v_uuid, uuid, 
UUID_LEN); +} + +void +swim_anti_entropy_header_bin_create(struct swim_anti_entropy_header_bin *header, + uint16_t batch_size) +{ + header->k_anti_entropy = SWIM_ANTI_ENTROPY; + header->m_anti_entropy = 0xcd; + header->v_anti_entropy = mp_bswap_u16(batch_size); +} + +void +swim_member_bin_fill(struct swim_member_bin *header, + const struct sockaddr_in *addr, const struct tt_uuid *uuid, + enum swim_member_status status) +{ + header->v_status = status; + header->v_addr = mp_bswap_u32(addr->sin_addr.s_addr); + header->v_port = mp_bswap_u16(addr->sin_port); + memcpy(header->v_uuid, uuid, UUID_LEN); +} + +void +swim_member_bin_create(struct swim_member_bin *header) +{ + header->m_header = 0x84; + header->k_status = SWIM_MEMBER_STATUS; + header->k_addr = SWIM_MEMBER_ADDRESS; + header->m_addr = 0xce; + header->k_port = SWIM_MEMBER_PORT; + header->m_port = 0xcd; + header->k_uuid = SWIM_MEMBER_UUID; + header->m_uuid = 0xc4; + header->m_uuid_len = UUID_LEN; +} + +void +swim_meta_header_bin_create(struct swim_meta_header_bin *header, + const struct sockaddr_in *src) +{ + header->m_header = 0x83; + header->k_version = SWIM_META_TARANTOOL_VERSION; + header->m_version = 0xce; + header->v_version = mp_bswap_u32(tarantool_version_id()); + header->k_addr = SWIM_META_SRC_ADDRESS; + header->m_addr = 0xce; + header->v_addr = mp_bswap_u32(src->sin_addr.s_addr); + header->k_port = SWIM_META_SRC_PORT; + header->m_port = 0xcd; + header->v_port = mp_bswap_u16(src->sin_port); +} + +int +swim_meta_def_decode(struct swim_meta_def *def, const char **pos, + const char *end) +{ + const char *msg_pref = "invalid meta section:"; + uint32_t size; + if (swim_decode_map(pos, end, &size, msg_pref, "root") != 0) + return -1; + memset(def, 0, sizeof(*def)); + for (uint32_t i = 0; i < size; ++i) { + uint64_t key; + if (swim_decode_uint(pos, end, &key, msg_pref, "a key") != 0) + return -1; + switch (key) { + case SWIM_META_TARANTOOL_VERSION: + if (swim_decode_uint(pos, end, &key, msg_pref, + "version") != 
0) + return -1; + if (key > UINT32_MAX) { + diag_set(SwimError, "%s invalid version, too "\ + "big", msg_pref); + return -1; + } + def->version = key; + break; + case SWIM_META_SRC_ADDRESS: + if (swim_decode_ip(&def->src, pos, end, msg_pref, + "source address") != 0) + return -1; + break; + case SWIM_META_SRC_PORT: + if (swim_decode_port(&def->src, pos, end, msg_pref, + "source port") != 0) + return -1; + break; + default: + diag_set(SwimError, "%s unknown key", msg_pref); + return -1; + } + } + if (def->version == 0) { + diag_set(SwimError, "%s version is mandatory", msg_pref); + return -1; + } + if (def->src.sin_port == 0 || def->src.sin_addr.s_addr == 0) { + diag_set(SwimError, "%s source address is mandatory", msg_pref); + return -1; + } + return 0; +} diff --git a/src/lib/swim/swim_proto.h b/src/lib/swim/swim_proto.h new file mode 100644 index 000000000..6e36f0b07 --- /dev/null +++ b/src/lib/swim/swim_proto.h @@ -0,0 +1,320 @@ +#ifndef TARANTOOL_SWIM_PROTO_H_INCLUDED +#define TARANTOOL_SWIM_PROTO_H_INCLUDED +/* + * Copyright 2010-2019, Tarantool AUTHORS, please see AUTHORS file. + * + * Redistribution and use in source and binary forms, with or + * without modification, are permitted provided that the following + * conditions are met: + * + * 1. Redistributions of source code must retain the above + * copyright notice, this list of conditions and the + * following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following + * disclaimer in the documentation and/or other materials + * provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED + * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. 
IN NO EVENT SHALL + * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR + * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF + * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF + * SUCH DAMAGE. + */ +#include "trivia/util.h" +#include "tt_uuid.h" +#include <arpa/inet.h> +#include <stdbool.h> + +/** + * SWIM binary protocol structures and helpers. Below is a picture + * of a SWIM message template: + * + * +----------Meta section, handled by transport level-----------+ + * | { | + * | SWIM_META_TARANTOOL_VERSION: uint, Tarantool version ID,| + * | SWIM_META_SRC_ADDRESS: uint, ip, | + * | SWIM_META_SRC_PORT: uint, port | + * | } | + * +-------------------Protocol logic section--------------------+ + * | { | + * | SWIM_SRC_UUID: 16 byte UUID, | + * | | + * | AND | + * | | + * | SWIM_ANTI_ENTROPY: [ | + * | { | + * | SWIM_MEMBER_STATUS: uint, enum member_status, | + * | SWIM_MEMBER_ADDRESS: uint, ip, | + * | SWIM_MEMBER_PORT: uint, port, | + * | SWIM_MEMBER_UUID: 16 byte UUID | + * | }, | + * | ... | + * | ], | + * | } | + * +-------------------------------------------------------------+ + */ + +enum swim_member_status { + /** The instance is ok, responds to requests. */ + MEMBER_ALIVE = 0, + swim_member_status_MAX, +}; + +extern const char *swim_member_status_strs[]; + +/** + * SWIM member attributes from anti-entropy and dissemination + * messages. + */ +struct swim_member_def { + struct tt_uuid uuid; + struct sockaddr_in addr; + enum swim_member_status status; +}; + +/** Initialize the definition with default values. 
*/ +void +swim_member_def_create(struct swim_member_def *def); + +/** + * Decode member definition from a MessagePack buffer. + * @param[out] def Definition to decode into. + * @param[in][out] pos Start of the MessagePack buffer. + * @param end End of the MessagePack buffer. + * @param msg_pref A prefix of an error message to use for + * diag_set, when something is wrong. + * + * @retval 0 Success. + * @retval -1 Error. + */ +int +swim_member_def_decode(struct swim_member_def *def, const char **pos, + const char *end, const char *msg_pref); + +/** + * Main round messages can carry merged failure detection + * messages and anti-entropy. With these keys the components can + * be distinguished from each other. + */ +enum swim_body_key { + SWIM_SRC_UUID = 0, + SWIM_ANTI_ENTROPY, +}; + +/** + * One of SWIM packet body components - SWIM_SRC_UUID. It is not + * in the meta section, handled by the transport, because the + * transport has nothing to do with UUIDs - it operates by IP/port + * only. This component shall be first in message's body. + */ +struct PACKED swim_src_uuid_bin { + /** mp_encode_uint(SWIM_SRC_UUID) */ + uint8_t k_uuid; + /** mp_encode_bin(UUID_LEN) */ + uint8_t m_uuid; + uint8_t m_uuid_len; + uint8_t v_uuid[UUID_LEN]; +}; + +/** Initialize source UUID section. */ +void +swim_src_uuid_bin_create(struct swim_src_uuid_bin *header, + const struct tt_uuid *uuid); + +/** {{{ Anti-entropy component */ + +/** + * Attributes of each record of a broadcasted members table. Just + * the same as some of struct swim_member attributes. + */ +enum swim_member_key { + SWIM_MEMBER_STATUS = 0, + SWIM_MEMBER_ADDRESS, + SWIM_MEMBER_PORT, + SWIM_MEMBER_UUID, + swim_member_key_MAX, +}; + +/** SWIM anti-entropy MessagePack header template. */ +struct PACKED swim_anti_entropy_header_bin { + /** mp_encode_uint(SWIM_ANTI_ENTROPY) */ + uint8_t k_anti_entropy; + /** mp_encode_array(...) 
*/ + uint8_t m_anti_entropy; + uint16_t v_anti_entropy; +}; + +/** Initialize SWIM_ANTI_ENTROPY header. */ +void +swim_anti_entropy_header_bin_create(struct swim_anti_entropy_header_bin *header, + uint16_t batch_size); + +/** + * SWIM member MessagePack template. Represents one record in + * anti-entropy section. + */ +struct PACKED swim_member_bin { + /** mp_encode_map(4) */ + uint8_t m_header; + + /** mp_encode_uint(SWIM_MEMBER_STATUS) */ + uint8_t k_status; + /** mp_encode_uint(enum member_status) */ + uint8_t v_status; + + /** mp_encode_uint(SWIM_MEMBER_ADDRESS) */ + uint8_t k_addr; + /** mp_encode_uint(addr.sin_addr.s_addr) */ + uint8_t m_addr; + uint32_t v_addr; + + /** mp_encode_uint(SWIM_MEMBER_PORT) */ + uint8_t k_port; + /** mp_encode_uint(addr.sin_port) */ + uint8_t m_port; + uint16_t v_port; + + /** mp_encode_uint(SWIM_MEMBER_UUID) */ + uint8_t k_uuid; + /** mp_encode_bin(UUID_LEN) */ + uint8_t m_uuid; + uint8_t m_uuid_len; + uint8_t v_uuid[UUID_LEN]; +}; + +/** Initialize anti-entropy record. */ +void +swim_member_bin_create(struct swim_member_bin *header); + +/** + * Since usually there are many members, it is faster to reset a + * few fields in an existing template than to create a new + * template each time. So the usage pattern is create(), fill(), + * fill() ... . + */ +void +swim_member_bin_fill(struct swim_member_bin *header, + const struct sockaddr_in *addr, const struct tt_uuid *uuid, + enum swim_member_status status); + +/** }}} Anti-entropy component */ + +/** {{{ Meta component */ + +/** + * Meta component keys, completely handled by the transport level. + */ +enum swim_meta_key { + /** + * Version is now unused, but in future can help in + * the protocol improvement, extension. + */ + SWIM_META_TARANTOOL_VERSION = 0, + /** + * Source IP/port are stored in body of UDP packet despite + * the fact that UDP has them in its header. 
This is + * because + * - packet body is going to be encrypted, but header + * is still open and anybody can catch the packet, + * change source IP/port, and therefore execute + * man-in-the-middle attack; + * + * - some network filters can change the address to an + * address of a router or another device. + */ + SWIM_META_SRC_ADDRESS, + SWIM_META_SRC_PORT, +}; + +/** + * Each SWIM packet carries meta info, which helps to determine + * SWIM protocol version, final packet destination and any other + * internal details that are not part of the reference SWIM + * protocol. + * + * The meta header is mandatory and precedes the main protocol + * data as a separate MessagePack map. + */ +struct PACKED swim_meta_header_bin { + /** mp_encode_map(3) */ + uint8_t m_header; + + /** mp_encode_uint(SWIM_META_TARANTOOL_VERSION) */ + uint8_t k_version; + /** mp_encode_uint(tarantool_version_id()) */ + uint8_t m_version; + uint32_t v_version; + + /** mp_encode_uint(SWIM_META_SRC_ADDRESS) */ + uint8_t k_addr; + /** mp_encode_uint(addr.sin_addr.s_addr) */ + uint8_t m_addr; + uint32_t v_addr; + + /** mp_encode_uint(SWIM_META_SRC_PORT) */ + uint8_t k_port; + /** mp_encode_uint(addr.sin_port) */ + uint8_t m_port; + uint16_t v_port; +}; + +/** Initialize meta section. */ +void +swim_meta_header_bin_create(struct swim_meta_header_bin *header, + const struct sockaddr_in *src); + +/** Meta definition. */ +struct swim_meta_def { + /** Tarantool version. */ + uint32_t version; + /** Source of the message. */ + struct sockaddr_in src; +}; + +/** + * Decode meta section into its definition object. + * @param[out] def Definition to decode into. + * @param[in][out] pos MessagePack buffer to decode. + * @param end End of the MessagePack buffer. + * + * @retval 0 Success. + * @retval -1 Error. + */ +int +swim_meta_def_decode(struct swim_meta_def *def, const char **pos, + const char *end); + +/** }}} Meta component */ + +/** + * Helpers to decode some values - map, array, etc with + * appropriate checks.
All of them set diagnostics on an error + * with a specified message prefix and a parameter name. + */ + +int +swim_decode_map(const char **pos, const char *end, uint32_t *size, + const char *msg_pref, const char *param_name); + +int +swim_decode_array(const char **pos, const char *end, uint32_t *size, + const char *msg_pref, const char *param_name); + +int +swim_decode_uint(const char **pos, const char *end, uint64_t *value, + const char *msg_pref, const char *param_name); + +int +swim_decode_uuid(struct tt_uuid *uuid, const char **pos, const char *end, + const char *msg_pref, const char *param_name); + +#endif /* TARANTOOL_SWIM_PROTO_H_INCLUDED */ diff --git a/src/lib/swim/swim_transport.h b/src/lib/swim/swim_transport.h new file mode 100644 index 000000000..fa569caaa --- /dev/null +++ b/src/lib/swim/swim_transport.h @@ -0,0 +1,73 @@ +#ifndef TARANTOOL_SWIM_TRANSPORT_H_INCLUDED +#define TARANTOOL_SWIM_TRANSPORT_H_INCLUDED +/* + * Copyright 2010-2019, Tarantool AUTHORS, please see AUTHORS file. + * + * Redistribution and use in source and binary forms, with or + * without modification, are permitted provided that the following + * conditions are met: + * + * 1. Redistributions of source code must retain the above + * copyright notice, this list of conditions and the + * following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following + * disclaimer in the documentation and/or other materials + * provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED + * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. 
IN NO EVENT SHALL + * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR + * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF + * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF + * SUCH DAMAGE. + */ +#include "trivia/util.h" +#include <arpa/inet.h> + +/** Transport implementation. */ +struct swim_transport { + /** Socket. */ + int fd; + /** Socket address. */ + struct sockaddr_in addr; +}; + +/** + * Despite there are no transport vtab, those are virtual methods. + * But virtualization is handled on compilation time. This header + * file has one implementation for server, and another for tests. + * Transport source is built as a separate library. + * + * Methods below for server are just wrappers of corresponding + * system calls, working with UDP sockets. 
+ */ + +ssize_t +swim_transport_send(struct swim_transport *transport, const void *data, + size_t size, const struct sockaddr *addr, + socklen_t addr_size); + +ssize_t +swim_transport_recv(struct swim_transport *transport, void *buffer, size_t size, + struct sockaddr *addr, socklen_t *addr_size); + +int +swim_transport_bind(struct swim_transport *transport, + const struct sockaddr *addr, socklen_t addr_len); + +void +swim_transport_destroy(struct swim_transport *transport); + +void +swim_transport_create(struct swim_transport *transport); + +#endif /* TARANTOOL_SWIM_TRANSPORT_H_INCLUDED */ diff --git a/src/lua/init.c b/src/lua/init.c index ca4b47f3a..5b47aa3e3 100644 --- a/src/lua/init.c +++ b/src/lua/init.c @@ -58,6 +58,7 @@ #include "lua/fio.h" #include "lua/httpc.h" #include "lua/utf8.h" +#include "lua/swim.h" #include "digest.h" #include <small/ibuf.h> @@ -450,6 +451,7 @@ tarantool_lua_init(const char *tarantool_bin, int argc, char **argv) tarantool_lua_socket_init(L); tarantool_lua_pickle_init(L); tarantool_lua_digest_init(L); + tarantool_lua_swim_init(L); luaopen_http_client_driver(L); lua_pop(L, 1); luaopen_msgpack(L); diff --git a/src/lua/swim.c b/src/lua/swim.c new file mode 100644 index 000000000..317f53e73 --- /dev/null +++ b/src/lua/swim.c @@ -0,0 +1,370 @@ +/* + * Copyright 2010-2018, Tarantool AUTHORS, please see AUTHORS file. + * + * Redistribution and use in source and binary forms, with or + * without modification, are permitted provided that the following + * conditions are met: + * + * 1. Redistributions of source code must retain the above + * copyright notice, this list of conditions and the + * following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following + * disclaimer in the documentation and/or other materials + * provided with the distribution. 
+ * + * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED + * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL + * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR + * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF + * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF + * SUCH DAMAGE. + */ + +#include "utils.h" +#include "diag.h" +#include "tt_uuid.h" +#include "swim/swim.h" +#include "swim/swim_transport.h" +#include "small/ibuf.h" +#include "lua/info.h" +#include <info.h> + +/** SWIM instances are pushed as cdata with this id. */ +uint32_t CTID_STRUCT_SWIM_PTR; + +/** + * Get @a n-th value from a Lua stack as a struct swim pointer. + * @param L Lua state. + * @param n Where pointer is stored on Lua stack. + * + * @retval NULL The stack position does not exist or it is not a + * struct swim pointer. + * @retval not NULL Valid SWIM pointer. + */ +static inline struct swim * +lua_swim_ptr(struct lua_State *L, int n) +{ + uint32_t ctypeid; + if (lua_type(L, n) != LUA_TCDATA) + return NULL; + void *swim = luaL_checkcdata(L, n, &ctypeid); + if (ctypeid != CTID_STRUCT_SWIM_PTR) + return NULL; + return *(struct swim **) swim; +} + +/** + * Delete SWIM instance passed via first Lua stack position. Used + * by Lua GC. 
+ */ +static int +lua_swim_gc(struct lua_State *L) +{ + struct swim *swim = lua_swim_ptr(L, 1); + if (swim == NULL) + return luaL_error(L, "SWIM gc expected struct swim *"); + swim_delete(swim); + return 0; +} + +/** + * Get a value from a table that is supposed to be a timeout. + * @param L Lua state. + * @param ncfg Where on the Lua stack a table with the timeout is + * stored. + * @param fieldname Name of the table field storing the timeout. + * @param funcname Caller function name, used to build a detailed + * error message. + * + * @retval 0 > A timeout value. + * @retval -1 The field is nil. + */ +static inline double +lua_swim_get_timeout_field(struct lua_State *L, int ncfg, const char *fieldname, + const char *funcname) +{ + double timeout = -1; + lua_getfield(L, ncfg, fieldname); + if (lua_isnumber(L, -1)) { + timeout = lua_tonumber(L, -1); + if (timeout <= 0) { + return luaL_error(L, "swim.%s: %s should be positive "\ + "number", funcname, fieldname); + } + } else if (! lua_isnil(L, -1)) { + return luaL_error(L, "swim.%s: %s should be positive number", + funcname, fieldname); + } + lua_pop(L, 1); + return timeout; +} + +/** + * Get a value from a table that is supposed to be a UUID. + * @param L Lua state. + * @param ncfg Where on the Lua stack a table with the UUID is + * stored. + * @param fieldname Name of the table field storing the UUID. + * @param funcname Caller function name, used to build a detailed + * error message. + * @param[out] uuid Result UUID. Nil UUID is stored, if the field + * was nil. 
+ */ +static inline void +lua_swim_get_uuid_field(struct lua_State *L, int ncfg, const char *fieldname, + const char *funcname, struct tt_uuid *uuid) +{ + lua_getfield(L, ncfg, fieldname); + if (lua_isstring(L, -1)) { + if (tt_uuid_from_string(lua_tostring(L, -1), uuid) != 0) { + luaL_error(L, "swim.%s: %s is invalid", funcname, + fieldname); + } + } else if (lua_isnil(L, -1)) { + *uuid = uuid_nil; + } else { + luaL_error(L, "swim.%s: %s should be a string", funcname, + fieldname); + } + lua_pop(L, 1); +} + +/** + * Get a value from a table that is supposed to be a URI. + * @param L Lua state. + * @param ncfg Where on the Lua stack a table with the URI is + * stored. + * @param fieldname Name of the table field storing the URI. + * @param funcname Caller function name, used to build a detailed + * error message. + * + * @retval not NULL A URI. + * @retval NULL The field is nil. + */ +static inline const char * +lua_swim_get_uri_field(struct lua_State *L, int ncfg, const char *fieldname, + const char *funcname) +{ + const char *uri = NULL; + lua_getfield(L, ncfg, fieldname); + if (lua_isstring(L, -1)) { + uri = lua_tostring(L, -1); + } else if (! lua_isnil(L, -1)) { + luaL_error(L, "swim.%s: %s should be a string URI", funcname, + fieldname); + } + lua_pop(L, 1); + return uri; +} + +/** + * Configure @a swim instance using a table stored in @a ncfg-th + * position on the Lua stack. + * @param L Lua state. + * @param ncfg Where configuration is stored on the Lua stack. + * @param swim SWIM instance to configure. + * @param funcname Caller function name to use in error messages. + * + * @retval 0 Success. + * @retval -1 Error, stored in diagnostics area. Critical errors + * like OOM or incorrect usage are thrown. + */ +static int +lua_swim_cfg_impl(struct lua_State *L, int ncfg, struct swim *swim, + const char *funcname) +{ + if (! 
lua_istable(L, ncfg)) { + return luaL_error(L, "swim.%s: expected table config", + funcname); + } + const char *server_uri = + lua_swim_get_uri_field(L, ncfg, "server", funcname); + struct tt_uuid uuid; + lua_swim_get_uuid_field(L, ncfg, "uuid", funcname, &uuid); + double heartbeat_rate = + lua_swim_get_timeout_field(L, ncfg, "heartbeat", funcname); + + return swim_cfg(swim, server_uri, heartbeat_rate, &uuid); +} + +/** + * Create a new SWIM instance. The Lua stack can contain either 0 + * parameters to just create a new non-configured SWIM instance, + * or 1 parameter with a config to configure the new instance + * immediately. + * @param L Lua state. + * @retval 1 A SWIM instance. + * @retval 2 Nil and an error object. On invalid Lua parameters + * and OOM it throws. + */ +static int +lua_swim_new(struct lua_State *L) +{ + int top = lua_gettop(L); + if (top > 1) + return luaL_error(L, "Usage: swim.new([{<config>}])"); + struct swim *swim = swim_new(); + if (swim != NULL) { + *(struct swim **)luaL_pushcdata(L, CTID_STRUCT_SWIM_PTR) = swim; + lua_pushcfunction(L, lua_swim_gc); + luaL_setcdatagc(L, -2); + if (top == 0 || lua_swim_cfg_impl(L, 1, swim, "new") == 0) + return 1; + lua_pop(L, 1); + } + lua_pushnil(L); + luaT_pusherror(L, diag_last_error(diag_get())); + return 2; +} + +/** + * Configure an existing SWIM instance. The Lua stack should + * contain two values - a SWIM instance to configure, and a + * config. + * @param L Lua state. + * @retval 1 True. + * @retval 2 Nil and an error object. On invalid Lua parameters + * and OOM it throws. + */ +static int +lua_swim_cfg(struct lua_State *L) +{ + struct swim *swim = lua_swim_ptr(L, 1); + if (swim == NULL) + return luaL_error(L, "Usage: swim:cfg({<config>})"); + if (lua_swim_cfg_impl(L, 2, swim, "cfg") != 0) { + lua_pushnil(L); + luaT_pusherror(L, diag_last_error(diag_get())); + return 2; + } + lua_pushboolean(L, true); + return 1; +} + +/** + * Add a new member to a SWIM instance. 
The Lua stack should + * contain two values - a SWIM instance to add to, and a config of + * a new member. Config is a table, containing UUID and URI keys. + * @param L Lua state. + * @retval 1 True. + * @retval 2 Nil and an error object. On invalid Lua parameters + * and OOM it throws. + */ +static int +lua_swim_add_member(struct lua_State *L) +{ + struct swim *swim = lua_swim_ptr(L, 1); + if (lua_gettop(L) != 2 || swim == NULL || !lua_istable(L, 2)) + return luaL_error(L, "Usage: swim:add_member({<config>})"); + const char *uri = lua_swim_get_uri_field(L, 2, "uri", "add_member"); + struct tt_uuid uuid; + lua_swim_get_uuid_field(L, 2, "uuid", "add_member", &uuid); + + if (swim_add_member(swim, uri, &uuid) != 0) { + lua_pushnil(L); + luaT_pusherror(L, diag_last_error(diag_get())); + return 2; + } + lua_pushboolean(L, true); + return 1; +} + +/** + * Silently remove a member from a SWIM instance's members table. + * The Lua stack should contain two values - a SWIM instance to + * remove from, and the UUID of the member to remove. + * @param L Lua state. + * @retval 1 True. + * @retval 2 Nil and an error object. On invalid Lua parameters + * and OOM it throws. + */ +static int +lua_swim_remove_member(struct lua_State *L) +{ + struct swim *swim = lua_swim_ptr(L, 1); + if (lua_gettop(L) != 2 || swim == NULL) + return luaL_error(L, "Usage: swim:remove_member(uuid)"); + if (! lua_isstring(L, -1)) { + return luaL_error(L, "swim.remove_member: member UUID should "\ + "be a string"); + } + struct tt_uuid uuid; + if (tt_uuid_from_string(lua_tostring(L, 2), &uuid) != 0) + return luaL_error(L, "swim.remove_member: invalid UUID"); + + if (swim_remove_member(swim, &uuid) != 0) { + lua_pushnil(L); + luaT_pusherror(L, diag_last_error(diag_get())); + return 2; + } + lua_pushboolean(L, true); + return 1; +} + +/** + * Destroy and delete a SWIM instance. All its memory is freed, it + * stops participating in any rounds, the socket is closed.
No + * special quit messages are broadcasted - the quit is silent. So + * other members will think that this one is dead. The Lua stack + * should contain one value - a SWIM instance to delete. + */ +static int +lua_swim_delete(struct lua_State *L) +{ + struct swim *swim = lua_swim_ptr(L, 1); + if (swim == NULL) + return luaL_error(L, "Usage: swim:delete()"); + swim_delete(swim); + uint32_t ctypeid; + struct swim **cdata = (struct swim **) luaL_checkcdata(L, 1, &ctypeid); + assert(ctypeid == CTID_STRUCT_SWIM_PTR); + *cdata = NULL; + return 0; +} + +/** + * Collect information about this instance's members table. + * @param L Lua state. + * @retval 1 Info table. + */ +static int +lua_swim_info(struct lua_State *L) +{ + struct swim *swim = lua_swim_ptr(L, 1); + if (swim == NULL) + return luaL_error(L, "Usage: swim:info()"); + struct info_handler info; + luaT_info_handler_create(&info, L); + swim_info(swim, &info); + return 1; +} + +void +tarantool_lua_swim_init(struct lua_State *L) +{ + static const struct luaL_Reg lua_swim_methods [] = { + {"new", lua_swim_new}, + {"cfg", lua_swim_cfg}, + {"add_member", lua_swim_add_member}, + {"remove_member", lua_swim_remove_member}, + {"delete", lua_swim_delete}, + {"info", lua_swim_info}, + {NULL, NULL} + }; + luaL_register_module(L, "swim", lua_swim_methods); + lua_pop(L, 1); + int rc = luaL_cdef(L, "struct swim;"); + assert(rc == 0); + (void) rc; + CTID_STRUCT_SWIM_PTR = luaL_ctypeid(L, "struct swim *"); + assert(CTID_STRUCT_SWIM_PTR != 0); +}; diff --git a/src/lua/swim.h b/src/lua/swim.h new file mode 100644 index 000000000..989cf62b3 --- /dev/null +++ b/src/lua/swim.h @@ -0,0 +1,47 @@ +#ifndef INCLUDES_TARANTOOL_LUA_SWIM_H +#define INCLUDES_TARANTOOL_LUA_SWIM_H +/* + * Copyright 2010-2015, Tarantool AUTHORS, please see AUTHORS file. + * + * Redistribution and use in source and binary forms, with or + * without modification, are permitted provided that the following + * conditions are met: + * + * 1. 
Redistributions of source code must retain the above + * copyright notice, this list of conditions and the + * following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following + * disclaimer in the documentation and/or other materials + * provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED + * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL + * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR + * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF + * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF + * SUCH DAMAGE. 
+ */ + +#if defined(__cplusplus) +extern "C" { +#endif + +struct lua_State; + +void +tarantool_lua_swim_init(struct lua_State *L); + +#if defined(__cplusplus) +} +#endif + +#endif /* INCLUDES_TARANTOOL_LUA_SWIM_H */ diff --git a/test/unit/CMakeLists.txt b/test/unit/CMakeLists.txt index 16739f75d..cfaf93fb9 100644 --- a/test/unit/CMakeLists.txt +++ b/test/unit/CMakeLists.txt @@ -206,3 +206,6 @@ target_link_libraries(checkpoint_schedule.test m unit) add_executable(sio.test sio.c) target_link_libraries(sio.test unit core) + +add_executable(swim.test swim.c swim_test_transport.c) +target_link_libraries(swim.test unit core swim) diff --git a/test/unit/swim.c b/test/unit/swim.c new file mode 100644 index 000000000..df5d4d0d9 --- /dev/null +++ b/test/unit/swim.c @@ -0,0 +1,34 @@ +#include "memory.h" +#include "fiber.h" +#include "unit.h" +#include "swim/swim_transport.h" + +static int +main_f(va_list ap) +{ + (void) ap; + return 0; +} + +int +main() +{ + header(); + plan(1); + ok(true, "true is true"); + + memory_init(); + fiber_init(fiber_c_invoke); + + struct fiber *main_fiber = fiber_new("main", main_f); + assert(main_fiber != NULL); + fiber_wakeup(main_fiber); + ev_run(loop(), 0); + + fiber_free(); + memory_free(); + + int rc = check_plan(); + footer(); + return rc; +} \ No newline at end of file diff --git a/test/unit/swim_test_transport.c b/test/unit/swim_test_transport.c new file mode 100644 index 000000000..d1cfc9098 --- /dev/null +++ b/test/unit/swim_test_transport.c @@ -0,0 +1,78 @@ +/* + * Copyright 2010-2019, Tarantool AUTHORS, please see AUTHORS file. + * + * Redistribution and use in source and binary forms, with or + * without modification, are permitted provided that the following + * conditions are met: + * + * 1. Redistributions of source code must retain the above + * copyright notice, this list of conditions and the + * following disclaimer. + * + * 2. 
Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following + * disclaimer in the documentation and/or other materials + * provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED + * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL + * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR + * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF + * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF + * SUCH DAMAGE. + */ +#include "swim/swim_transport.h" + +ssize_t +swim_transport_send(struct swim_transport *transport, const void *data, + size_t size, const struct sockaddr *addr, + socklen_t addr_size) +{ + (void) transport; + (void) data; + (void) size; + (void) addr; + (void) addr_size; + return 0; +} + +ssize_t +swim_transport_recv(struct swim_transport *transport, void *buffer, size_t size, + struct sockaddr *addr, socklen_t *addr_size) +{ + (void) transport; + (void) buffer; + (void) size; + (void) addr; + (void) addr_size; + return 0; +} + +int +swim_transport_bind(struct swim_transport *transport, + const struct sockaddr *addr, socklen_t addr_len) +{ + (void) transport; + (void) addr; + (void) addr_len; + return 0; +} + +void +swim_transport_destroy(struct swim_transport *transport) +{ + (void) transport; +} + +void +swim_transport_create(struct swim_transport *transport) +{ + (void) transport; +} \ No newline at end of file -- 2.17.2 (Apple Git-113)
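For readers following the swim_member_bin template in swim_proto.h above, here is a minimal standalone sketch of the create()/fill() pattern it describes. It is not the code from swim_proto.c, which is outside this excerpt. All sketch_* names are hypothetical, the address and port are assumed to be passed in host byte order, and the multi-byte values are kept as byte arrays so that no PACKED attribute is needed. The marker bytes (0x84 fixmap of 4 pairs, 0xce uint32, 0xcd uint16, 0xc4 bin8) come from the MessagePack format; _Static_assert assumes a C11 compiler.

```c
#include <assert.h>
#include <stdint.h>
#include <string.h>

enum { SKETCH_UUID_LEN = 16 };

/* Keys mirror enum swim_member_key from the patch. */
enum sketch_member_key {
	SKETCH_MEMBER_STATUS = 0,
	SKETCH_MEMBER_ADDRESS,
	SKETCH_MEMBER_PORT,
	SKETCH_MEMBER_UUID,
};

/* One anti-entropy record laid out as a ready-made MessagePack map. */
struct sketch_member_bin {
	uint8_t m_header;	/* fixmap, 4 pairs: 0x84 */
	uint8_t k_status;	/* positive fixint key */
	uint8_t v_status;	/* positive fixint value */
	uint8_t k_addr;
	uint8_t m_addr;		/* uint32 marker: 0xce */
	uint8_t v_addr[4];	/* big-endian IPv4 address */
	uint8_t k_port;
	uint8_t m_port;		/* uint16 marker: 0xcd */
	uint8_t v_port[2];	/* big-endian port */
	uint8_t k_uuid;
	uint8_t m_uuid;		/* bin8 marker: 0xc4 */
	uint8_t m_uuid_len;	/* bin8 payload length */
	uint8_t v_uuid[SKETCH_UUID_LEN];
};

_Static_assert(sizeof(struct sketch_member_bin) == 32,
	       "the template must not contain padding");

/* Write the constant part once: map header, keys and type markers. */
void
sketch_member_bin_create(struct sketch_member_bin *bin)
{
	memset(bin, 0, sizeof(*bin));
	bin->m_header = 0x84;
	bin->k_status = SKETCH_MEMBER_STATUS;
	bin->k_addr = SKETCH_MEMBER_ADDRESS;
	bin->m_addr = 0xce;
	bin->k_port = SKETCH_MEMBER_PORT;
	bin->m_port = 0xcd;
	bin->k_uuid = SKETCH_MEMBER_UUID;
	bin->m_uuid = 0xc4;
	bin->m_uuid_len = SKETCH_UUID_LEN;
}

/* Overwrite only the per-member values; keys and markers stay intact. */
void
sketch_member_bin_fill(struct sketch_member_bin *bin, uint32_t addr,
		       uint16_t port, const uint8_t *uuid, uint8_t status)
{
	/* A positive fixint holds 0..127 only. */
	assert(status < 128);
	bin->v_status = status;
	bin->v_addr[0] = (uint8_t)(addr >> 24);
	bin->v_addr[1] = (uint8_t)(addr >> 16);
	bin->v_addr[2] = (uint8_t)(addr >> 8);
	bin->v_addr[3] = (uint8_t)addr;
	bin->v_port[0] = (uint8_t)(port >> 8);
	bin->v_port[1] = (uint8_t)port;
	memcpy(bin->v_uuid, uuid, SKETCH_UUID_LEN);
}
```

Because every value is encoded at its maximal fixed width, each record occupies the same 32 bytes. A sender can therefore call create() once and then fill() and copy the same template for each member of the batch.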