[tarantool-patches] [PATCH 1/2] swim: pool IO tasks

Vladislav Shpilevoy v.shpilevoy at tarantool.org
Sat Jul 6 01:40:08 MSK 2019


Before the patch each SWIM member had two preallocated task
objects, 3KB in total. It was a waste of memory, because network
load per member in SWIM is ~2 messages per round step regardless
of cluster size.

This patch moves the tasks to a pool, where they can be reused,
even by different SWIM instances running on the same node.
---
 src/lib/swim/swim.c    | 78 +++++++++++++++++++++++++++----------
 src/lib/swim/swim_io.c | 88 +++++++++++++++++++++++++++++++++++++++---
 src/lib/swim/swim_io.h | 26 ++++++++++---
 3 files changed, 161 insertions(+), 31 deletions(-)

diff --git a/src/lib/swim/swim.c b/src/lib/swim/swim.c
index 7d07a353d..9a7b4ce85 100644
--- a/src/lib/swim/swim.c
+++ b/src/lib/swim/swim.c
@@ -414,10 +414,14 @@ struct swim_member {
 	 * message to it.
 	 */
 	struct heap_node in_wait_ack_heap;
-	/** Ready at hand regular ACK task. */
-	struct swim_task ack_task;
-	/** Ready at hand regular PING task. */
-	struct swim_task ping_task;
+	/**
+	 * Failure detection tasks currently in flight, or NULL.
+	 * Kept so as 1) not to send them twice; 2) to be able
+	 * to cancel them when the member is deleted. Each is
+	 * reset to NULL when its task completes.
+	 */
+	struct swim_task *ack_task;
+	struct swim_task *ping_task;
 };
 
 #define mh_name _swim_table
@@ -759,16 +763,31 @@ static void
 swim_ping_task_complete(struct swim_task *task,
 			struct swim_scheduler *scheduler, int rc)
 {
+	struct swim *swim = swim_by_scheduler(scheduler);
+	struct swim_member *m = task->member;
+	assert(m != NULL);
+	assert(m->ping_task == task);
 	/*
 	 * If ping send has failed, it makes no sense to wait for
 	 * an ACK.
 	 */
-	if (rc < 0)
-		return;
-	struct swim *swim = swim_by_scheduler(scheduler);
-	struct swim_member *m = container_of(task, struct swim_member,
-					     ping_task);
-	swim_wait_ack(swim, m, false);
+	if (rc >= 0)
+		swim_wait_ack(swim, m, false);
+	m->ping_task = NULL;
+	swim_task_delete(task);
+}
+
+/** When an ACK task completes, allow a new ACK to be sent. */
+static void
+swim_ack_task_complete(struct swim_task *task, struct swim_scheduler *scheduler,
+		       int rc)
+{
+	(void) scheduler;
+	(void) rc;
+	assert(task->member != NULL);
+	assert(task->member->ack_task == task);
+	task->member->ack_task = NULL;
+	swim_task_delete(task);
 }
 
 void
@@ -802,8 +821,14 @@ swim_member_delete(struct swim_member *member)
 
 	/* Failure detection component. */
 	assert(heap_node_is_stray(&member->in_wait_ack_heap));
-	swim_task_destroy(&member->ack_task);
-	swim_task_destroy(&member->ping_task);
+	if (member->ack_task != NULL) {
+		swim_task_delete(member->ack_task);
+		member->ack_task = NULL;
+	}
+	if (member->ping_task != NULL) {
+		swim_task_delete(member->ping_task);
+		member->ping_task = NULL;
+	}
 
 	/* Dissemination component. */
 	assert(rlist_empty(&member->in_dissemination_queue));
