[tarantool-patches] [PATCH 4/6] test: generalize SWIM fake descriptor filters

Vladislav Shpilevoy v.shpilevoy at tarantool.org
Fri Apr 12 01:22:28 MSK 2019


At this moment SWIM test harness implementes its own fake file
descriptor table, which is used unawares by the real SWIM code.
Each fake fd has send and recv queues, can delay and drop
packets with a certain probability. But it is going to be not
enough for new tests.

It is wanted to be able to drop packets with a specified content,
from and to a specified direction. For that the patch implements
a filtering mechanism. Each fake fd now has a list of filters,
applied one by one to each packet. If at least on filter wants to
drop a packet, then it is dropped. The filters know packet
content and direction: outgoing or incomming.

Now only one filter exists - drop rate. It existed even before
the patch, but now it is ported on the new API.

Part of #3234
---
 test/unit/swim.c                |  21 ++---
 test/unit/swim_test_transport.c | 133 ++++++++++++++++++++++++++------
 test/unit/swim_test_transport.h |  41 ++++++++--
 test/unit/swim_test_utils.c     |  68 +++++++++++++++-
 test/unit/swim_test_utils.h     |  18 +++++
 5 files changed, 236 insertions(+), 45 deletions(-)

diff --git a/test/unit/swim.c b/test/unit/swim.c
index 03f6b412c..48aea2f07 100644
--- a/test/unit/swim.c
+++ b/test/unit/swim.c
@@ -590,29 +590,22 @@ swim_test_uri_update(void)
 		swim_member_by_uuid(s1, swim_member_uuid(s0_self));
 	is(strcmp(new_s0_uri, swim_member_uri(s0_view)), 0,
 	   "S1 updated its URI and S2 sees that");
-	/*
-	 * S2 should not manage to send the new address to S3, but
-	 * should accept S3 packets later - therefore block is
-	 * needed.
-	 */
-	swim_cluster_block_io(cluster, 1);
 	/*
 	 * S1 should not send the new address to S3 - drop its
 	 * packets.
 	 */
 	swim_cluster_set_drop(cluster, 0, 100);
 	/*
-	 * Main part of the test - S3 sends the old address to S1.
+	 * S2 should not manage to send the new address to S3, but
+	 * should accept S3 packets with the old address and
+	 * ignore it.
 	 */
-	swim_cluster_set_drop(cluster, 2, 0);
-	swim_run_for(3);
-	swim_cluster_set_drop(cluster, 2, 100);
+	swim_cluster_set_drop_out(cluster, 1, 100);
 	/*
-	 * S2 absorbs the packets, but should ignore the old
-	 * address.
+	 * Main part of the test - S3 sends the old address to S2.
 	 */
-	swim_cluster_unblock_io(cluster, 1);
-	swim_run_for(2);
+	swim_cluster_set_drop(cluster, 2, 0);
+	swim_run_for(3);
 	is(strcmp(new_s0_uri, swim_member_uri(s0_view)), 0,
 	   "S2 still keeps new S1's URI, even received the old one from S3");
 
diff --git a/test/unit/swim_test_transport.c b/test/unit/swim_test_transport.c
index 8ad434340..5f84a7c95 100644
--- a/test/unit/swim_test_transport.c
+++ b/test/unit/swim_test_transport.c
@@ -100,6 +100,51 @@ swim_test_packet_dup(struct swim_test_packet *p)
 	return res;
 }
 
