[tarantool-patches] Re: [commits] [tarantool] 02/02: iproto: allow to configure IPROTO_MSG_MAX

Vladislav Shpilevoy v.shpilevoy at tarantool.org
Tue May 8 14:07:28 MSK 2018


>> -static int
>> -iproto_do_listen(struct cbus_call_msg *m)
>> +static inline int
>> +iproto_do_cfg(struct iproto_cfg_msg *msg)
>>   {
>> -	(void) m;
>> -	try {
>> -		if (evio_service_is_active(&binary))
>> -			evio_service_listen(&binary);
>> -	} catch (Exception *e) {
>> +	const struct cmsg_hop cfg_route[] = {
>> +		{ iproto_do_cfg_f, NULL },
>> +	};
>> +	cmsg_init(msg, cfg_route);
>> +	cpipe_push(&net_pipe, msg);
>> +	while (msg->rc == IPROTO_CFG_NOT_FINISHED)
>> +		fiber_sleep(0.001);
> 
> Holy cow.
> 
> Oh, now I understand the comment. Please rewrite this; why
> would you ever need to write such a thing...
> Simply route the message back and wake up a cond in the return
> function.

Done. In the same diff you can see the new fiber pool size
behavior: the tx fiber pool max size is now
net_msg_max * FIBER_POOL_SIZE_FACTOR (5).

diff --git a/src/box/box.cc b/src/box/box.cc
index 0d1f0e6ae..412a140ce 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -764,7 +764,8 @@ box_set_net_msg_max(void)
  {
  	int new_iproto_msg_max = cfg_geti("net_msg_max");
  	iproto_set_msg_max(new_iproto_msg_max);
-	fiber_pool_set_max_size(&tx_fiber_pool, new_iproto_msg_max);
+	fiber_pool_set_max_size(&tx_fiber_pool,
+				new_iproto_msg_max * FIBER_POOL_SIZE_FACTOR);
  }
  
  /* }}} configuration bindings */
@@ -1719,7 +1720,8 @@ static inline void
  box_cfg_xc(void)
  {
  	/* Join the cord interconnect as "tx" endpoint. */
-	fiber_pool_create(&tx_fiber_pool, "tx", FIBER_POOL_SIZE_DEFAULT,
+	fiber_pool_create(&tx_fiber_pool, "tx",
+			  IPROTO_MSG_MAX_DEFAULT * FIBER_POOL_SIZE_FACTOR,
  			  FIBER_POOL_IDLE_TIMEOUT);
  	/* Add an extra endpoint for WAL wake up/rollback messages. */
  	cbus_endpoint_create(&tx_prio_endpoint, "tx_prio", tx_prio_cb, &tx_prio_endpoint);
diff --git a/src/box/iproto.cc b/src/box/iproto.cc
index edce6169c..0c975f7b4 100644
--- a/src/box/iproto.cc
+++ b/src/box/iproto.cc
@@ -63,11 +63,6 @@
  #include "applier.h"
  #include "cfg.h"
  
-enum {
-	IPROTO_MSG_MAX_DEFAULT = 768,
-	IPROTO_MSG_MAX_MIN = 2,
-};
-
  /**
   * Network readahead. A signed integer to avoid
   * automatic type coercion to an unsigned type.
@@ -1750,13 +1745,6 @@ iproto_init()
  	cpipe_set_max_input(&net_pipe, iproto_msg_max / 2);
  }
  
-/** IProto configuration change result codes. */
-enum {
-	IPROTO_CFG_OK,
-	IPROTO_CFG_ERROR,
-	IPROTO_CFG_NOT_FINISHED,
-};
-
  /** Available IProto configuration changes. */
  enum iproto_cfg_op {
  	IPROTO_CFG_BIND,
@@ -1770,7 +1758,7 @@ enum iproto_cfg_op {
   * message count in fly it is needed to send a special message to
   * IProto thread.
   */
-struct iproto_cfg_msg: public cmsg
+struct iproto_cfg_msg: public cbus_call_msg
  {
  	/** Operation to execute in IProto thread. */
  	enum iproto_cfg_op op;
@@ -1781,29 +1769,17 @@ struct iproto_cfg_msg: public cmsg
  		/** New IProto max message count in fly. */
  		int iproto_msg_max;
  	};
-	/**
-	 * Cfg result code that can be read atomically by
-	 * different threads - TX and IProto, because it consists
-	 * of single byte.
-	 */
-	int8_t rc;
-	/**
-	 * Diag can be read by TX thread only when rc becomes
-	 * not IPROTO_CFG_NOT_FINISHED.
-	 */
-	struct diag diag;
  };
  
  static inline void
  iproto_cfg_msg_create(struct iproto_cfg_msg *msg, enum iproto_cfg_op op)
  {
  	memset(msg, 0, sizeof(*msg));
-	msg->rc = IPROTO_CFG_NOT_FINISHED;
  	msg->op = op;
  }
  
-static void
-iproto_do_cfg_f(struct cmsg *m)
+static int
+iproto_do_cfg_f(struct cbus_call_msg *m)
  {
  	struct iproto_cfg_msg *cfg_msg = (struct iproto_cfg_msg *) m;
  	int old;
@@ -1830,29 +1806,18 @@ iproto_do_cfg_f(struct cmsg *m)
  		default:
  			unreachable();
  		}
-		cfg_msg->rc = IPROTO_CFG_OK;
  	} catch (Exception *e) {
-		diag_move(diag_get(), &cfg_msg->diag);
-		cfg_msg->rc = IPROTO_CFG_ERROR;
+		return -1;
  	}
+	return 0;
  }
  
-static inline int
+static inline void
  iproto_do_cfg(struct iproto_cfg_msg *msg)
  {
-	const struct cmsg_hop cfg_route[] = {
-		{ iproto_do_cfg_f, NULL },
-	};
-	cmsg_init(msg, cfg_route);
-	cpipe_push(&net_pipe, msg);
-	while (msg->rc == IPROTO_CFG_NOT_FINISHED)
-		fiber_sleep(0.001);
-	if (msg->rc == IPROTO_CFG_ERROR) {
-		diag_move(&msg->diag, diag_get());
-		return -1;
-	}
-	assert(msg->rc == IPROTO_CFG_OK);
-	return 0;
+	if (cbus_call(&net_pipe, &tx_pipe, msg, iproto_do_cfg_f,
+		      NULL, TIMEOUT_INFINITY) != 0)
+		diag_raise();
  }
  
  void
@@ -1861,8 +1826,7 @@ iproto_bind(const char *uri)
  	struct iproto_cfg_msg cfg_msg;
  	iproto_cfg_msg_create(&cfg_msg, IPROTO_CFG_BIND);
  	cfg_msg.uri = uri;
-	if (iproto_do_cfg(&cfg_msg) != 0)
-		diag_raise();
+	iproto_do_cfg(&cfg_msg);
  }
  
  void
@@ -1870,8 +1834,7 @@ iproto_listen()
  {
  	struct iproto_cfg_msg cfg_msg;
  	iproto_cfg_msg_create(&cfg_msg, IPROTO_CFG_LISTEN);
-	if (iproto_do_cfg(&cfg_msg) != 0)
-		diag_raise();
+	iproto_do_cfg(&cfg_msg);
  }
  
  size_t
@@ -1897,7 +1860,6 @@ iproto_set_msg_max(int new_iproto_msg_max)
  	struct iproto_cfg_msg cfg_msg;
  	iproto_cfg_msg_create(&cfg_msg, IPROTO_CFG_MSG_MAX);
  	cfg_msg.iproto_msg_max = new_iproto_msg_max;
-	if (iproto_do_cfg(&cfg_msg) != 0)
-		diag_raise();
+	iproto_do_cfg(&cfg_msg);
  	cpipe_set_max_input(&net_pipe, new_iproto_msg_max / 2);
  }
diff --git a/src/box/iproto.h b/src/box/iproto.h
index a1dddc405..2ebe03c13 100644
--- a/src/box/iproto.h
+++ b/src/box/iproto.h
@@ -37,6 +37,11 @@
  extern "C" {
  #endif /* defined(__cplusplus) */
  
+enum {
+	IPROTO_MSG_MAX_DEFAULT = 768,
+	IPROTO_MSG_MAX_MIN = 2,
+};
+
  extern unsigned iproto_readahead;
  
  /**
diff --git a/src/fiber_pool.c b/src/fiber_pool.c
index f60b6f0ff..b93b85a7c 100644
--- a/src/fiber_pool.c
+++ b/src/fiber_pool.c
@@ -139,8 +139,6 @@ fiber_pool_cb(ev_loop *loop, struct ev_watcher *watcher, int events)
  void
  fiber_pool_set_max_size(struct fiber_pool *pool, int new_max_size)
  {
-	if (new_max_size < FIBER_POOL_SIZE_DEFAULT)
-		new_max_size = FIBER_POOL_SIZE_DEFAULT;
  	pool->max_size = new_max_size;
  }
  
diff --git a/src/fiber_pool.h b/src/fiber_pool.h
index 2a04d6063..5f7b345af 100644
--- a/src/fiber_pool.h
+++ b/src/fiber_pool.h
@@ -42,7 +42,7 @@ extern "C" {
  #endif /* defined(__cplusplus) */
  
  enum {
-	FIBER_POOL_SIZE_DEFAULT = 4096,
+	FIBER_POOL_SIZE_FACTOR = 5,
  	FIBER_POOL_IDLE_TIMEOUT = 1
  };
  
@@ -87,8 +87,7 @@ fiber_pool_create(struct fiber_pool *pool, const char *name, int max_pool_size,
  		  float idle_timeout);
  
  /**
- * Set maximal fiber pool size. If a new size is bigger than the
- * current, then pending requests processing is started.
+ * Set maximal fiber pool size.
   * @param pool Fiber pool to set size.
   * @param new_max_size New maximal size.
   */





More information about the Tarantool-patches mailing list