@@ -833,9 +858,6 @@ swim_member_new(const struct sockaddr_in *addr, const struct tt_uuid *uuid,
 	/* Failure detection component. */
 	member->incarnation = *incarnation;
 	heap_node_create(&member->in_wait_ack_heap);
-	swim_task_create(&member->ack_task, NULL, NULL, "ack");
-	swim_task_create(&member->ping_task, swim_ping_task_complete, NULL,
-			 "ping");
 
 	/* Dissemination component. */
 	rlist_create(&member->in_dissemination_queue);
@@ -1340,7 +1362,7 @@ swim_iping_task_complete(struct swim_task *task,
 	if (m != NULL && m->status != MEMBER_ALIVE)
 		swim_wait_ack(swim, m, true);
 finish:
-	swim_task_delete_cb(task, scheduler, rc);
+	swim_task_delete(task);
 }
 
 /**
@@ -1439,7 +1461,14 @@ swim_check_acks(struct ev_loop *loop, struct ev_timer *t, int events)
 		default:
 			unreachable();
 		}
-		swim_send_ping(swim, &m->ping_task, &m->addr);
+		m->ping_task = swim_task_new(swim_ping_task_complete, NULL,
+					     "ping");
+		if (m->ping_task != NULL) {
+			m->ping_task->member = m;
+			swim_send_ping(swim, m->ping_task, &m->addr);
+		} else {
+			diag_log();
+		}
 	}
 }
 
@@ -1683,8 +1712,16 @@ swim_process_failure_detection(struct swim *swim, const char **pos,
 			if (swim_send_indirect_ack(swim, &member->addr,
 						   proxy) != 0)
 				diag_log();
-		} else if (! swim_task_is_scheduled(&member->ack_task)) {
-			swim_send_ack(swim, &member->ack_task, &member->addr);
+		} else if (member->ack_task == NULL) {
+			member->ack_task = swim_task_new(swim_ack_task_complete,
+							 NULL, "ack");
+			if (member->ack_task != NULL) {
+				member->ack_task->member = member;
+				swim_send_ack(swim, member->ack_task,
+					      &member->addr);
+			} else {
+				diag_log();
+			}
 		}
 		break;
 	case SWIM_FD_MSG_ACK:
@@ -2297,8 +2334,7 @@ swim_quit(struct swim *swim)
 	swim_new_round(swim);
 	struct swim_task *task = &swim->round_step_task;
 	swim_task_destroy(task);
-	swim_task_create(task, swim_quit_step_complete, swim_task_delete_cb,
-			 "quit");
+	swim_task_create(task, swim_quit_step_complete, NULL, "quit");
 	char *header = swim_packet_alloc(&task->packet, 1);
 	int rc = swim_encode_src_uuid(swim, &task->packet) +
 		 swim_encode_quit(swim, &task->packet);
diff --git a/src/lib/swim/swim_io.c b/src/lib/swim/swim_io.c
index e7ff321d4..d83f52ae9 100644
--- a/src/lib/swim/swim_io.c
+++ b/src/lib/swim/swim_io.c
@@ -36,6 +36,59 @@
 #include <ifaddrs.h>
 #include <net/if.h>
 
+enum {
+	/**
+	 * A rough estimation of how many tasks a SWIM instance
+	 * would need simultaneously. 1 for an ACK, 2 for indirect
+	 * ping, 1 for direct ping. Total is 4 for normal
+	 * operation. Others are 1) to get a beautiful number,
+	 * 2) in case random() is not perfect and this instance
+	 * interacts with 2 or more other instances during one
+	 * round.
+	 */
+	TASKS_PER_SCHEDULER = 16,
+};
+
+/**
+ * All the SWIM instances and their members use the same objects
+ * to send data - tasks. Each task is ~1.5KB, and on one hand it
+ * would be a waste of memory to keep preallocated tasks for each
+ * member. On the other hand it would be too slow to allocate
+ * and delete ~1.5KB on each interaction, ~3KB on each round step.
+ * Here is a pool of free tasks shared among all SWIM instances
+ * to avoid allocations without keeping a separate task for
+ * each member.
+ */
+static struct stailq swim_task_pool;
+/** Number of pooled tasks. */
+static int swim_task_pool_size = 0;
+/**
+ * Number of currently active schedulers. Used to limit max size
+ * of the pool.
+ */
+static int scheduler_count = 0;
+
+/** First scheduler should create the pool. */
+static inline void
+swim_task_pool_create(void)
+{
+	assert(scheduler_count == 1);
+	assert(swim_task_pool_size == 0);
+	stailq_create(&swim_task_pool);
+}
+
+/** Last scheduler destroys the pool. */
+static inline void
+swim_task_pool_destroy(void)
+{
+	assert(scheduler_count == 0);
+	while (! stailq_empty(&swim_task_pool)) {
+		free(stailq_shift_entry(&swim_task_pool, struct swim_task,
+					in_pool));
+	}
+	swim_task_pool_size = 0;
+}
+
 /**
  * Allocate memory for meta. The same as mere alloc, but moves
  * body pointer.
@@ -124,22 +177,42 @@ swim_task_create(struct swim_task *task, swim_task_f complete,
 struct swim_task *
 swim_task_new(swim_task_f complete, swim_task_f cancel, const char *desc)
 {
-	struct swim_task *task = (struct swim_task *) malloc(sizeof(*task));
-	if (task == NULL) {
-		diag_set(OutOfMemory, sizeof(*task), "malloc", "task");
-		return NULL;
+	struct swim_task *task;
+	if (swim_task_pool_size > 0) {
+		assert(! stailq_empty(&swim_task_pool));
+		--swim_task_pool_size;
+		task = stailq_shift_entry(&swim_task_pool, struct swim_task,
+					  in_pool);
+	} else {
+		task = (struct swim_task *) malloc(sizeof(*task));
+		if (task == NULL) {
+			diag_set(OutOfMemory, sizeof(*task), "malloc", "task");
+			return NULL;
+		}
 	}
 	swim_task_create(task, complete, cancel, desc);
 	return task;
 }
 
+void
+swim_task_delete(struct swim_task *task)
+{
+	swim_task_destroy(task);
+	if (swim_task_pool_size < TASKS_PER_SCHEDULER * scheduler_count) {
+		stailq_add_entry(&swim_task_pool, task, in_pool);
+		++swim_task_pool_size;
+	} else {
+		free(task);
+	}
+}
+
 void
 swim_task_delete_cb(struct swim_task *task, struct swim_scheduler *scheduler,
 		    int rc)
 {
 	(void) rc;
 	(void) scheduler;
-	free(task);
+	swim_task_delete(task);
 }
 
 /** Put the task into the queue of output tasks. */