+/**
+ * Packet filter. Each fake file descriptor has a list of filters.
+ * For each incoming and outgoing packet it checks all the
+ * filters in the list. If anyone wants to filter the packet out,
+ * then the packet is dropped.
+ */
+struct swim_fd_filter {
+	/** A function to decide whether to drop a packet. */
+	swim_test_filter_check_f check;
+	/**
+	 * A function called when the filter is deleted to free
+	 * @a udata if necessary.
+	 */
+	swim_test_filter_delete_f delete;
+	/**
+	 * Arbitrary user data. Passed to each call of @a check.
+	 */
+	void *udata;
+	/** Link in the list of filters in the descriptor. */
+	struct rlist in_filters;
+};
+
+/** Create a new filter. */
+static inline struct swim_fd_filter *
+swim_fd_filter_new(swim_test_filter_check_f check,
+		   swim_test_filter_delete_f delete, void *udata)
+{
+	struct swim_fd_filter *f = (struct swim_fd_filter *) malloc(sizeof(*f));
+	assert(f != NULL);
+	f->udata = udata;
+	f->check = check;
+	f->delete = delete;
+	rlist_create(&f->in_filters);
+	return f;
+}
+
+/** Delete @a filter and its data. */
+static inline void
+swim_fd_filter_delete(struct swim_fd_filter *filter)
+{
+	rlist_del_entry(filter, in_filters);
+	filter->delete(filter->udata);
+	free(filter);
+}
+
 /** Fake file descriptor. */
 struct swim_fd {
 	/** File descriptor number visible to libev. */
@@ -111,10 +156,11 @@ struct swim_fd {
 	 */
 	bool is_opened;
 	/**
-	 * Probability of packet loss. For both sends and
-	 * receipts.
+	 * List of packet filters. All of them are checked for
+	 * each packet, and if at least one decides to drop, then
+	 * the packet is deleted.
 	 */
-	double drop_rate;
+	struct rlist filters;
 	/**
 	 * Link in the list of opened and non-blocked descriptors.
 	 * Used to feed them all EV_WRITE.
@@ -143,12 +189,47 @@ swim_fd_open(struct swim_fd *fd)
 		diag_set(SocketError, "test_socket:1", "bind");
 		return -1;
 	}
+	assert(rlist_empty(&fd->filters));
 	fd->is_opened = true;
-	fd->drop_rate = 0;
 	rlist_add_tail_entry(&swim_fd_active, fd, in_active);
 	return 0;
 }
 
+/**
+ * Remove a filter having @a check function. Works just like the
+ * core triggers library. The found trigger is deleted. If nothing
+ * is found, then it is not an error.
+ */
+void
+swim_test_transport_remove_filter(int fd, swim_test_filter_check_f check)
+{
+	struct swim_fd *sfd = &swim_fd[fd - FAKE_FD_BASE];
+	assert(sfd->is_opened);
+	struct swim_fd_filter *f;
+	rlist_foreach_entry(f, &sfd->filters, in_filters) {
+		if (check == f->check) {
+			swim_fd_filter_delete(f);
+			return;
+		}
+	}
+}
+
+/**
+ * Add a new filter, or replace an existing one. If a filter
+ * already exists with the same @a check function, then it is
+ * deleted.
+ */
+void
+swim_test_transport_add_filter(int fd, swim_test_filter_check_f check,
+			       swim_test_filter_delete_f delete, void *udata)
+{
+	struct swim_fd *sfd = &swim_fd[fd - FAKE_FD_BASE];
+	assert(sfd->is_opened);
+	struct swim_fd_filter *f = swim_fd_filter_new(check, delete, udata);
+	swim_test_transport_remove_filter(fd, check);
+	rlist_add_tail_entry(&sfd->filters, f, in_filters);
+}
+
 /** Send one packet to destination's recv queue. */
 static inline void
 swim_fd_send_packet(struct swim_fd *fd);
@@ -159,6 +240,9 @@ swim_fd_close(struct swim_fd *fd)
 {
 	if (! fd->is_opened)
 		return;
+	struct swim_fd_filter *f, *f_tmp;
+	rlist_foreach_entry_safe(f, &fd->filters, in_filters, f_tmp)
+		swim_fd_filter_delete(f);
 	struct swim_test_packet *i, *tmp;
 	rlist_foreach_entry_safe(i, &fd->recv_queue, in_queue, tmp)
 		swim_test_packet_delete(i);
@@ -168,13 +252,30 @@ swim_fd_close(struct swim_fd *fd)
 	fd->is_opened = false;
 }
 
+/**
+ * Check all the packet filters if any wants to drop @a p packet.
+ * @a dir parameter says direction. Values are the same as for
+ * standard in/out descriptors: 0 for input, 1 for output.
+ */
+static inline bool
+swim_fd_test_if_drop(struct swim_fd *fd, const struct swim_test_packet *p,
+		     int dir)
+{
+	struct swim_fd_filter *f;
+	rlist_foreach_entry(f, &fd->filters, in_filters) {
+		if (f->check(p->data, p->size, f->udata, dir))
+			return true;
+	}
+	return false;
+}
+
 void
 swim_test_transport_init(void)
 {
 	for (int i = 0, evfd = FAKE_FD_BASE; i < FAKE_FD_NUMBER; ++i, ++evfd) {
+		rlist_create(&swim_fd[i].filters);
 		swim_fd[i].evfd = evfd;
 		swim_fd[i].is_opened = false;
-		swim_fd[i].drop_rate = 0;
 		rlist_create(&swim_fd[i].in_active);
 		rlist_create(&swim_fd[i].recv_queue);
 		rlist_create(&swim_fd[i].send_queue);
@@ -287,24 +388,6 @@ swim_test_transport_unblock_fd(int fd)
 		rlist_add_tail_entry(&swim_fd_active, sfd, in_active);
 }
 
-void
-swim_test_transport_set_drop(int fd, double value)
-{
-	struct swim_fd *sfd = &swim_fd[fd - FAKE_FD_BASE];
-	if (sfd->is_opened)
-		sfd->drop_rate = value;
-}
-
-/**
- * Returns true with probability @a rate, and is used to decided
- * wether to drop a packet or not.
- */
-static inline bool
-swim_test_is_drop(double rate)
-{
-	return ((double) rand() / RAND_MAX) * 100 < rate;
-}
-
 /**
  * Move @a p packet, originated from @a src descriptor's send
  * queue, to @a dst descriptor's recv queue. The function checks
@@ -315,8 +398,8 @@ static inline void
 swim_move_packet(struct swim_fd *src, struct swim_fd *dst,
 		 struct swim_test_packet *p)
 {
-	if (dst->is_opened && !swim_test_is_drop(dst->drop_rate) &&
-	    !swim_test_is_drop(src->drop_rate))
+	if (dst->is_opened && !swim_fd_test_if_drop(dst, p, 0) &&
+	    !swim_fd_test_if_drop(src, p, 1))
 		rlist_add_tail_entry(&dst->recv_queue, p, in_queue);
 	else
 		swim_test_packet_delete(p);
diff --git a/test/unit/swim_test_transport.h b/test/unit/swim_test_transport.h
index d751efe83..454de1d8f 100644
--- a/test/unit/swim_test_transport.h
+++ b/test/unit/swim_test_transport.h
@@ -40,6 +40,23 @@ struct ev_loop;
  * capability to set necessary loss level, delay, reorders.
  */
 
+/**
+ * Signature of a packet filter function. It takes packet data,
+ * arbitrary user data, and should return true, if the packet
+ * should be dropped. False otherwise. Direction is said via
+ * @a dir parameter. 0 means incoming packet, 1 means outgoing
+ * packet, just like standard IO descriptors.
+ */
+typedef bool (*swim_test_filter_check_f)(const char *data, int size,
+					 void *udata, int dir);
+
+/**
+ * It is possible that a filter is complex and uses helper data
+ * allocated somewhere. This function is called when the filter
+ * is dropped and allows to free user data.
+ */
+typedef void (*swim_test_filter_delete_f)(void *udata);
+
 /**
  * Until there are no new IO events, feed EV_WRITE event to all
  * opened descriptors; EV_READ to ones, who have not empty recv
@@ -61,13 +78,27 @@ void
 swim_test_transport_unblock_fd(int fd);
 
 /**
- * Drop rate of incoming and outgoing packets. Note, that even if
- * a packet is dropped on send, the node, owning @a fd, still
- * thinks, that the packet is sent. It is not a sender-visible
- * error.
+ * Add a filter to the file descriptor @a fd. If a filter with
+ * the same @a check function exists, then it is deleted and a
+ * new one is created.
+ * @param fd File descriptor to add filter to.
+ * @param check Check function. It is called for each packet and
+ *        should return true, when the packet should be dropped.
+ * @param delete A destructor for @a udata called when the filter
+ *        is dropped.
+ * @param udata Arbitrary user data, passed to each @a check
+ *        invocation.
+ */
+void
+swim_test_transport_add_filter(int fd, swim_test_filter_check_f check,
+			       swim_test_filter_delete_f delete, void *udata);
+
+/**
+ * Drop a filter from @a fd descriptor having @a check filter
+ * function.
  */
 void
-swim_test_transport_set_drop(int fd, double value);
+swim_test_transport_remove_filter(int fd, swim_test_filter_check_f check);
 
 /** Initialize test transport system. */
 void
diff --git a/test/unit/swim_test_utils.c b/test/unit/swim_test_utils.c
index da8dd4386..d933434e9 100644
--- a/test/unit/swim_test_utils.c
+++ b/test/unit/swim_test_utils.c
@@ -242,10 +242,76 @@ swim_cluster_unblock_io(struct swim_cluster *cluster, int i)
 	swim_test_transport_unblock_fd(swim_fd(cluster->node[i].swim));
 }
 
+/** A structure used by drop rate packet filter. */
+struct swim_drop_rate {
+	/** True if should be applied to incoming packets. */
+	bool is_for_in;
+	/** True if should be applied to outgoing packets. */
+	bool is_for_out;
+	/** Drop rate percentage. */
+	double rate;
+};
+
+/** Create a new drop rate filter helper. */
+static inline struct swim_drop_rate *
+swim_drop_rate_new(double rate, bool is_for_in, bool is_for_out)
+{
+	struct swim_drop_rate *dr =
+		(struct swim_drop_rate *) malloc(sizeof(*dr));
+	assert(dr != NULL);
+	dr->rate = rate;
+	dr->is_for_in = is_for_in;
+	dr->is_for_out = is_for_out;
+	return dr;
+}
+
+/**
+ * A packet filter dropping a packet with a certain probability.
+ */
+static bool
+swim_filter_drop_rate(const char *data, int size, void *udata, int dir)
+{
+	(void) data;
+	(void) size;
+	struct swim_drop_rate *dr = (struct swim_drop_rate *) udata;
+	if ((dir == 0 && !dr->is_for_in) || (dir == 1 && !dr->is_for_out))
+		return false;
+	return ((double) rand() / RAND_MAX) * 100 < dr->rate;
+}
+
+/**
+ * Create a new drop rate filter for the instance with id @a i.
+ */
+static void
+swim_cluster_set_drop_generic(struct swim_cluster *cluster, int i,
+			      double value, bool is_for_in, bool is_for_out)
+{
+	int fd = swim_fd(swim_cluster_node(cluster, i));
+	if (value == 0) {
+		swim_test_transport_remove_filter(fd, swim_filter_drop_rate);
+		return;
+	}
+	struct swim_drop_rate *dr = swim_drop_rate_new(value, is_for_in,
+						       is_for_out);
+	swim_test_transport_add_filter(fd, swim_filter_drop_rate, free, dr);
+}
+
 void
 swim_cluster_set_drop(struct swim_cluster *cluster, int i, double value)
 {
-	swim_test_transport_set_drop(swim_fd(cluster->node[i].swim), value);
+	swim_cluster_set_drop_generic(cluster, i, value, true, true);
+}
+
+void
+swim_cluster_set_drop_out(struct swim_cluster *cluster, int i, double value)
+{
+	swim_cluster_set_drop_generic(cluster, i, value, false, true);
+}
+
+void
+swim_cluster_set_drop_in(struct swim_cluster *cluster, int i, double value)
+{
+	swim_cluster_set_drop_generic(cluster, i, value, true, false);
 }
 
 /** Check if @a s1 knows every member of @a s2's table. */
diff --git a/test/unit/swim_test_utils.h b/test/unit/swim_test_utils.h
index 5e1c192a1..af428c792 100644
--- a/test/unit/swim_test_utils.h
+++ b/test/unit/swim_test_utils.h
@@ -88,9 +88,27 @@ swim_cluster_block_io(struct swim_cluster *cluster, int i);
 void
 swim_cluster_unblock_io(struct swim_cluster *cluster, int i);
 
+/**
+ * Set drop rate of incoming and outgoing packets for a node with
+ * id @a i. Note, that even if a packet is dropped on send, the
+ * node still thinks, that the packet is sent. It is not a
+ * sender-visible error.
+ */
 void
 swim_cluster_set_drop(struct swim_cluster *cluster, int i, double value);
 
+/**
+ * The same as simple drop, but applied to outgoing packets only.
+ */
+void
+swim_cluster_set_drop_out(struct swim_cluster *cluster, int i, double value);
+
+/**
+ * The same as simple drop, but applied to incoming packets only.
+ */
+void
+swim_cluster_set_drop_in(struct swim_cluster *cluster, int i, double value);
+
 /**
  * Explicitly add a member of id @a from_id to a member of id
  * @a to_id.
-- 
2.17.2 (Apple Git-113)





More information about the Tarantool-patches mailing list