[tarantool-patches] [PATCH v2 1/2] box: export mpstream methods to core

Kirill Shcherbatov kshcherbatov at tarantool.org
Mon Aug 27 14:11:01 MSK 2018


As we going to use mpstream not only in LUA, let's move this
API in tarantool core.

Part of #3545.
---
 src/CMakeLists.txt    |   1 +
 src/box/lua/call.c    |  11 +--
 src/box/lua/misc.cc   |   1 +
 src/box/lua/net_box.c | 127 +++++++++++++++----------------
 src/box/lua/tuple.c   |  23 +++---
 src/lua/msgpack.c     | 166 ++++------------------------------------
 src/lua/msgpack.h     | 102 +------------------------
 src/mpstream.c        | 205 ++++++++++++++++++++++++++++++++++++++++++++++++++
 src/mpstream.h        | 124 ++++++++++++++++++++++++++++++
 9 files changed, 429 insertions(+), 331 deletions(-)
 create mode 100644 src/mpstream.c
 create mode 100644 src/mpstream.h

diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 7c8b5cc..0407337 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -97,6 +97,7 @@ set (core_sources
      http_parser.c
      coll.c
      coll_def.c
+     mpstream.c
  )
 
 if (TARGET_OS_NETBSD)
diff --git a/src/box/lua/call.c b/src/box/lua/call.c
index d20cbbb..1f20426 100644
--- a/src/box/lua/call.c
+++ b/src/box/lua/call.c
@@ -42,6 +42,7 @@
 #include "small/obuf.h"
 #include "lua_sql.h"
 #include "trivia/util.h"
+#include "mpstream.h"
 
 /**
  * A helper to find a Lua function by name and put it
@@ -175,7 +176,7 @@ luamp_encode_call_16(lua_State *L, struct luaL_serializer *cfg,
 				 *         ..., { scalar }, ...`
 				 */
 				lua_pushvalue(L, i);
