[tarantool-patches] [PATCH 1/2] swim: pool IO tasks
Vladislav Shpilevoy
v.shpilevoy at tarantool.org
Sat Jul 6 01:40:08 MSK 2019
Before the patch each SWIM member had two preallocated task
objects, 3KB in total. It was a waste of memory, because network
load per member in SWIM is ~2 messages per round step regardless
of cluster size.
This patch moves the tasks to a pool, where they can be reused,
even by different SWIM instances running on the same node.
---
src/lib/swim/swim.c | 78 +++++++++++++++++++++++++++----------
src/lib/swim/swim_io.c | 88 +++++++++++++++++++++++++++++++++++++++---
src/lib/swim/swim_io.h | 26 ++++++++++---
3 files changed, 161 insertions(+), 31 deletions(-)
diff --git a/src/lib/swim/swim.c b/src/lib/swim/swim.c
index 7d07a353d..9a7b4ce85 100644
--- a/src/lib/swim/swim.c
+++ b/src/lib/swim/swim.c
@@ -414,10 +414,14 @@ struct swim_member {
* message to it.
*/
struct heap_node in_wait_ack_heap;
- /** Ready at hand regular ACK task. */
- struct swim_task ack_task;
- /** Ready at hand regular PING task. */
- struct swim_task ping_task;
+ /**
+ * Last sent failure detection tasks. Kept so as
+ * 1) not to send them twice;
+ * 2) to be able to cancel them when the member is
+ * deleted.
+ */
+ struct swim_task *ack_task;
+ struct swim_task *ping_task;
};
#define mh_name _swim_table
@@ -759,16 +763,31 @@ static void
swim_ping_task_complete(struct swim_task *task,
struct swim_scheduler *scheduler, int rc)
{
+ struct swim *swim = swim_by_scheduler(scheduler);
+ struct swim_member *m = task->member;
+ assert(m != NULL);
+ assert(m->ping_task == task);
/*
* If ping send has failed, it makes no sense to wait for
* an ACK.
*/
- if (rc < 0)
- return;
- struct swim *swim = swim_by_scheduler(scheduler);
- struct swim_member *m = container_of(task, struct swim_member,
- ping_task);
- swim_wait_ack(swim, m, false);
+ if (rc >= 0)
+ swim_wait_ack(swim, m, false);
+ m->ping_task = NULL;
+ swim_task_delete(task);
+}
+
+/** When an ACK is completed, allow a new ACK to be sent. */
+static void
+swim_ack_task_complete(struct swim_task *task, struct swim_scheduler *scheduler,
+ int rc)
+{
+ (void) scheduler;
+ (void) rc;
+ assert(task->member != NULL);
+ assert(task->member->ack_task == task);
+ task->member->ack_task = NULL;
+ swim_task_delete(task);
}
void
@@ -802,8 +821,14 @@ swim_member_delete(struct swim_member *member)
/* Failure detection component. */
assert(heap_node_is_stray(&member->in_wait_ack_heap));
- swim_task_destroy(&member->ack_task);
- swim_task_destroy(&member->ping_task);
+ if (member->ack_task != NULL) {
+ swim_task_delete(member->ack_task);
+ member->ack_task = NULL;
+ }
+ if (member->ping_task != NULL) {
+ swim_task_delete(member->ping_task);
+ member->ping_task = NULL;
+ }
/* Dissemination component. */
assert(rlist_empty(&member->in_dissemination_queue));
@@ -833,9 +858,6 @@ swim_member_new(const struct sockaddr_in *addr, const struct tt_uuid *uuid,
/* Failure detection component. */
member->incarnation = *incarnation;
heap_node_create(&member->in_wait_ack_heap);
- swim_task_create(&member->ack_task, NULL, NULL, "ack");
- swim_task_create(&member->ping_task, swim_ping_task_complete, NULL,
- "ping");
/* Dissemination component. */
rlist_create(&member->in_dissemination_queue);
@@ -1340,7 +1362,7 @@ swim_iping_task_complete(struct swim_task *task,
if (m != NULL && m->status != MEMBER_ALIVE)
swim_wait_ack(swim, m, true);
finish:
- swim_task_delete_cb(task, scheduler, rc);
+ swim_task_delete(task);
}
/**
@@ -1439,7 +1461,14 @@ swim_check_acks(struct ev_loop *loop, struct ev_timer *t, int events)
default:
unreachable();
}
- swim_send_ping(swim, &m->ping_task, &m->addr);
+ m->ping_task = swim_task_new(swim_ping_task_complete, NULL,
+ "ping");
+ if (m->ping_task != NULL) {
+ m->ping_task->member = m;
+ swim_send_ping(swim, m->ping_task, &m->addr);
+ } else {
+ diag_log();
+ }
}
}
@@ -1683,8 +1712,16 @@ swim_process_failure_detection(struct swim *swim, const char **pos,
if (swim_send_indirect_ack(swim, &member->addr,
proxy) != 0)
diag_log();
- } else if (! swim_task_is_scheduled(&member->ack_task)) {
- swim_send_ack(swim, &member->ack_task, &member->addr);
+ } else if (member->ack_task == NULL) {
+ member->ack_task = swim_task_new(swim_ack_task_complete,
+ NULL, "ack");
+ if (member->ack_task != NULL) {
+ member->ack_task->member = member;
+ swim_send_ack(swim, member->ack_task,
+ &member->addr);
+ } else {
+ diag_log();
+ }
}
break;
case SWIM_FD_MSG_ACK:
@@ -2297,8 +2334,7 @@ swim_quit(struct swim *swim)
swim_new_round(swim);
struct swim_task *task = &swim->round_step_task;
swim_task_destroy(task);
- swim_task_create(task, swim_quit_step_complete, swim_task_delete_cb,
- "quit");
+ swim_task_create(task, swim_quit_step_complete, NULL, "quit");
char *header = swim_packet_alloc(&task->packet, 1);
int rc = swim_encode_src_uuid(swim, &task->packet) +
swim_encode_quit(swim, &task->packet);
diff --git a/src/lib/swim/swim_io.c b/src/lib/swim/swim_io.c
index e7ff321d4..d83f52ae9 100644
--- a/src/lib/swim/swim_io.c
+++ b/src/lib/swim/swim_io.c
@@ -36,6 +36,59 @@
#include <ifaddrs.h>
#include <net/if.h>
+enum {
+ /**
+ * A rough estimation of how many tasks a SWIM instance
+ * would need simultaneously. 1 for an ACK, 2 for indirect
+ * ping, 1 for direct ping. Total is 4 for normal
+ * operation. Others are 1) to get a beautiful number,
+ * 2) in case random() is not perfect and this instance
+ * interacts with 2 or more other instances during one
+ * round.
+ */
+ TASKS_PER_SCHEDULER = 16,
+};
+
+/**
+ * All the SWIM instances and their members use the same objects
+ * to send data - tasks. Each task is ~1.5KB, and on one hand it
+ * would be a waste of memory to keep preallocated tasks for each
+ * member. On the other hand it would be too slow to allocate
+ * and delete ~1.5KB on each interaction, ~3KB on each round step.
+ * Here is a pool of free tasks shared among all SWIM instances
+ * to avoid allocations without keeping a separate task for each
+ * member.
+ */
+static struct stailq swim_task_pool;
+/** Number of pooled tasks. */
+static int swim_task_pool_size = 0;
+/**
+ * Number of currently active schedulers. Used to limit max size
+ * of the pool.
+ */
+static int scheduler_count = 0;
+
+/** First scheduler should create the pool. */
+static inline void
+swim_task_pool_create(void)
+{
+ assert(scheduler_count == 1);
+ assert(swim_task_pool_size == 0);
+ stailq_create(&swim_task_pool);
+}
+
+/** Last scheduler destroys the pool. */
+static inline void
+swim_task_pool_destroy(void)
+{
+ assert(scheduler_count == 0);
+ while (! stailq_empty(&swim_task_pool)) {
+ free(stailq_shift_entry(&swim_task_pool, struct swim_task,
+ in_pool));
+ }
+ swim_task_pool_size = 0;
+}
+
/**
* Allocate memory for meta. The same as mere alloc, but moves
* body pointer.
@@ -124,22 +177,42 @@ swim_task_create(struct swim_task *task, swim_task_f complete,
struct swim_task *
swim_task_new(swim_task_f complete, swim_task_f cancel, const char *desc)
{
- struct swim_task *task = (struct swim_task *) malloc(sizeof(*task));
- if (task == NULL) {
- diag_set(OutOfMemory, sizeof(*task), "malloc", "task");
- return NULL;
+ struct swim_task *task;
+ if (swim_task_pool_size > 0) {
+ assert(! stailq_empty(&swim_task_pool));
+ --swim_task_pool_size;
+ task = stailq_shift_entry(&swim_task_pool, struct swim_task,
+ in_pool);
+ } else {
+ task = (struct swim_task *) malloc(sizeof(*task));
+ if (task == NULL) {
+ diag_set(OutOfMemory, sizeof(*task), "malloc", "task");
+ return NULL;
+ }
}
swim_task_create(task, complete, cancel, desc);
return task;
}
+void
+swim_task_delete(struct swim_task *task)
+{
+ swim_task_destroy(task);
+ if (swim_task_pool_size < TASKS_PER_SCHEDULER * scheduler_count) {
+ stailq_add_entry(&swim_task_pool, task, in_pool);
+ ++swim_task_pool_size;
+ } else {
+ free(task);
+ }
+}
+
void
swim_task_delete_cb(struct swim_task *task, struct swim_scheduler *scheduler,
int rc)
{
(void) rc;
(void) scheduler;
- free(task);
+ swim_task_delete(task);
}
/** Put the task into the queue of output tasks. */
@@ -283,6 +356,8 @@ swim_scheduler_create(struct swim_scheduler *scheduler,
CRYPTO_MODE_ECB, NULL, 0);
assert(rc == 0);
(void) rc;
+ if (++scheduler_count == 1)
+ swim_task_pool_create();
}
int
@@ -326,6 +401,9 @@ swim_scheduler_destroy(struct swim_scheduler *scheduler)
swim_transport_destroy(&scheduler->transport);
swim_ev_io_stop(swim_loop(), &scheduler->output);
swim_scheduler_stop_input(scheduler);
+ assert(scheduler_count > 0);
+ if (--scheduler_count == 0)
+ swim_task_pool_destroy();
}
/**
diff --git a/src/lib/swim/swim_io.h b/src/lib/swim/swim_io.h
index 6bf42cb05..3e631935d 100644
--- a/src/lib/swim/swim_io.h
+++ b/src/lib/swim/swim_io.h
@@ -46,6 +46,7 @@
*/
struct swim_task;
+struct swim_member;
struct swim_scheduler;
enum {
@@ -247,11 +248,22 @@ struct swim_task {
* A short description of the packet content. For logging.
*/
const char *desc;
- /**
- * Sender's UUID used by ping tasks to schedule deadline
- * for an ACK.
- */
- struct tt_uuid uuid;
+ union {
+ /**
+ * Receiver's UUID used by ping tasks to schedule
+ * deadline for an ACK.
+ */
+ struct tt_uuid uuid;
+ /**
+ * Alternative to UUID - direct pointer to the
+ * receiver member. It works, when members and
+ * tasks of a certain type are isomorphic. It is
+ * faster than lookup by UUID.
+ */
+ struct swim_member *member;
+ };
+ /** Link in the task pool. */
+ struct stailq_entry in_pool;
};
/** Check if @a task is already scheduled. */
@@ -284,6 +296,10 @@ swim_task_create(struct swim_task *task, swim_task_f complete,
struct swim_task *
swim_task_new(swim_task_f complete, swim_task_f cancel, const char *desc);
+/** Destroy a task; return it to the pool or free its memory. */
+void
+swim_task_delete(struct swim_task *task);
+
/** Callback to delete a task after its completion. */
void
swim_task_delete_cb(struct swim_task *task, struct swim_scheduler *scheduler,
--
2.20.1 (Apple Git-117)
More information about the Tarantool-patches
mailing list