@@ -283,6 +356,8 @@ swim_scheduler_create(struct swim_scheduler *scheduler,
 					  CRYPTO_MODE_ECB, NULL, 0);
 	assert(rc == 0);
 	(void) rc;
+	if (++scheduler_count == 1)
+		swim_task_pool_create();
 }
 
 int
@@ -326,6 +401,9 @@ swim_scheduler_destroy(struct swim_scheduler *scheduler)
 	swim_transport_destroy(&scheduler->transport);
 	swim_ev_io_stop(swim_loop(), &scheduler->output);
 	swim_scheduler_stop_input(scheduler);
+	assert(scheduler_count > 0);
+	if (--scheduler_count == 0)
+		swim_task_pool_destroy();
 }
 
 /**
diff --git a/src/lib/swim/swim_io.h b/src/lib/swim/swim_io.h
index 6bf42cb05..3e631935d 100644
--- a/src/lib/swim/swim_io.h
+++ b/src/lib/swim/swim_io.h
@@ -46,6 +46,7 @@
  */
 
 struct swim_task;
+struct swim_member;
 struct swim_scheduler;
 
 enum {
@@ -247,11 +248,22 @@ struct swim_task {
 	 * A short description of the packet content. For logging.
 	 */
 	const char *desc;
-	/**
-	 * Sender's UUID used by ping tasks to schedule deadline
-	 * for an ACK.
-	 */
-	struct tt_uuid uuid;
+	union {
+		/**
+		 * Receiver's UUID used by ping tasks to schedule
+		 * deadline for an ACK.
+		 */
+		struct tt_uuid uuid;
+		/**
+		 * Alternative to UUID - direct pointer to the
+		 * receiver member. It works when members and
+		 * tasks of a certain type map one-to-one. It is
+		 * faster than lookup by UUID.
+		 */
+		struct swim_member *member;
+	};
+	/** Link in the task pool. */
+	struct stailq_entry in_pool;
 };
 
 /** Check if @a task is already scheduled. */
@@ -284,6 +296,10 @@ swim_task_create(struct swim_task *task, swim_task_f complete,
 struct swim_task *
 swim_task_new(swim_task_f complete, swim_task_f cancel, const char *desc);
 
+/** Destroy a task; return it to the task pool or free it. */
+void
+swim_task_delete(struct swim_task *task);
+
 /** Callback to delete a task after its completion. */
 void
 swim_task_delete_cb(struct swim_task *task, struct swim_scheduler *scheduler,
-- 
2.20.1 (Apple Git-117)





More information about the Tarantool-patches mailing list