-				luamp_encode_array(cfg, stream, 1);
+				mpstream_encode_array(stream, 1);
 				luamp_encode_r(L, cfg, stream, &field, 0);
 				lua_pop(L, 1);
 			} else {
@@ -202,7 +203,7 @@ luamp_encode_call_16(lua_State *L, struct luaL_serializer *cfg,
 		 * `return scalar`
 		 * `return map`
 		 */
-		luamp_encode_array(cfg, stream, 1);
+		mpstream_encode_array(stream, 1);
 		assert(lua_gettop(L) == 1);
 		luamp_encode_r(L, cfg, stream, &root, 0);
 		return 1;
@@ -211,7 +212,7 @@ luamp_encode_call_16(lua_State *L, struct luaL_serializer *cfg,
 	assert(root.type == MP_ARRAY);
 	if (root.size == 0) {
 		/* `return {}` => `{ box.tuple() }` */
-		luamp_encode_array(cfg, stream, 0);
+		mpstream_encode_array(stream, 0);
 		return 1;
 	}
 
@@ -230,7 +231,7 @@ luamp_encode_call_16(lua_State *L, struct luaL_serializer *cfg,
 				 * `return { scalar, ... } =>
 				 *        box.tuple.new(scalar, ...)`
 				 */
-				luamp_encode_array(cfg, stream, root.size);
+				mpstream_encode_array(stream, root.size);
 				/*
 				 * Encode the first field of tuple using
 				 * existing information from luaL_tofield
@@ -250,7 +251,7 @@ luamp_encode_call_16(lua_State *L, struct luaL_serializer *cfg,
 			 * `return { tuple/array, ..., scalar, ... } =>
 			 *         { tuple/array, ..., { scalar }, ... }`
 			 */
-			luamp_encode_array(cfg, stream, 1);
+			mpstream_encode_array(stream, 1);
 			luamp_encode_r(L, cfg, stream, &field, 0);
 		} else {
 			/* `return { tuple/array, ..., tuple/array, ... }` */
diff --git a/src/box/lua/misc.cc b/src/box/lua/misc.cc
index bc76065..8bd33ae 100644
--- a/src/box/lua/misc.cc
+++ b/src/box/lua/misc.cc
@@ -38,6 +38,7 @@
 #include "box/box.h"
 #include "box/port.h"
 #include "box/lua/tuple.h"
+#include "mpstream.h"
 
 /** {{{ Miscellaneous utils **/
 
diff --git a/src/box/lua/net_box.c b/src/box/lua/net_box.c
index 308c9c7..a928a4c 100644
--- a/src/box/lua/net_box.c
+++ b/src/box/lua/net_box.c
@@ -47,6 +47,7 @@
 #include "coio.h"
 #include "box/errcode.h"
 #include "lua/fiber.h"
+#include "mpstream.h"
 
 #define cfg luaL_msgpack_default
 
@@ -68,13 +69,13 @@ netbox_prepare_request(lua_State *L, struct mpstream *stream, uint32_t r_type)
 	mpstream_advance(stream, fixheader_size);
 
 	/* encode header */
-	luamp_encode_map(cfg, stream, 2);
+	mpstream_encode_map(stream, 2);
 
-	luamp_encode_uint(cfg, stream, IPROTO_SYNC);
-	luamp_encode_uint(cfg, stream, sync);
+	mpstream_encode_uint(stream, IPROTO_SYNC);
+	mpstream_encode_uint(stream, sync);
 
-	luamp_encode_uint(cfg, stream, IPROTO_REQUEST_TYPE);
-	luamp_encode_uint(cfg, stream, r_type);
+	mpstream_encode_uint(stream, IPROTO_REQUEST_TYPE);
+	mpstream_encode_uint(stream, r_type);
 
 	/* Caller should remember how many bytes was used in ibuf */
 	return used;
@@ -139,16 +140,16 @@ netbox_encode_auth(lua_State *L)
 		return luaL_error(L, "Invalid salt");
 
 	/* Adapted from xrow_encode_auth() */
-	luamp_encode_map(cfg, &stream, password != NULL ? 2 : 1);
-	luamp_encode_uint(cfg, &stream, IPROTO_USER_NAME);
-	luamp_encode_str(cfg, &stream, user, user_len);
+	mpstream_encode_map(&stream, password != NULL ? 2 : 1);
+	mpstream_encode_uint(&stream, IPROTO_USER_NAME);
+	mpstream_encode_strn(&stream, user, user_len);
 	if (password != NULL) { /* password can be omitted */
 		char scramble[SCRAMBLE_SIZE];
 		scramble_prepare(scramble, salt, password, password_len);
-		luamp_encode_uint(cfg, &stream, IPROTO_TUPLE);
-		luamp_encode_array(cfg, &stream, 2);
-		luamp_encode_str(cfg, &stream, "chap-sha1", strlen("chap-sha1"));
-		luamp_encode_str(cfg, &stream, scramble, SCRAMBLE_SIZE);
+		mpstream_encode_uint(&stream, IPROTO_TUPLE);
+		mpstream_encode_array(&stream, 2);
+		mpstream_encode_str(&stream, "chap-sha1");
+		mpstream_encode_strn(&stream, scramble, SCRAMBLE_SIZE);
 	}
 
 	netbox_encode_request(&stream, svp);
@@ -166,16 +167,16 @@ netbox_encode_call_impl(lua_State *L, enum iproto_type type)
 	struct mpstream stream;
 	size_t svp = netbox_prepare_request(L, &stream, type);
 
-	luamp_encode_map(cfg, &stream, 2);
+	mpstream_encode_map(&stream, 2);
 
 	/* encode proc name */
 	size_t name_len;
 	const char *name = lua_tolstring(L, 3, &name_len);
-	luamp_encode_uint(cfg, &stream, IPROTO_FUNCTION_NAME);
-	luamp_encode_str(cfg, &stream, name, name_len);
+	mpstream_encode_uint(&stream, IPROTO_FUNCTION_NAME);
+	mpstream_encode_strn(&stream, name, name_len);
 
 	/* encode args */
-	luamp_encode_uint(cfg, &stream, IPROTO_TUPLE);
+	mpstream_encode_uint(&stream, IPROTO_TUPLE);
 	luamp_encode_tuple(L, cfg, &stream, 4);
 
 	netbox_encode_request(&stream, svp);
@@ -205,16 +206,16 @@ netbox_encode_eval(lua_State *L)
 	struct mpstream stream;
 	size_t svp = netbox_prepare_request(L, &stream, IPROTO_EVAL);
 
-	luamp_encode_map(cfg, &stream, 2);
+	mpstream_encode_map(&stream, 2);
 
 	/* encode expr */
 	size_t expr_len;
 	const char *expr = lua_tolstring(L, 3, &expr_len);
-	luamp_encode_uint(cfg, &stream, IPROTO_EXPR);
-	luamp_encode_str(cfg, &stream, expr, expr_len);
+	mpstream_encode_uint(&stream, IPROTO_EXPR);
+	mpstream_encode_strn(&stream, expr, expr_len);
 
 	/* encode args */
-	luamp_encode_uint(cfg, &stream, IPROTO_TUPLE);
+	mpstream_encode_uint(&stream, IPROTO_TUPLE);
 	luamp_encode_tuple(L, cfg, &stream, 4);
 
 	netbox_encode_request(&stream, svp);
@@ -233,7 +234,7 @@ netbox_encode_select(lua_State *L)
 	struct mpstream stream;
 	size_t svp = netbox_prepare_request(L, &stream, IPROTO_SELECT);
 
-	luamp_encode_map(cfg, &stream, 6);
+	mpstream_encode_map(&stream, 6);
 
 	uint32_t space_id = lua_tonumber(L, 3);
 	uint32_t index_id = lua_tonumber(L, 4);
@@ -242,27 +243,27 @@ netbox_encode_select(lua_State *L)
 	uint32_t limit = lua_tonumber(L, 7);
 
 	/* encode space_id */
-	luamp_encode_uint(cfg, &stream, IPROTO_SPACE_ID);
-	luamp_encode_uint(cfg, &stream, space_id);
+	mpstream_encode_uint(&stream, IPROTO_SPACE_ID);
+	mpstream_encode_uint(&stream, space_id);
 
 	/* encode index_id */
-	luamp_encode_uint(cfg, &stream, IPROTO_INDEX_ID);
-	luamp_encode_uint(cfg, &stream, index_id);
+	mpstream_encode_uint(&stream, IPROTO_INDEX_ID);
+	mpstream_encode_uint(&stream, index_id);
 
 	/* encode iterator */
-	luamp_encode_uint(cfg, &stream, IPROTO_ITERATOR);
-	luamp_encode_uint(cfg, &stream, iterator);
+	mpstream_encode_uint(&stream, IPROTO_ITERATOR);
+	mpstream_encode_uint(&stream, iterator);
 
 	/* encode offset */
-	luamp_encode_uint(cfg, &stream, IPROTO_OFFSET);
-	luamp_encode_uint(cfg, &stream, offset);
+	mpstream_encode_uint(&stream, IPROTO_OFFSET);
+	mpstream_encode_uint(&stream, offset);
 
 	/* encode limit */
-	luamp_encode_uint(cfg, &stream, IPROTO_LIMIT);
-	luamp_encode_uint(cfg, &stream, limit);
+	mpstream_encode_uint(&stream, IPROTO_LIMIT);
+	mpstream_encode_uint(&stream, limit);
 
 	/* encode key */
-	luamp_encode_uint(cfg, &stream, IPROTO_KEY);
+	mpstream_encode_uint(&stream, IPROTO_KEY);
 	luamp_convert_key(L, cfg, &stream, 8);
 
 	netbox_encode_request(&stream, svp);
@@ -279,15 +280,15 @@ netbox_encode_insert_or_replace(lua_State *L, uint32_t reqtype)
 	struct mpstream stream;
 	size_t svp = netbox_prepare_request(L, &stream, reqtype);
 
-	luamp_encode_map(cfg, &stream, 2);
+	mpstream_encode_map(&stream, 2);
 
 	/* encode space_id */
 	uint32_t space_id = lua_tonumber(L, 3);
-	luamp_encode_uint(cfg, &stream, IPROTO_SPACE_ID);
-	luamp_encode_uint(cfg, &stream, space_id);
+	mpstream_encode_uint(&stream, IPROTO_SPACE_ID);
+	mpstream_encode_uint(&stream, space_id);
 
 	/* encode args */
-	luamp_encode_uint(cfg, &stream, IPROTO_TUPLE);
+	mpstream_encode_uint(&stream, IPROTO_TUPLE);
 	luamp_encode_tuple(L, cfg, &stream, 4);
 
 	netbox_encode_request(&stream, svp);
@@ -317,20 +318,20 @@ netbox_encode_delete(lua_State *L)
 	struct mpstream stream;
 	size_t svp = netbox_prepare_request(L, &stream, IPROTO_DELETE);
 
-	luamp_encode_map(cfg, &stream, 3);
+	mpstream_encode_map(&stream, 3);
 
 	/* encode space_id */
 	uint32_t space_id = lua_tonumber(L, 3);
-	luamp_encode_uint(cfg, &stream, IPROTO_SPACE_ID);
-	luamp_encode_uint(cfg, &stream, space_id);
+	mpstream_encode_uint(&stream, IPROTO_SPACE_ID);
+	mpstream_encode_uint(&stream, space_id);
 
 	/* encode space_id */
 	uint32_t index_id = lua_tonumber(L, 4);
-	luamp_encode_uint(cfg, &stream, IPROTO_INDEX_ID);
-	luamp_encode_uint(cfg, &stream, index_id);
+	mpstream_encode_uint(&stream, IPROTO_INDEX_ID);
+	mpstream_encode_uint(&stream, index_id);
 
 	/* encode key */
-	luamp_encode_uint(cfg, &stream, IPROTO_KEY);
+	mpstream_encode_uint(&stream, IPROTO_KEY);
 	luamp_convert_key(L, cfg, &stream, 5);
 
 	netbox_encode_request(&stream, svp);
@@ -348,30 +349,30 @@ netbox_encode_update(lua_State *L)
 	struct mpstream stream;
 	size_t svp = netbox_prepare_request(L, &stream, IPROTO_UPDATE);
 
-	luamp_encode_map(cfg, &stream, 5);
+	mpstream_encode_map(&stream, 5);
 
 	/* encode space_id */
 	uint32_t space_id = lua_tonumber(L, 3);
-	luamp_encode_uint(cfg, &stream, IPROTO_SPACE_ID);
-	luamp_encode_uint(cfg, &stream, space_id);
+	mpstream_encode_uint(&stream, IPROTO_SPACE_ID);
+	mpstream_encode_uint(&stream, space_id);
 
 	/* encode index_id */
 	uint32_t index_id = lua_tonumber(L, 4);
-	luamp_encode_uint(cfg, &stream, IPROTO_INDEX_ID);
-	luamp_encode_uint(cfg, &stream, index_id);
+	mpstream_encode_uint(&stream, IPROTO_INDEX_ID);
+	mpstream_encode_uint(&stream, index_id);
 
 	/* encode index_id */
-	luamp_encode_uint(cfg, &stream, IPROTO_INDEX_BASE);
-	luamp_encode_uint(cfg, &stream, 1);
+	mpstream_encode_uint(&stream, IPROTO_INDEX_BASE);
+	mpstream_encode_uint(&stream, 1);
 
 	/* encode in reverse order for speedup - see luamp_encode() code */
 	/* encode ops */
-	luamp_encode_uint(cfg, &stream, IPROTO_TUPLE);
+	mpstream_encode_uint(&stream, IPROTO_TUPLE);
 	luamp_encode_tuple(L, cfg, &stream, 6);
 	lua_pop(L, 1); /* ops */
 
 	/* encode key */
-	luamp_encode_uint(cfg, &stream, IPROTO_KEY);
+	mpstream_encode_uint(&stream, IPROTO_KEY);
 	luamp_convert_key(L, cfg, &stream, 5);
 
 	netbox_encode_request(&stream, svp);
@@ -389,25 +390,25 @@ netbox_encode_upsert(lua_State *L)
 	struct mpstream stream;
 	size_t svp = netbox_prepare_request(L, &stream, IPROTO_UPSERT);
 
-	luamp_encode_map(cfg, &stream, 4);
+	mpstream_encode_map(&stream, 4);
 
 	/* encode space_id */
 	uint32_t space_id = lua_tonumber(L, 3);
-	luamp_encode_uint(cfg, &stream, IPROTO_SPACE_ID);
-	luamp_encode_uint(cfg, &stream, space_id);
+	mpstream_encode_uint(&stream, IPROTO_SPACE_ID);
+	mpstream_encode_uint(&stream, space_id);
 
 	/* encode index_base */
-	luamp_encode_uint(cfg, &stream, IPROTO_INDEX_BASE);
-	luamp_encode_uint(cfg, &stream, 1);
+	mpstream_encode_uint(&stream, IPROTO_INDEX_BASE);
+	mpstream_encode_uint(&stream, 1);
 
 	/* encode in reverse order for speedup - see luamp_encode() code */
 	/* encode ops */
-	luamp_encode_uint(cfg, &stream, IPROTO_OPS);
+	mpstream_encode_uint(&stream, IPROTO_OPS);
 	luamp_encode_tuple(L, cfg, &stream, 5);
 	lua_pop(L, 1); /* ops */
 
 	/* encode tuple */
-	luamp_encode_uint(cfg, &stream, IPROTO_TUPLE);
+	mpstream_encode_uint(&stream, IPROTO_TUPLE);
 	luamp_encode_tuple(L, cfg, &stream, 4);
 
 	netbox_encode_request(&stream, svp);
@@ -566,17 +567,17 @@ netbox_encode_execute(lua_State *L)
 	struct mpstream stream;
 	size_t svp = netbox_prepare_request(L, &stream, IPROTO_EXECUTE);
 
-	luamp_encode_map(cfg, &stream, 3);
+	mpstream_encode_map(&stream, 3);
 
 	size_t len;
 	const char *query = lua_tolstring(L, 3, &len);
-	luamp_encode_uint(cfg, &stream, IPROTO_SQL_TEXT);
-	luamp_encode_str(cfg, &stream, query, len);
+	mpstream_encode_uint(&stream, IPROTO_SQL_TEXT);
+	mpstream_encode_strn(&stream, query, len);
 
-	luamp_encode_uint(cfg, &stream, IPROTO_SQL_BIND);
+	mpstream_encode_uint(&stream, IPROTO_SQL_BIND);
 	luamp_encode_tuple(L, cfg, &stream, 4);
 
-	luamp_encode_uint(cfg, &stream, IPROTO_OPTIONS);
+	mpstream_encode_uint(&stream, IPROTO_OPTIONS);
 	luamp_encode_tuple(L, cfg, &stream, 5);
 
 	netbox_encode_request(&stream, svp);
diff --git a/src/box/lua/tuple.c b/src/box/lua/tuple.c
index 22fe696..126466c 100644
--- a/src/box/lua/tuple.c
+++ b/src/box/lua/tuple.c
@@ -42,6 +42,7 @@
 #include "box/tuple_convert.h"
 #include "box/errcode.h"
 #include "json/path.h"
+#include "mpstream.h"
 
 /** {{{ box.tuple Lua library
  *
@@ -111,7 +112,7 @@ lbox_tuple_new(lua_State *L)
 		luamp_encode_tuple(L, luaL_msgpack_default, &stream, 1);
 	} else {
 		/* Backward-compatible format: box.tuple.new(1, 2, 3). */
-		luamp_encode_array(luaL_msgpack_default, &stream, argc);
+		mpstream_encode_array(&stream, argc);
 		for (int k = 1; k <= argc; ++k) {
 			luamp_encode(L, luaL_msgpack_default, &stream, k);
 		}
@@ -228,9 +229,9 @@ luamp_convert_key(struct lua_State *L, struct luaL_serializer *cfg,
 		luamp_encode_r(L, cfg, stream, &field, 0);
 		lua_pop(L, 1);
 	} else if (field.type == MP_NIL) {
-		luamp_encode_array(cfg, stream, 0);
+		mpstream_encode_array(stream, 0);
 	} else {
-		luamp_encode_array(cfg, stream, 1);
+		mpstream_encode_array(stream, 1);
 		lua_pushvalue(L, index);
 		luamp_encode_r(L, cfg, stream, &field, 0);
 		lua_pop(L, 1);
@@ -393,18 +394,18 @@ lbox_tuple_transform(struct lua_State *L)
 	/*
 	 * Prepare UPDATE expression
 	 */
-	luamp_encode_array(luaL_msgpack_default, &stream, op_cnt);
+	mpstream_encode_array(&stream, op_cnt);
 	if (len > 0) {
-		luamp_encode_array(luaL_msgpack_default, &stream, 3);
-		luamp_encode_str(luaL_msgpack_default, &stream, "#", 1);
-		luamp_encode_uint(luaL_msgpack_default, &stream, offset);
-		luamp_encode_uint(luaL_msgpack_default, &stream, len);
+		mpstream_encode_array(&stream, 3);
+		mpstream_encode_str(&stream, "#");
+		mpstream_encode_uint(&stream, offset);
+		mpstream_encode_uint(&stream, len);
 	}
 
 	for (int i = argc ; i > 3; i--) {
-		luamp_encode_array(luaL_msgpack_default, &stream, 3);
-		luamp_encode_str(luaL_msgpack_default, &stream, "!", 1);
-		luamp_encode_uint(luaL_msgpack_default, &stream, offset);
+		mpstream_encode_array(&stream, 3);
+		mpstream_encode_str(&stream, "!");
+		mpstream_encode_uint(&stream, offset);
 		luamp_encode(L, luaL_msgpack_default, &stream, i);
 	}
 	mpstream_flush(&stream);
diff --git a/src/lua/msgpack.c b/src/lua/msgpack.c
index acd860a..b470060 100644
--- a/src/lua/msgpack.c
+++ b/src/lua/msgpack.c
@@ -29,7 +29,7 @@
  * SUCH DAMAGE.
  */
 #include "lua/msgpack.h"
-
+#include "mpstream.h"
 #include "lua/utils.h"
 
 #if defined(LUAJIT)
@@ -50,46 +50,6 @@ luamp_error(void *error_ctx)
 	luaL_error(L, diag_last_error(diag_get())->errmsg);
 }
 
-void
-mpstream_init(struct mpstream *stream, void *ctx,
-	      luamp_reserve_f reserve, luamp_alloc_f alloc,
-	      luamp_error_f error, void *error_ctx)
-{
-	stream->ctx = ctx;
-	stream->reserve = reserve;
-	stream->alloc = alloc;
-	stream->error = error;
-	stream->error_ctx = error_ctx;
-	mpstream_reset(stream);
-}
-
-void
-mpstream_reserve_slow(struct mpstream *stream, size_t size)
-{
-	stream->alloc(stream->ctx, stream->pos - stream->buf);
-	stream->buf = (char *) stream->reserve(stream->ctx, &size);
-	if (stream->buf == NULL) {
-		diag_set(OutOfMemory, size, "mpstream", "reserve");
-		stream->error(stream->error_ctx);
-	}
-	stream->pos = stream->buf;
-	stream->end = stream->pos + size;
-}
-
-void
-mpstream_reset(struct mpstream *stream)
-{
-	size_t size = 0;
-	stream->buf = (char *) stream->reserve(stream->ctx, &size);
-	if (stream->buf == NULL) {
-		diag_set(OutOfMemory, size, "mpstream", "reset");
-		stream->error(stream->error_ctx);
-	}
-	stream->pos = stream->buf;
-	stream->end = stream->pos + size;
-}
-
-
 static uint32_t CTID_CHAR_PTR;
 static uint32_t CTID_STRUCT_IBUF;
 
@@ -107,104 +67,6 @@ static luamp_encode_extension_f luamp_encode_extension =
 static luamp_decode_extension_f luamp_decode_extension =
 		luamp_decode_extension_default;
 
-void
-luamp_encode_array(struct luaL_serializer *cfg, struct mpstream *stream,
-		   uint32_t size)
-{
-	(void) cfg;
-	assert(mp_sizeof_array(size) <= 5);
-	char *data = mpstream_reserve(stream, 5);
-	char *pos = mp_encode_array(data, size);
-	mpstream_advance(stream, pos - data);
-}
-
-void
-luamp_encode_map(struct luaL_serializer *cfg, struct mpstream *stream,
-		 uint32_t size)
-{
-	(void) cfg;
-	assert(mp_sizeof_map(size) <= 5);
-	char *data = mpstream_reserve(stream, 5);
-	char *pos = mp_encode_map(data, size);
-	mpstream_advance(stream, pos - data);
-}
-
-void
-luamp_encode_uint(struct luaL_serializer *cfg, struct mpstream *stream,
-		  uint64_t num)
-{
-	(void) cfg;
-	assert(mp_sizeof_uint(num) <= 9);
-	char *data = mpstream_reserve(stream, 9);
-	char *pos = mp_encode_uint(data, num);
-	mpstream_advance(stream, pos - data);
-}
-
-void
-luamp_encode_int(struct luaL_serializer *cfg, struct mpstream *stream,
-		 int64_t num)
-{
-	(void) cfg;
-	assert(mp_sizeof_int(num) <= 9);
-	char *data = mpstream_reserve(stream, 9);
-	char *pos = mp_encode_int(data, num);
-	mpstream_advance(stream, pos - data);
-}
-
-void
-luamp_encode_float(struct luaL_serializer *cfg, struct mpstream *stream,
-		   float num)
-{
-	(void) cfg;
-	assert(mp_sizeof_float(num) <= 5);
-	char *data = mpstream_reserve(stream, 5);
-	char *pos = mp_encode_float(data, num);
-	mpstream_advance(stream, pos - data);
-}
-
-void
-luamp_encode_double(struct luaL_serializer *cfg, struct mpstream *stream,
-		    double num)
-{
-	(void) cfg;
-	assert(mp_sizeof_double(num) <= 9);
-	char *data = mpstream_reserve(stream, 9);
-	char *pos = mp_encode_double(data, num);
-	mpstream_advance(stream, pos - data);
-}
-
-void
-luamp_encode_str(struct luaL_serializer *cfg, struct mpstream *stream,
-		 const char *str, uint32_t len)
-{
-	(void) cfg;
-	assert(mp_sizeof_str(len) <= 5 + len);
-	char *data = mpstream_reserve(stream, 5 + len);
-	char *pos = mp_encode_str(data, str, len);
-	mpstream_advance(stream, pos - data);
-}
-
-void
-luamp_encode_nil(struct luaL_serializer *cfg, struct mpstream *stream)
-{
-	(void) cfg;
-	assert(mp_sizeof_nil() <= 1);
-	char *data = mpstream_reserve(stream, 1);
-	char *pos = mp_encode_nil(data);
-	mpstream_advance(stream, pos - data);
-}
-
-void
-luamp_encode_bool(struct luaL_serializer *cfg, struct mpstream *stream,
-		  bool val)
-{
-	(void) cfg;
-	assert(mp_sizeof_bool(val) <= 1);
-	char *data = mpstream_reserve(stream, 1);
-	char *pos = mp_encode_bool(data, val);
-	mpstream_advance(stream, pos - data);
-}
-
 static enum mp_type
 luamp_encode_extension_default(struct lua_State *L, int idx,
 			       struct mpstream *stream)
@@ -254,38 +116,36 @@ luamp_encode_r(struct lua_State *L, struct luaL_serializer *cfg,
 restart: /* used by MP_EXT */
 	switch (field->type) {
 	case MP_UINT:
-		luamp_encode_uint(cfg, stream, field->ival);
+		mpstream_encode_uint(stream, field->ival);
 		return MP_UINT;
 	case MP_STR:
-		luamp_encode_str(cfg, stream, field->sval.data,
-				field->sval.len);
+		mpstream_encode_strn(stream, field->sval.data, field->sval.len);
 		return MP_STR;
 	case MP_BIN:
-		luamp_encode_str(cfg, stream, field->sval.data,
-				field->sval.len);
+		mpstream_encode_strn(stream, field->sval.data, field->sval.len);
 		return MP_BIN;
 	case MP_INT:
-		luamp_encode_int(cfg, stream, field->ival);
+		mpstream_encode_int(stream, field->ival);
 		return MP_INT;
 	case MP_FLOAT:
-		luamp_encode_float(cfg, stream, field->fval);
+		mpstream_encode_float(stream, field->fval);
 		return MP_FLOAT;
 	case MP_DOUBLE:
-		luamp_encode_double(cfg, stream, field->dval);
+		mpstream_encode_double(stream, field->dval);
 		return MP_DOUBLE;
 	case MP_BOOL:
-		luamp_encode_bool(cfg, stream, field->bval);
+		mpstream_encode_bool(stream, field->bval);
 		return MP_BOOL;
 	case MP_NIL:
-		luamp_encode_nil(cfg, stream);
+		mpstream_encode_nil(stream);
 		return MP_NIL;
 	case MP_MAP:
 		/* Map */
 		if (level >= cfg->encode_max_depth) {
-			luamp_encode_nil(cfg, stream); /* Limit nested maps */
+			mpstream_encode_nil(stream); /* Limit nested maps */
 			return MP_NIL;
 		}
-		luamp_encode_map(cfg, stream, field->size);
+		mpstream_encode_map(stream, field->size);
 		lua_pushnil(L);  /* first key */
 		while (lua_next(L, top) != 0) {
 			lua_pushvalue(L, -2); /* push a copy of key to top */
@@ -301,11 +161,11 @@ restart: /* used by MP_EXT */
 	case MP_ARRAY:
 		/* Array */
 		if (level >= cfg->encode_max_depth) {
-			luamp_encode_nil(cfg, stream); /* Limit nested arrays */
+			mpstream_encode_nil(stream); /* Limit nested arrays */
 			return MP_NIL;
 		}
 		uint32_t size = field->size;
-		luamp_encode_array(cfg, stream, size);
+		mpstream_encode_array(stream, size);
 		for (uint32_t i = 0; i < size; i++) {
 			lua_rawgeti(L, top, i + 1);
 			luaL_tofield(L, cfg, top + 1, field);
diff --git a/src/lua/msgpack.h b/src/lua/msgpack.h
index bacf3e0..d0ade30 100644
--- a/src/lua/msgpack.h
+++ b/src/lua/msgpack.h
@@ -1,7 +1,7 @@
 #ifndef TARANTOOL_LUA_MSGPACK_H_INCLUDED
 #define TARANTOOL_LUA_MSGPACK_H_INCLUDED
 /*
- * Copyright 2010-2015, Tarantool AUTHORS, please see AUTHORS file.
+ * Copyright 2010-2018, Tarantool AUTHORS, please see AUTHORS file.
  *
  * Redistribution and use in source and binary forms, with or
  * without modification, are permitted provided that the following
@@ -42,6 +42,8 @@ extern "C" {
 #include <lua.h>
 
 struct luaL_serializer;
+struct mpstream;
+
 /**
  * Default instance of msgpack serializer (msgpack = require('msgpack')).
  * This instance is used by all box's Lua/C API bindings (e.g. space:replace()).
@@ -51,111 +53,13 @@ struct luaL_serializer;
 extern struct luaL_serializer *luaL_msgpack_default;
 
 /**
- * A streaming API so that it's possible to encode to any output
- * stream.
- */
-
-/**
- * Ask the allocator to reserve at least size bytes. It can reserve
- * more, and update *size with the new size.
- */
-typedef	void *(*luamp_reserve_f)(void *ctx, size_t *size);
-
-/** Actually use the bytes. */
-typedef	void *(*luamp_alloc_f)(void *ctx, size_t size);
-
-/** Actually use the bytes. */
-typedef	void (*luamp_error_f)(void *error_ctx);
-
-struct mpstream {
-	/**
-	 * When pos >= end, or required size doesn't fit in
-	 * pos..end range alloc() is called to advance the stream
-	 * and reserve() to get a new chunk.
-	 */
-	char *buf, *pos, *end;
-	void *ctx;
-	luamp_reserve_f reserve;
-	luamp_alloc_f alloc;
-	luamp_error_f error;
-	void *error_ctx;
-};
-
-/**
  * luaL_error()
  */
 void
 luamp_error(void *);
 
-void
-mpstream_init(struct mpstream *stream, void *ctx,
-	      luamp_reserve_f reserve, luamp_alloc_f alloc,
-	      luamp_error_f error, void *error_ctx);
-
-void
-mpstream_reset(struct mpstream *stream);
-
-void
-mpstream_reserve_slow(struct mpstream *stream, size_t size);
-
-static inline void
-mpstream_flush(struct mpstream *stream)
-{
-	stream->alloc(stream->ctx, stream->pos - stream->buf);
-	stream->buf = stream->pos;
-}
-
-static inline char *
-mpstream_reserve(struct mpstream *stream, size_t size)
-{
-	if (stream->pos + size > stream->end)
-		mpstream_reserve_slow(stream, size);
-	return stream->pos;
-}
-
-static inline void
-mpstream_advance(struct mpstream *stream, size_t size)
-{
-	assert(stream->pos + size <= stream->end);
-	stream->pos += size;
-}
-
 enum { LUAMP_ALLOC_FACTOR = 256 };
 
-void
-luamp_encode_array(struct luaL_serializer *cfg, struct mpstream *stream,
-		   uint32_t size);
-
-void
-luamp_encode_map(struct luaL_serializer *cfg, struct mpstream *stream, uint32_t size);
-
-void
-luamp_encode_uint(struct luaL_serializer *cfg, struct mpstream *stream,
-		  uint64_t num);
-
-void
-luamp_encode_int(struct luaL_serializer *cfg, struct mpstream *stream,
-		 int64_t num);
-
-void
-luamp_encode_float(struct luaL_serializer *cfg, struct mpstream *stream,
-		   float num);
-
-void
-luamp_encode_double(struct luaL_serializer *cfg, struct mpstream *stream,
-		    double num);
-
-void
-luamp_encode_str(struct luaL_serializer *cfg, struct mpstream *stream,
-		 const char *str, uint32_t len);
-
-void
-luamp_encode_nil(struct luaL_serializer *cfg, struct mpstream *stream);
-
-void
-luamp_encode_bool(struct luaL_serializer *cfg, struct mpstream *stream,
-		  bool val);
-
 /* low-level function needed for execute_lua_call() */
 enum mp_type
 luamp_encode_r(struct lua_State *L, struct luaL_serializer *cfg,
diff --git a/src/mpstream.c b/src/mpstream.c
new file mode 100644
index 0000000..23d27f9
--- /dev/null
+++ b/src/mpstream.c
@@ -0,0 +1,205 @@
+/*
+ * Copyright 2010-2018, Tarantool AUTHORS, please see AUTHORS file.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ *    copyright notice, this list of conditions and the
+ *    following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ *    copyright notice, this list of conditions and the following
+ *    disclaimer in the documentation and/or other materials
+ *    provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+ * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+ * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
+
+#include "mpstream.h"
+#include <assert.h>
+#include <stdint.h>
+#include "msgpuck.h"
+
+void
+mpstream_reserve_slow(struct mpstream *stream, size_t size)
+{
+    stream->alloc(stream->ctx, stream->pos - stream->buf);
+    stream->buf = (char *) stream->reserve(stream->ctx, &size);
+    if (stream->buf == NULL) {
+        diag_set(OutOfMemory, size, "mpstream", "reserve");
+        stream->error(stream->error_ctx);
+    }
+    stream->pos = stream->buf;
+    stream->end = stream->pos + size;
+}
+
+void
+mpstream_reset(struct mpstream *stream)
+{
+    size_t size = 0;
+    stream->buf = (char *) stream->reserve(stream->ctx, &size);
+    if (stream->buf == NULL) {
+        diag_set(OutOfMemory, size, "mpstream", "reset");
+        stream->error(stream->error_ctx);
+    }
+    stream->pos = stream->buf;
+    stream->end = stream->pos + size;
+}
+
+/**
+ * A streaming API so that it's possible to encode to any output
+ * stream.
+ */
+void
+mpstream_init(struct mpstream *stream, void *ctx,
+              mpstream_reserve_f reserve, mpstream_alloc_f alloc,
+              mpstream_error_f error, void *error_ctx)
+{
+    stream->ctx = ctx;
+    stream->reserve = reserve;
+    stream->alloc = alloc;
+    stream->error = error;
+    stream->error_ctx = error_ctx;
+    mpstream_reset(stream);
+}
+
+void
+mpstream_flush(struct mpstream *stream)
+{
+    stream->alloc(stream->ctx, stream->pos - stream->buf);
+    stream->buf = stream->pos;
+}
+
+char *
+mpstream_reserve(struct mpstream *stream, size_t size)
+{
+    if (stream->pos + size > stream->end)
+        mpstream_reserve_slow(stream, size);
+    return stream->pos;
+}
+
+void
+mpstream_advance(struct mpstream *stream, size_t size)
+{
+    assert(stream->pos + size <= stream->end);
+    stream->pos += size;
+}
+
+void
+mpstream_encode_array(struct mpstream *stream, uint32_t size)
+{
+    assert(mp_sizeof_array(size) <= 5);
+    char *data = mpstream_reserve(stream, 5);
+    if (data == NULL)
+        return;
+    char *pos = mp_encode_array(data, size);
+    mpstream_advance(stream, pos - data);
+}
+
+void
+mpstream_encode_map(struct mpstream *stream, uint32_t size)
+{
+    assert(mp_sizeof_map(size) <= 5);
+    char *data = mpstream_reserve(stream, 5);
+    if (data == NULL)
+        return;
+    char *pos = mp_encode_map(data, size);
+    mpstream_advance(stream, pos - data);
+}
+
+void
+mpstream_encode_uint(struct mpstream *stream, uint64_t num)
+{
+    assert(mp_sizeof_uint(num) <= 9);
+    char *data = mpstream_reserve(stream, 9);
+    if (data == NULL)
+        return;
+    char *pos = mp_encode_uint(data, num);
+    mpstream_advance(stream, pos - data);
+}
+
+void
+mpstream_encode_int(struct mpstream *stream, int64_t num)
+{
+    assert(mp_sizeof_int(num) <= 9);
+    char *data = mpstream_reserve(stream, 9);
+    if (data == NULL)
+        return;
+    char *pos = mp_encode_int(data, num);
+    mpstream_advance(stream, pos - data);
+}
+
+void
+mpstream_encode_float(struct mpstream *stream, float num)
+{
+    assert(mp_sizeof_float(num) <= 5);
+    char *data = mpstream_reserve(stream, 5);
+    if (data == NULL)
+        return;
+    char *pos = mp_encode_float(data, num);
+    mpstream_advance(stream, pos - data);
+}
+
+void
+mpstream_encode_double(struct mpstream *stream, double num)
+{
+    assert(mp_sizeof_double(num) <= 9);
+    char *data = mpstream_reserve(stream, 9);
+    char *pos = mp_encode_double(data, num);
+    if (data == NULL)
+        return;
+    mpstream_advance(stream, pos - data);
+}
+
+void
+mpstream_encode_strn(struct mpstream *stream, const char *str, uint32_t len)
+{
+    assert(mp_sizeof_str(len) <= 5 + len);
+    char *data = mpstream_reserve(stream, 5 + len);
+    if (data == NULL)
+        return;
+    char *pos = mp_encode_str(data, str, len);
+    mpstream_advance(stream, pos - data);
+}
+
+void
+mpstream_encode_str(struct mpstream *stream, const char *str)
+{
+    mpstream_encode_strn(stream, str, strlen(str));
+}
+
+void
+mpstream_encode_nil(struct mpstream *stream)
+{
+    assert(mp_sizeof_nil() <= 1);
+    char *data = mpstream_reserve(stream, 1);
+    if (data == NULL)
+        return;
+    char *pos = mp_encode_nil(data);
+    mpstream_advance(stream, pos - data);
+}
+
+void
+mpstream_encode_bool(struct mpstream *stream, bool val)
+{
+    assert(mp_sizeof_bool(val) <= 1);
+    char *data = mpstream_reserve(stream, 1);
+    if (data == NULL)
+        return;
+    char *pos = mp_encode_bool(data, val);
+    mpstream_advance(stream, pos - data);
+}
diff --git a/src/mpstream.h b/src/mpstream.h
new file mode 100644
index 0000000..789bd74
--- /dev/null
+++ b/src/mpstream.h
@@ -0,0 +1,124 @@
+#ifndef TARANTOOL_LUA_MPSTREAM_H_INCLUDED
+#define TARANTOOL_LUA_MPSTREAM_H_INCLUDED
+/*
+ * Copyright 2010-2015, Tarantool AUTHORS, please see AUTHORS file.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ *    copyright notice, this list of conditions and the
+ *    following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ *    copyright notice, this list of conditions and the following
+ *    disclaimer in the documentation and/or other materials
+ *    provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+ * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+ * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
+
+#include "diag.h"
+
+#if defined(__cplusplus)
+extern "C" {
+#endif /* defined(__cplusplus) */
+
+/**
+* Ask the allocator to reserve at least size bytes. It can reserve
+* more, and update *size with the new size.
+*/
+typedef void *(*mpstream_reserve_f)(void *ctx, size_t *size);
+
+/** Actually use the bytes. */
+typedef void *(*mpstream_alloc_f)(void *ctx, size_t size);
+
+/** Actually use the bytes. */
+typedef void (*mpstream_error_f)(void *error_ctx);
+
+struct mpstream {
+    /**
+     * When pos >= end, or required size doesn't fit in
+     * pos..end range alloc() is called to advance the stream
+     * and reserve() to get a new chunk.
+     */
+    char *buf, *pos, *end;
+    void *ctx;
+    mpstream_reserve_f reserve;
+    mpstream_alloc_f alloc;
+    mpstream_error_f error;
+    void *error_ctx;
+};
+
+void
+mpstream_reserve_slow(struct mpstream *stream, size_t size);
+
+void
+mpstream_reset(struct mpstream *stream);
+
+/**
+ * A streaming API so that it's possible to encode to any output
+ * stream.
+ */
+void
+mpstream_init(struct mpstream *stream, void *ctx,
+	      mpstream_reserve_f reserve, mpstream_alloc_f alloc,
+	      mpstream_error_f error, void *error_ctx);
+
+void
+mpstream_flush(struct mpstream *stream);
+
+char *
+mpstream_reserve(struct mpstream *stream, size_t size);
+
+void
+mpstream_advance(struct mpstream *stream, size_t size);
+
+void
+mpstream_encode_array(struct mpstream *stream, uint32_t size);
+
+void
+mpstream_encode_map(struct mpstream *stream, uint32_t size);
+
+void
+mpstream_encode_uint(struct mpstream *stream, uint64_t num);
+
+void
+mpstream_encode_int(struct mpstream *stream, int64_t num);
+
+void
+mpstream_encode_float(struct mpstream *stream, float num);
+
+void
+mpstream_encode_double(struct mpstream *stream, double num);
+
+void
+mpstream_encode_strn(struct mpstream *stream, const char *str, uint32_t len);
+
+void
+mpstream_encode_str(struct mpstream *stream, const char *str);
+
+void
+mpstream_encode_nil(struct mpstream *stream);
+
+void
+mpstream_encode_bool(struct mpstream *stream, bool val);
+
+#if defined(__cplusplus)
+} /* extern "C" */
+#endif /* defined(__cplusplus) */
+
+#endif /* TARANTOOL_LUA_MPSTREAM_H_INCLUDED */
-- 
2.7.4






More information about the Tarantool-patches mailing list