[tarantool-patches] [PATCH v2 1/2] box: export mpstream methods to core
Kirill Shcherbatov
kshcherbatov at tarantool.org
Mon Aug 27 14:11:01 MSK 2018
As we are going to use mpstream not only in Lua, let's move this
API to the Tarantool core.
Part of #3545.
---
src/CMakeLists.txt | 1 +
src/box/lua/call.c | 11 +--
src/box/lua/misc.cc | 1 +
src/box/lua/net_box.c | 127 +++++++++++++++----------------
src/box/lua/tuple.c | 23 +++---
src/lua/msgpack.c | 166 ++++------------------------------------
src/lua/msgpack.h | 102 +------------------------
src/mpstream.c | 205 ++++++++++++++++++++++++++++++++++++++++++++++++++
src/mpstream.h | 124 ++++++++++++++++++++++++++++++
9 files changed, 429 insertions(+), 331 deletions(-)
create mode 100644 src/mpstream.c
create mode 100644 src/mpstream.h
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 7c8b5cc..0407337 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -97,6 +97,7 @@ set (core_sources
http_parser.c
coll.c
coll_def.c
+ mpstream.c
)
if (TARGET_OS_NETBSD)
diff --git a/src/box/lua/call.c b/src/box/lua/call.c
index d20cbbb..1f20426 100644
--- a/src/box/lua/call.c
+++ b/src/box/lua/call.c
@@ -42,6 +42,7 @@
#include "small/obuf.h"
#include "lua_sql.h"
#include "trivia/util.h"
+#include "mpstream.h"
/**
* A helper to find a Lua function by name and put it
@@ -175,7 +176,7 @@ luamp_encode_call_16(lua_State *L, struct luaL_serializer *cfg,
* ..., { scalar }, ...`
*/
lua_pushvalue(L, i);
- luamp_encode_array(cfg, stream, 1);
+ mpstream_encode_array(stream, 1);
luamp_encode_r(L, cfg, stream, &field, 0);
lua_pop(L, 1);
} else {
@@ -202,7 +203,7 @@ luamp_encode_call_16(lua_State *L, struct luaL_serializer *cfg,
* `return scalar`
* `return map`
*/
- luamp_encode_array(cfg, stream, 1);
+ mpstream_encode_array(stream, 1);
assert(lua_gettop(L) == 1);
luamp_encode_r(L, cfg, stream, &root, 0);
return 1;
@@ -211,7 +212,7 @@ luamp_encode_call_16(lua_State *L, struct luaL_serializer *cfg,
assert(root.type == MP_ARRAY);
if (root.size == 0) {
/* `return {}` => `{ box.tuple() }` */
- luamp_encode_array(cfg, stream, 0);
+ mpstream_encode_array(stream, 0);
return 1;
}
@@ -230,7 +231,7 @@ luamp_encode_call_16(lua_State *L, struct luaL_serializer *cfg,
* `return { scalar, ... } =>
* box.tuple.new(scalar, ...)`
*/
- luamp_encode_array(cfg, stream, root.size);
+ mpstream_encode_array(stream, root.size);
/*
* Encode the first field of tuple using
* existing information from luaL_tofield
@@ -250,7 +251,7 @@ luamp_encode_call_16(lua_State *L, struct luaL_serializer *cfg,
* `return { tuple/array, ..., scalar, ... } =>
* { tuple/array, ..., { scalar }, ... }`
*/
- luamp_encode_array(cfg, stream, 1);
+ mpstream_encode_array(stream, 1);
luamp_encode_r(L, cfg, stream, &field, 0);
} else {
/* `return { tuple/array, ..., tuple/array, ... }` */
diff --git a/src/box/lua/misc.cc b/src/box/lua/misc.cc
index bc76065..8bd33ae 100644
--- a/src/box/lua/misc.cc
+++ b/src/box/lua/misc.cc
@@ -38,6 +38,7 @@
#include "box/box.h"
#include "box/port.h"
#include "box/lua/tuple.h"
+#include "mpstream.h"
/** {{{ Miscellaneous utils **/
diff --git a/src/box/lua/net_box.c b/src/box/lua/net_box.c
index 308c9c7..a928a4c 100644
--- a/src/box/lua/net_box.c
+++ b/src/box/lua/net_box.c
@@ -47,6 +47,7 @@
#include "coio.h"
#include "box/errcode.h"
#include "lua/fiber.h"
+#include "mpstream.h"
#define cfg luaL_msgpack_default
@@ -68,13 +69,13 @@ netbox_prepare_request(lua_State *L, struct mpstream *stream, uint32_t r_type)
mpstream_advance(stream, fixheader_size);
/* encode header */
- luamp_encode_map(cfg, stream, 2);
+ mpstream_encode_map(stream, 2);
- luamp_encode_uint(cfg, stream, IPROTO_SYNC);
- luamp_encode_uint(cfg, stream, sync);
+ mpstream_encode_uint(stream, IPROTO_SYNC);
+ mpstream_encode_uint(stream, sync);
- luamp_encode_uint(cfg, stream, IPROTO_REQUEST_TYPE);
- luamp_encode_uint(cfg, stream, r_type);
+ mpstream_encode_uint(stream, IPROTO_REQUEST_TYPE);
+ mpstream_encode_uint(stream, r_type);
/* Caller should remember how many bytes was used in ibuf */
return used;
@@ -139,16 +140,16 @@ netbox_encode_auth(lua_State *L)
return luaL_error(L, "Invalid salt");
/* Adapted from xrow_encode_auth() */
- luamp_encode_map(cfg, &stream, password != NULL ? 2 : 1);
- luamp_encode_uint(cfg, &stream, IPROTO_USER_NAME);
- luamp_encode_str(cfg, &stream, user, user_len);
+ mpstream_encode_map(&stream, password != NULL ? 2 : 1);
+ mpstream_encode_uint(&stream, IPROTO_USER_NAME);
+ mpstream_encode_strn(&stream, user, user_len);
if (password != NULL) { /* password can be omitted */
char scramble[SCRAMBLE_SIZE];
scramble_prepare(scramble, salt, password, password_len);
- luamp_encode_uint(cfg, &stream, IPROTO_TUPLE);
- luamp_encode_array(cfg, &stream, 2);
- luamp_encode_str(cfg, &stream, "chap-sha1", strlen("chap-sha1"));
- luamp_encode_str(cfg, &stream, scramble, SCRAMBLE_SIZE);
+ mpstream_encode_uint(&stream, IPROTO_TUPLE);
+ mpstream_encode_array(&stream, 2);
+ mpstream_encode_str(&stream, "chap-sha1");
+ mpstream_encode_strn(&stream, scramble, SCRAMBLE_SIZE);
}
netbox_encode_request(&stream, svp);
@@ -166,16 +167,16 @@ netbox_encode_call_impl(lua_State *L, enum iproto_type type)
struct mpstream stream;
size_t svp = netbox_prepare_request(L, &stream, type);
- luamp_encode_map(cfg, &stream, 2);
+ mpstream_encode_map(&stream, 2);
/* encode proc name */
size_t name_len;
const char *name = lua_tolstring(L, 3, &name_len);
- luamp_encode_uint(cfg, &stream, IPROTO_FUNCTION_NAME);
- luamp_encode_str(cfg, &stream, name, name_len);
+ mpstream_encode_uint(&stream, IPROTO_FUNCTION_NAME);
+ mpstream_encode_strn(&stream, name, name_len);
/* encode args */
- luamp_encode_uint(cfg, &stream, IPROTO_TUPLE);
+ mpstream_encode_uint(&stream, IPROTO_TUPLE);
luamp_encode_tuple(L, cfg, &stream, 4);
netbox_encode_request(&stream, svp);
@@ -205,16 +206,16 @@ netbox_encode_eval(lua_State *L)
struct mpstream stream;
size_t svp = netbox_prepare_request(L, &stream, IPROTO_EVAL);
- luamp_encode_map(cfg, &stream, 2);
+ mpstream_encode_map(&stream, 2);
/* encode expr */
size_t expr_len;
const char *expr = lua_tolstring(L, 3, &expr_len);
- luamp_encode_uint(cfg, &stream, IPROTO_EXPR);
- luamp_encode_str(cfg, &stream, expr, expr_len);
+ mpstream_encode_uint(&stream, IPROTO_EXPR);
+ mpstream_encode_strn(&stream, expr, expr_len);
/* encode args */
- luamp_encode_uint(cfg, &stream, IPROTO_TUPLE);
+ mpstream_encode_uint(&stream, IPROTO_TUPLE);
luamp_encode_tuple(L, cfg, &stream, 4);
netbox_encode_request(&stream, svp);
@@ -233,7 +234,7 @@ netbox_encode_select(lua_State *L)
struct mpstream stream;
size_t svp = netbox_prepare_request(L, &stream, IPROTO_SELECT);
- luamp_encode_map(cfg, &stream, 6);
+ mpstream_encode_map(&stream, 6);
uint32_t space_id = lua_tonumber(L, 3);
uint32_t index_id = lua_tonumber(L, 4);
@@ -242,27 +243,27 @@ netbox_encode_select(lua_State *L)
uint32_t limit = lua_tonumber(L, 7);
/* encode space_id */
- luamp_encode_uint(cfg, &stream, IPROTO_SPACE_ID);
- luamp_encode_uint(cfg, &stream, space_id);
+ mpstream_encode_uint(&stream, IPROTO_SPACE_ID);
+ mpstream_encode_uint(&stream, space_id);
/* encode index_id */
- luamp_encode_uint(cfg, &stream, IPROTO_INDEX_ID);
- luamp_encode_uint(cfg, &stream, index_id);
+ mpstream_encode_uint(&stream, IPROTO_INDEX_ID);
+ mpstream_encode_uint(&stream, index_id);
/* encode iterator */
- luamp_encode_uint(cfg, &stream, IPROTO_ITERATOR);
- luamp_encode_uint(cfg, &stream, iterator);
+ mpstream_encode_uint(&stream, IPROTO_ITERATOR);
+ mpstream_encode_uint(&stream, iterator);
/* encode offset */
- luamp_encode_uint(cfg, &stream, IPROTO_OFFSET);
- luamp_encode_uint(cfg, &stream, offset);
+ mpstream_encode_uint(&stream, IPROTO_OFFSET);
+ mpstream_encode_uint(&stream, offset);
/* encode limit */
- luamp_encode_uint(cfg, &stream, IPROTO_LIMIT);
- luamp_encode_uint(cfg, &stream, limit);
+ mpstream_encode_uint(&stream, IPROTO_LIMIT);
+ mpstream_encode_uint(&stream, limit);
/* encode key */
- luamp_encode_uint(cfg, &stream, IPROTO_KEY);
+ mpstream_encode_uint(&stream, IPROTO_KEY);
luamp_convert_key(L, cfg, &stream, 8);
netbox_encode_request(&stream, svp);
@@ -279,15 +280,15 @@ netbox_encode_insert_or_replace(lua_State *L, uint32_t reqtype)
struct mpstream stream;
size_t svp = netbox_prepare_request(L, &stream, reqtype);
- luamp_encode_map(cfg, &stream, 2);
+ mpstream_encode_map(&stream, 2);
/* encode space_id */
uint32_t space_id = lua_tonumber(L, 3);
- luamp_encode_uint(cfg, &stream, IPROTO_SPACE_ID);
- luamp_encode_uint(cfg, &stream, space_id);
+ mpstream_encode_uint(&stream, IPROTO_SPACE_ID);
+ mpstream_encode_uint(&stream, space_id);
/* encode args */
- luamp_encode_uint(cfg, &stream, IPROTO_TUPLE);
+ mpstream_encode_uint(&stream, IPROTO_TUPLE);
luamp_encode_tuple(L, cfg, &stream, 4);
netbox_encode_request(&stream, svp);
@@ -317,20 +318,20 @@ netbox_encode_delete(lua_State *L)
struct mpstream stream;
size_t svp = netbox_prepare_request(L, &stream, IPROTO_DELETE);
- luamp_encode_map(cfg, &stream, 3);
+ mpstream_encode_map(&stream, 3);
/* encode space_id */
uint32_t space_id = lua_tonumber(L, 3);
- luamp_encode_uint(cfg, &stream, IPROTO_SPACE_ID);
- luamp_encode_uint(cfg, &stream, space_id);
+ mpstream_encode_uint(&stream, IPROTO_SPACE_ID);
+ mpstream_encode_uint(&stream, space_id);
/* encode space_id */
uint32_t index_id = lua_tonumber(L, 4);
- luamp_encode_uint(cfg, &stream, IPROTO_INDEX_ID);
- luamp_encode_uint(cfg, &stream, index_id);
+ mpstream_encode_uint(&stream, IPROTO_INDEX_ID);
+ mpstream_encode_uint(&stream, index_id);
/* encode key */
- luamp_encode_uint(cfg, &stream, IPROTO_KEY);
+ mpstream_encode_uint(&stream, IPROTO_KEY);
luamp_convert_key(L, cfg, &stream, 5);
netbox_encode_request(&stream, svp);
@@ -348,30 +349,30 @@ netbox_encode_update(lua_State *L)
struct mpstream stream;
size_t svp = netbox_prepare_request(L, &stream, IPROTO_UPDATE);
- luamp_encode_map(cfg, &stream, 5);
+ mpstream_encode_map(&stream, 5);
/* encode space_id */
uint32_t space_id = lua_tonumber(L, 3);
- luamp_encode_uint(cfg, &stream, IPROTO_SPACE_ID);
- luamp_encode_uint(cfg, &stream, space_id);
+ mpstream_encode_uint(&stream, IPROTO_SPACE_ID);
+ mpstream_encode_uint(&stream, space_id);
/* encode index_id */
uint32_t index_id = lua_tonumber(L, 4);
- luamp_encode_uint(cfg, &stream, IPROTO_INDEX_ID);
- luamp_encode_uint(cfg, &stream, index_id);
+ mpstream_encode_uint(&stream, IPROTO_INDEX_ID);
+ mpstream_encode_uint(&stream, index_id);
/* encode index_id */
- luamp_encode_uint(cfg, &stream, IPROTO_INDEX_BASE);
- luamp_encode_uint(cfg, &stream, 1);
+ mpstream_encode_uint(&stream, IPROTO_INDEX_BASE);
+ mpstream_encode_uint(&stream, 1);
/* encode in reverse order for speedup - see luamp_encode() code */
/* encode ops */
- luamp_encode_uint(cfg, &stream, IPROTO_TUPLE);
+ mpstream_encode_uint(&stream, IPROTO_TUPLE);
luamp_encode_tuple(L, cfg, &stream, 6);
lua_pop(L, 1); /* ops */
/* encode key */
- luamp_encode_uint(cfg, &stream, IPROTO_KEY);
+ mpstream_encode_uint(&stream, IPROTO_KEY);
luamp_convert_key(L, cfg, &stream, 5);
netbox_encode_request(&stream, svp);
@@ -389,25 +390,25 @@ netbox_encode_upsert(lua_State *L)
struct mpstream stream;
size_t svp = netbox_prepare_request(L, &stream, IPROTO_UPSERT);
- luamp_encode_map(cfg, &stream, 4);
+ mpstream_encode_map(&stream, 4);
/* encode space_id */
uint32_t space_id = lua_tonumber(L, 3);
- luamp_encode_uint(cfg, &stream, IPROTO_SPACE_ID);
- luamp_encode_uint(cfg, &stream, space_id);
+ mpstream_encode_uint(&stream, IPROTO_SPACE_ID);
+ mpstream_encode_uint(&stream, space_id);
/* encode index_base */
- luamp_encode_uint(cfg, &stream, IPROTO_INDEX_BASE);
- luamp_encode_uint(cfg, &stream, 1);
+ mpstream_encode_uint(&stream, IPROTO_INDEX_BASE);
+ mpstream_encode_uint(&stream, 1);
/* encode in reverse order for speedup - see luamp_encode() code */
/* encode ops */
- luamp_encode_uint(cfg, &stream, IPROTO_OPS);
+ mpstream_encode_uint(&stream, IPROTO_OPS);
luamp_encode_tuple(L, cfg, &stream, 5);
lua_pop(L, 1); /* ops */
/* encode tuple */
- luamp_encode_uint(cfg, &stream, IPROTO_TUPLE);
+ mpstream_encode_uint(&stream, IPROTO_TUPLE);
luamp_encode_tuple(L, cfg, &stream, 4);
netbox_encode_request(&stream, svp);
@@ -566,17 +567,17 @@ netbox_encode_execute(lua_State *L)
struct mpstream stream;
size_t svp = netbox_prepare_request(L, &stream, IPROTO_EXECUTE);
- luamp_encode_map(cfg, &stream, 3);
+ mpstream_encode_map(&stream, 3);
size_t len;
const char *query = lua_tolstring(L, 3, &len);
- luamp_encode_uint(cfg, &stream, IPROTO_SQL_TEXT);
- luamp_encode_str(cfg, &stream, query, len);
+ mpstream_encode_uint(&stream, IPROTO_SQL_TEXT);
+ mpstream_encode_strn(&stream, query, len);
- luamp_encode_uint(cfg, &stream, IPROTO_SQL_BIND);
+ mpstream_encode_uint(&stream, IPROTO_SQL_BIND);
luamp_encode_tuple(L, cfg, &stream, 4);
- luamp_encode_uint(cfg, &stream, IPROTO_OPTIONS);
+ mpstream_encode_uint(&stream, IPROTO_OPTIONS);
luamp_encode_tuple(L, cfg, &stream, 5);
netbox_encode_request(&stream, svp);
diff --git a/src/box/lua/tuple.c b/src/box/lua/tuple.c
index 22fe696..126466c 100644
--- a/src/box/lua/tuple.c
+++ b/src/box/lua/tuple.c
@@ -42,6 +42,7 @@
#include "box/tuple_convert.h"
#include "box/errcode.h"
#include "json/path.h"
+#include "mpstream.h"
/** {{{ box.tuple Lua library
*
@@ -111,7 +112,7 @@ lbox_tuple_new(lua_State *L)
luamp_encode_tuple(L, luaL_msgpack_default, &stream, 1);
} else {
/* Backward-compatible format: box.tuple.new(1, 2, 3). */
- luamp_encode_array(luaL_msgpack_default, &stream, argc);
+ mpstream_encode_array(&stream, argc);
for (int k = 1; k <= argc; ++k) {
luamp_encode(L, luaL_msgpack_default, &stream, k);
}
@@ -228,9 +229,9 @@ luamp_convert_key(struct lua_State *L, struct luaL_serializer *cfg,
luamp_encode_r(L, cfg, stream, &field, 0);
lua_pop(L, 1);
} else if (field.type == MP_NIL) {
- luamp_encode_array(cfg, stream, 0);
+ mpstream_encode_array(stream, 0);
} else {
- luamp_encode_array(cfg, stream, 1);
+ mpstream_encode_array(stream, 1);
lua_pushvalue(L, index);
luamp_encode_r(L, cfg, stream, &field, 0);
lua_pop(L, 1);
@@ -393,18 +394,18 @@ lbox_tuple_transform(struct lua_State *L)
/*
* Prepare UPDATE expression
*/
- luamp_encode_array(luaL_msgpack_default, &stream, op_cnt);
+ mpstream_encode_array(&stream, op_cnt);
if (len > 0) {
- luamp_encode_array(luaL_msgpack_default, &stream, 3);
- luamp_encode_str(luaL_msgpack_default, &stream, "#", 1);
- luamp_encode_uint(luaL_msgpack_default, &stream, offset);
- luamp_encode_uint(luaL_msgpack_default, &stream, len);
+ mpstream_encode_array(&stream, 3);
+ mpstream_encode_str(&stream, "#");
+ mpstream_encode_uint(&stream, offset);
+ mpstream_encode_uint(&stream, len);
}
for (int i = argc ; i > 3; i--) {
- luamp_encode_array(luaL_msgpack_default, &stream, 3);
- luamp_encode_str(luaL_msgpack_default, &stream, "!", 1);
- luamp_encode_uint(luaL_msgpack_default, &stream, offset);
+ mpstream_encode_array(&stream, 3);
+ mpstream_encode_str(&stream, "!");
+ mpstream_encode_uint(&stream, offset);
luamp_encode(L, luaL_msgpack_default, &stream, i);
}
mpstream_flush(&stream);
diff --git a/src/lua/msgpack.c b/src/lua/msgpack.c
index acd860a..b470060 100644
--- a/src/lua/msgpack.c
+++ b/src/lua/msgpack.c
@@ -29,7 +29,7 @@
* SUCH DAMAGE.
*/
#include "lua/msgpack.h"
-
+#include "mpstream.h"
#include "lua/utils.h"
#if defined(LUAJIT)
@@ -50,46 +50,6 @@ luamp_error(void *error_ctx)
luaL_error(L, diag_last_error(diag_get())->errmsg);
}
-void
-mpstream_init(struct mpstream *stream, void *ctx,
- luamp_reserve_f reserve, luamp_alloc_f alloc,
- luamp_error_f error, void *error_ctx)
-{
- stream->ctx = ctx;
- stream->reserve = reserve;
- stream->alloc = alloc;
- stream->error = error;
- stream->error_ctx = error_ctx;
- mpstream_reset(stream);
-}
-
-void
-mpstream_reserve_slow(struct mpstream *stream, size_t size)
-{
- stream->alloc(stream->ctx, stream->pos - stream->buf);
- stream->buf = (char *) stream->reserve(stream->ctx, &size);
- if (stream->buf == NULL) {
- diag_set(OutOfMemory, size, "mpstream", "reserve");
- stream->error(stream->error_ctx);
- }
- stream->pos = stream->buf;
- stream->end = stream->pos + size;
-}
-
-void
-mpstream_reset(struct mpstream *stream)
-{
- size_t size = 0;
- stream->buf = (char *) stream->reserve(stream->ctx, &size);
- if (stream->buf == NULL) {
- diag_set(OutOfMemory, size, "mpstream", "reset");
- stream->error(stream->error_ctx);
- }
- stream->pos = stream->buf;
- stream->end = stream->pos + size;
-}
-
-
static uint32_t CTID_CHAR_PTR;
static uint32_t CTID_STRUCT_IBUF;
@@ -107,104 +67,6 @@ static luamp_encode_extension_f luamp_encode_extension =
static luamp_decode_extension_f luamp_decode_extension =
luamp_decode_extension_default;
-void
-luamp_encode_array(struct luaL_serializer *cfg, struct mpstream *stream,
- uint32_t size)
-{
- (void) cfg;
- assert(mp_sizeof_array(size) <= 5);
- char *data = mpstream_reserve(stream, 5);
- char *pos = mp_encode_array(data, size);
- mpstream_advance(stream, pos - data);
-}
-
-void
-luamp_encode_map(struct luaL_serializer *cfg, struct mpstream *stream,
- uint32_t size)
-{
- (void) cfg;
- assert(mp_sizeof_map(size) <= 5);
- char *data = mpstream_reserve(stream, 5);
- char *pos = mp_encode_map(data, size);
- mpstream_advance(stream, pos - data);
-}
-
-void
-luamp_encode_uint(struct luaL_serializer *cfg, struct mpstream *stream,
- uint64_t num)
-{
- (void) cfg;
- assert(mp_sizeof_uint(num) <= 9);
- char *data = mpstream_reserve(stream, 9);
- char *pos = mp_encode_uint(data, num);
- mpstream_advance(stream, pos - data);
-}
-
-void
-luamp_encode_int(struct luaL_serializer *cfg, struct mpstream *stream,
- int64_t num)
-{
- (void) cfg;
- assert(mp_sizeof_int(num) <= 9);
- char *data = mpstream_reserve(stream, 9);
- char *pos = mp_encode_int(data, num);
- mpstream_advance(stream, pos - data);
-}
-
-void
-luamp_encode_float(struct luaL_serializer *cfg, struct mpstream *stream,
- float num)
-{
- (void) cfg;
- assert(mp_sizeof_float(num) <= 5);
- char *data = mpstream_reserve(stream, 5);
- char *pos = mp_encode_float(data, num);
- mpstream_advance(stream, pos - data);
-}
-
-void
-luamp_encode_double(struct luaL_serializer *cfg, struct mpstream *stream,
- double num)
-{
- (void) cfg;
- assert(mp_sizeof_double(num) <= 9);
- char *data = mpstream_reserve(stream, 9);
- char *pos = mp_encode_double(data, num);
- mpstream_advance(stream, pos - data);
-}
-
-void
-luamp_encode_str(struct luaL_serializer *cfg, struct mpstream *stream,
- const char *str, uint32_t len)
-{
- (void) cfg;
- assert(mp_sizeof_str(len) <= 5 + len);
- char *data = mpstream_reserve(stream, 5 + len);
- char *pos = mp_encode_str(data, str, len);
- mpstream_advance(stream, pos - data);
-}
-
-void
-luamp_encode_nil(struct luaL_serializer *cfg, struct mpstream *stream)
-{
- (void) cfg;
- assert(mp_sizeof_nil() <= 1);
- char *data = mpstream_reserve(stream, 1);
- char *pos = mp_encode_nil(data);
- mpstream_advance(stream, pos - data);
-}
-
-void
-luamp_encode_bool(struct luaL_serializer *cfg, struct mpstream *stream,
- bool val)
-{
- (void) cfg;
- assert(mp_sizeof_bool(val) <= 1);
- char *data = mpstream_reserve(stream, 1);
- char *pos = mp_encode_bool(data, val);
- mpstream_advance(stream, pos - data);
-}
-
static enum mp_type
luamp_encode_extension_default(struct lua_State *L, int idx,
struct mpstream *stream)
@@ -254,38 +116,36 @@ luamp_encode_r(struct lua_State *L, struct luaL_serializer *cfg,
restart: /* used by MP_EXT */
switch (field->type) {
case MP_UINT:
- luamp_encode_uint(cfg, stream, field->ival);
+ mpstream_encode_uint(stream, field->ival);
return MP_UINT;
case MP_STR:
- luamp_encode_str(cfg, stream, field->sval.data,
- field->sval.len);
+ mpstream_encode_strn(stream, field->sval.data, field->sval.len);
return MP_STR;
case MP_BIN:
- luamp_encode_str(cfg, stream, field->sval.data,
- field->sval.len);
+ mpstream_encode_strn(stream, field->sval.data, field->sval.len);
return MP_BIN;
case MP_INT:
- luamp_encode_int(cfg, stream, field->ival);
+ mpstream_encode_int(stream, field->ival);
return MP_INT;
case MP_FLOAT:
- luamp_encode_float(cfg, stream, field->fval);
+ mpstream_encode_float(stream, field->fval);
return MP_FLOAT;
case MP_DOUBLE:
- luamp_encode_double(cfg, stream, field->dval);
+ mpstream_encode_double(stream, field->dval);
return MP_DOUBLE;
case MP_BOOL:
- luamp_encode_bool(cfg, stream, field->bval);
+ mpstream_encode_bool(stream, field->bval);
return MP_BOOL;
case MP_NIL:
- luamp_encode_nil(cfg, stream);
+ mpstream_encode_nil(stream);
return MP_NIL;
case MP_MAP:
/* Map */
if (level >= cfg->encode_max_depth) {
- luamp_encode_nil(cfg, stream); /* Limit nested maps */
+ mpstream_encode_nil(stream); /* Limit nested maps */
return MP_NIL;
}
- luamp_encode_map(cfg, stream, field->size);
+ mpstream_encode_map(stream, field->size);
lua_pushnil(L); /* first key */
while (lua_next(L, top) != 0) {
lua_pushvalue(L, -2); /* push a copy of key to top */
@@ -301,11 +161,11 @@ restart: /* used by MP_EXT */
case MP_ARRAY:
/* Array */
if (level >= cfg->encode_max_depth) {
- luamp_encode_nil(cfg, stream); /* Limit nested arrays */
+ mpstream_encode_nil(stream); /* Limit nested arrays */
return MP_NIL;
}
uint32_t size = field->size;
- luamp_encode_array(cfg, stream, size);
+ mpstream_encode_array(stream, size);
for (uint32_t i = 0; i < size; i++) {
lua_rawgeti(L, top, i + 1);
luaL_tofield(L, cfg, top + 1, field);
diff --git a/src/lua/msgpack.h b/src/lua/msgpack.h
index bacf3e0..d0ade30 100644
--- a/src/lua/msgpack.h
+++ b/src/lua/msgpack.h
@@ -1,7 +1,7 @@
#ifndef TARANTOOL_LUA_MSGPACK_H_INCLUDED
#define TARANTOOL_LUA_MSGPACK_H_INCLUDED
/*
- * Copyright 2010-2015, Tarantool AUTHORS, please see AUTHORS file.
+ * Copyright 2010-2018, Tarantool AUTHORS, please see AUTHORS file.
*
* Redistribution and use in source and binary forms, with or
* without modification, are permitted provided that the following
@@ -42,6 +42,8 @@ extern "C" {
#include <lua.h>
struct luaL_serializer;
+struct mpstream;
+
/**
* Default instance of msgpack serializer (msgpack = require('msgpack')).
* This instance is used by all box's Lua/C API bindings (e.g. space:replace()).
@@ -51,111 +53,13 @@ struct luaL_serializer;
extern struct luaL_serializer *luaL_msgpack_default;
/**
- * A streaming API so that it's possible to encode to any output
- * stream.
- */
-
-/**
- * Ask the allocator to reserve at least size bytes. It can reserve
- * more, and update *size with the new size.
- */
-typedef void *(*luamp_reserve_f)(void *ctx, size_t *size);
-
-/** Actually use the bytes. */
-typedef void *(*luamp_alloc_f)(void *ctx, size_t size);
-
-/** Actually use the bytes. */
-typedef void (*luamp_error_f)(void *error_ctx);
-
-struct mpstream {
- /**
- * When pos >= end, or required size doesn't fit in
- * pos..end range alloc() is called to advance the stream
- * and reserve() to get a new chunk.
- */
- char *buf, *pos, *end;
- void *ctx;
- luamp_reserve_f reserve;
- luamp_alloc_f alloc;
- luamp_error_f error;
- void *error_ctx;
-};
-
-/**
* luaL_error()
*/
void
luamp_error(void *);
-void
-mpstream_init(struct mpstream *stream, void *ctx,
- luamp_reserve_f reserve, luamp_alloc_f alloc,
- luamp_error_f error, void *error_ctx);
-
-void
-mpstream_reset(struct mpstream *stream);
-
-void
-mpstream_reserve_slow(struct mpstream *stream, size_t size);
-
-static inline void
-mpstream_flush(struct mpstream *stream)
-{
- stream->alloc(stream->ctx, stream->pos - stream->buf);
- stream->buf = stream->pos;
-}
-
-static inline char *
-mpstream_reserve(struct mpstream *stream, size_t size)
-{
- if (stream->pos + size > stream->end)
- mpstream_reserve_slow(stream, size);
- return stream->pos;
-}
-
-static inline void
-mpstream_advance(struct mpstream *stream, size_t size)
-{
- assert(stream->pos + size <= stream->end);
- stream->pos += size;
-}
-
enum { LUAMP_ALLOC_FACTOR = 256 };
-void
-luamp_encode_array(struct luaL_serializer *cfg, struct mpstream *stream,
- uint32_t size);
-
-void
-luamp_encode_map(struct luaL_serializer *cfg, struct mpstream *stream, uint32_t size);
-
-void
-luamp_encode_uint(struct luaL_serializer *cfg, struct mpstream *stream,
- uint64_t num);
-
-void
-luamp_encode_int(struct luaL_serializer *cfg, struct mpstream *stream,
- int64_t num);
-
-void
-luamp_encode_float(struct luaL_serializer *cfg, struct mpstream *stream,
- float num);
-
-void
-luamp_encode_double(struct luaL_serializer *cfg, struct mpstream *stream,
- double num);
-
-void
-luamp_encode_str(struct luaL_serializer *cfg, struct mpstream *stream,
- const char *str, uint32_t len);
-
-void
-luamp_encode_nil(struct luaL_serializer *cfg, struct mpstream *stream);
-
-void
-luamp_encode_bool(struct luaL_serializer *cfg, struct mpstream *stream,
- bool val);
-
/* low-level function needed for execute_lua_call() */
enum mp_type
luamp_encode_r(struct lua_State *L, struct luaL_serializer *cfg,
diff --git a/src/mpstream.c b/src/mpstream.c
new file mode 100644
index 0000000..23d27f9
--- /dev/null
+++ b/src/mpstream.c
@@ -0,0 +1,205 @@
+/*
+ * Copyright 2010-2018, Tarantool AUTHORS, please see AUTHORS file.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ * copyright notice, this list of conditions and the
+ * following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials
+ * provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+ * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+ * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
+
+#include "mpstream.h"
+#include <assert.h>
+#include <stdint.h>
+#include <string.h>
+#include "msgpuck.h"
+
+/**
+ * Slow path of mpstream_reserve(): the current chunk cannot hold
+ * @a size more bytes.
+ *
+ * Hands the bytes written so far to alloc(), then asks reserve()
+ * for a new chunk of at least @a size bytes (reserve() may enlarge
+ * the size it is given). If reserve() returns NULL, an OutOfMemory
+ * diagnostic is set and the error callback is invoked.
+ *
+ * The error callback is allowed to return. In that case the stream
+ * is left empty (buf == pos == end == NULL), so mpstream_reserve()
+ * yields NULL and the next non-empty reservation retries the
+ * allocation.
+ */
+void
+mpstream_reserve_slow(struct mpstream *stream, size_t size)
+{
+	/* Commit what has been written into the current chunk. */
+	stream->alloc(stream->ctx, stream->pos - stream->buf);
+	stream->buf = (char *) stream->reserve(stream->ctx, &size);
+	if (stream->buf == NULL) {
+		diag_set(OutOfMemory, size, "mpstream", "reserve");
+		stream->error(stream->error_ctx);
+		/*
+		 * Reached only when the error callback returns.
+		 * Do not compute NULL + size: that is undefined
+		 * and would advertise a writable window at
+		 * address 0.
+		 */
+		stream->pos = NULL;
+		stream->end = NULL;
+		return;
+	}
+	stream->pos = stream->buf;
+	stream->end = stream->pos + size;
+}
+
+/**
+ * Point the stream at a fresh chunk obtained from reserve().
+ *
+ * reserve() is called with a requested size of 0 and reports the
+ * size actually available through the same variable. If it
+ * returns NULL, an OutOfMemory diagnostic is set and the error
+ * callback is invoked; should that callback return, the stream is
+ * left empty (buf == pos == end == NULL).
+ */
+void
+mpstream_reset(struct mpstream *stream)
+{
+	size_t size = 0;
+	stream->buf = (char *) stream->reserve(stream->ctx, &size);
+	if (stream->buf == NULL) {
+		diag_set(OutOfMemory, size, "mpstream", "reset");
+		stream->error(stream->error_ctx);
+		/*
+		 * Reached only when the error callback returns.
+		 * Avoid pointer arithmetic on NULL and keep the
+		 * stream in a well-defined empty state.
+		 */
+		stream->pos = NULL;
+		stream->end = NULL;
+		return;
+	}
+	stream->pos = stream->buf;
+	stream->end = stream->pos + size;
+}
+
+/**
+ * Initialize @a stream. mpstream is a streaming API that makes it
+ * possible to encode to any output stream: the caller supplies the
+ * allocator callbacks (@a reserve, @a alloc) with their context
+ * @a ctx, and the error callback @a error with its context
+ * @a error_ctx. The first chunk is requested right away via
+ * mpstream_reset().
+ */
+void
+mpstream_init(struct mpstream *stream, void *ctx,
+	      mpstream_reserve_f reserve, mpstream_alloc_f alloc,
+	      mpstream_error_f error, void *error_ctx)
+{
+	/* Allocator callbacks and the context they receive. */
+	stream->reserve = reserve;
+	stream->alloc = alloc;
+	stream->ctx = ctx;
+	/* Error callback and the context it receives. */
+	stream->error_ctx = error_ctx;
+	stream->error = error;
+	/* All callbacks are in place: fetch the first chunk. */
+	mpstream_reset(stream);
+}
+
+/**
+ * Hand the bytes written since the start of the current chunk to
+ * alloc() and move the chunk start up to the write position.
+ */
+void
+mpstream_flush(struct mpstream *stream)
+{
+	char *written_end = stream->pos;
+	stream->alloc(stream->ctx, written_end - stream->buf);
+	stream->buf = written_end;
+}
+
+/**
+ * Make sure @a size bytes can be written at the current position
+ * and return that position. The slow path is taken only when the
+ * request does not fit between pos and end.
+ */
+char *
+mpstream_reserve(struct mpstream *stream, size_t size)
+{
+	/* Fast path: the current chunk already has enough room. */
+	if (!(stream->pos + size > stream->end))
+		return stream->pos;
+	mpstream_reserve_slow(stream, size);
+	return stream->pos;
+}
+
+/**
+ * Move the write position forward by @a size bytes. The new
+ * position must not pass the end of the current chunk (checked by
+ * an assertion only).
+ */
+void
+mpstream_advance(struct mpstream *stream, size_t size)
+{
+	char *next = stream->pos + size;
+	assert(next <= stream->end);
+	stream->pos = next;
+}
+
+/**
+ * Append an array header for @a size elements, written with
+ * mp_encode_array(). Nothing is written when mpstream_reserve()
+ * returns NULL.
+ */
+void
+mpstream_encode_array(struct mpstream *stream, uint32_t size)
+{
+	assert(mp_sizeof_array(size) <= 5);
+	char *begin = mpstream_reserve(stream, 5);
+	if (begin != NULL) {
+		char *finish = mp_encode_array(begin, size);
+		mpstream_advance(stream, finish - begin);
+	}
+}
+
+/**
+ * Append a map header for @a size pairs, written with
+ * mp_encode_map(). Nothing is written when mpstream_reserve()
+ * returns NULL.
+ */
+void
+mpstream_encode_map(struct mpstream *stream, uint32_t size)
+{
+	assert(mp_sizeof_map(size) <= 5);
+	char *begin = mpstream_reserve(stream, 5);
+	if (begin != NULL) {
+		char *finish = mp_encode_map(begin, size);
+		mpstream_advance(stream, finish - begin);
+	}
+}
+
+/**
+ * Append the unsigned integer @a num, written with
+ * mp_encode_uint(). Nothing is written when mpstream_reserve()
+ * returns NULL.
+ */
+void
+mpstream_encode_uint(struct mpstream *stream, uint64_t num)
+{
+	assert(mp_sizeof_uint(num) <= 9);
+	char *begin = mpstream_reserve(stream, 9);
+	if (begin != NULL) {
+		char *finish = mp_encode_uint(begin, num);
+		mpstream_advance(stream, finish - begin);
+	}
+}
+
+/**
+ * Append the signed integer @a num, written with mp_encode_int().
+ * Nothing is written when mpstream_reserve() returns NULL.
+ */
+void
+mpstream_encode_int(struct mpstream *stream, int64_t num)
+{
+	assert(mp_sizeof_int(num) <= 9);
+	char *begin = mpstream_reserve(stream, 9);
+	if (begin != NULL) {
+		char *finish = mp_encode_int(begin, num);
+		mpstream_advance(stream, finish - begin);
+	}
+}
+
+/**
+ * Append the single-precision value @a num, written with
+ * mp_encode_float(). Nothing is written when mpstream_reserve()
+ * returns NULL.
+ */
+void
+mpstream_encode_float(struct mpstream *stream, float num)
+{
+	assert(mp_sizeof_float(num) <= 5);
+	char *begin = mpstream_reserve(stream, 5);
+	if (begin != NULL) {
+		char *finish = mp_encode_float(begin, num);
+		mpstream_advance(stream, finish - begin);
+	}
+}
+
+/**
+ * Append the double-precision value @a num, written with
+ * mp_encode_double(). Nothing is written when mpstream_reserve()
+ * returns NULL.
+ */
+void
+mpstream_encode_double(struct mpstream *stream, double num)
+{
+	assert(mp_sizeof_double(num) <= 9);
+	char *data = mpstream_reserve(stream, 9);
+	/*
+	 * The reservation must be checked before encoding,
+	 * otherwise mp_encode_double() writes through NULL.
+	 */
+	if (data == NULL)
+		return;
+	char *pos = mp_encode_double(data, num);
+	mpstream_advance(stream, pos - data);
+}
+
+/**
+ * Append @a len bytes starting at @a str as a string, written
+ * with mp_encode_str(). @a str does not have to be
+ * NUL-terminated. Nothing is written when mpstream_reserve()
+ * returns NULL.
+ */
+void
+mpstream_encode_strn(struct mpstream *stream, const char *str, uint32_t len)
+{
+	/*
+	 * Widen before adding the header allowance: 5 + len is
+	 * otherwise evaluated in 32 bits and wraps around for
+	 * len close to UINT32_MAX, under-reserving the buffer.
+	 */
+	size_t max_size = (size_t) len + 5;
+	assert(mp_sizeof_str(len) <= max_size);
+	char *data = mpstream_reserve(stream, max_size);
+	if (data == NULL)
+		return;
+	char *pos = mp_encode_str(data, str, len);
+	mpstream_advance(stream, pos - data);
+}
+
+/**
+ * Append the NUL-terminated string @a str (without the
+ * terminator) by delegating to mpstream_encode_strn().
+ * @a str must not be NULL, because its length is taken with
+ * strlen().
+ */
+void
+mpstream_encode_str(struct mpstream *stream, const char *str)
+{
+	size_t len = strlen(str);
+	/* The length is narrowed to uint32_t below: make it explicit. */
+	assert(len <= UINT32_MAX);
+	mpstream_encode_strn(stream, str, (uint32_t) len);
+}
+
+/**
+ * Append a nil value, written with mp_encode_nil(). Nothing is
+ * written when mpstream_reserve() returns NULL.
+ */
+void
+mpstream_encode_nil(struct mpstream *stream)
+{
+	assert(mp_sizeof_nil() <= 1);
+	char *begin = mpstream_reserve(stream, 1);
+	if (begin != NULL) {
+		char *finish = mp_encode_nil(begin);
+		mpstream_advance(stream, finish - begin);
+	}
+}
+
+/**
+ * Append the boolean @a val, written with mp_encode_bool().
+ * Nothing is written when mpstream_reserve() returns NULL.
+ */
+void
+mpstream_encode_bool(struct mpstream *stream, bool val)
+{
+	assert(mp_sizeof_bool(val) <= 1);
+	char *begin = mpstream_reserve(stream, 1);
+	if (begin != NULL) {
+		char *finish = mp_encode_bool(begin, val);
+		mpstream_advance(stream, finish - begin);
+	}
+}
diff --git a/src/mpstream.h b/src/mpstream.h
new file mode 100644
index 0000000..789bd74
--- /dev/null
+++ b/src/mpstream.h
@@ -0,0 +1,124 @@
+#ifndef TARANTOOL_LUA_MPSTREAM_H_INCLUDED
+#define TARANTOOL_LUA_MPSTREAM_H_INCLUDED
+/*
+ * Copyright 2010-2018, Tarantool AUTHORS, please see AUTHORS file.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ * copyright notice, this list of conditions and the
+ * following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials
+ * provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+ * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+ * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
+
+#include "diag.h"
+
+#if defined(__cplusplus)
+extern "C" {
+#endif /* defined(__cplusplus) */
+
+/**
+ * Ask the allocator to reserve at least size bytes. It can reserve
+ * more, and update *size with the new size. A NULL return value is
+ * treated by the stream as an out-of-memory failure.
+ */
+typedef void *(*mpstream_reserve_f)(void *ctx, size_t *size);
+
+/**
+ * Actually use the bytes: the stream calls it with the number of
+ * bytes written since the start of the current chunk.
+ */
+typedef void *(*mpstream_alloc_f)(void *ctx, size_t size);
+
+/**
+ * Report a failure: the stream calls it, after setting an
+ * OutOfMemory diagnostic, when reserve() has returned NULL.
+ */
+typedef void (*mpstream_error_f)(void *error_ctx);
+
+/**
+ * State of an output stream.
+ *
+ * When pos >= end, or required size doesn't fit in
+ * pos..end range alloc() is called to advance the stream
+ * and reserve() to get a new chunk.
+ */
+struct mpstream {
+	/** Start of the current chunk, as returned by reserve(). */
+	char *buf;
+	/** Current write position inside the chunk. */
+	char *pos;
+	/** End of the chunk: its start plus the size set by reserve(). */
+	char *end;
+	/** Context passed to reserve() and alloc(). */
+	void *ctx;
+	/** Callback providing a new chunk. */
+	mpstream_reserve_f reserve;
+	/** Callback committing the bytes written so far. */
+	mpstream_alloc_f alloc;
+	/** Callback invoked when reserve() returns NULL. */
+	mpstream_error_f error;
+	/** Context passed to error(). */
+	void *error_ctx;
+};
+
+/**
+ * Slow path of mpstream_reserve(): hand the bytes written so far
+ * to alloc(), then ask reserve() for a new chunk of at least
+ * size bytes. If reserve() returns NULL, an OutOfMemory
+ * diagnostic is set and the error callback is invoked.
+ */
+void
+mpstream_reserve_slow(struct mpstream *stream, size_t size);
+
+/**
+ * Ask reserve() for a chunk (with a requested size of 0) and
+ * point the stream at its beginning. If reserve() returns NULL,
+ * an OutOfMemory diagnostic is set and the error callback is
+ * invoked.
+ */
+void
+mpstream_reset(struct mpstream *stream);
+
+/**
+ * Initialize a stream. mpstream is a streaming API so that it's
+ * possible to encode to any output stream: the given callbacks
+ * and contexts are stored in the stream, then mpstream_reset()
+ * is called to obtain the first chunk.
+ */
+void
+mpstream_init(struct mpstream *stream, void *ctx,
+ mpstream_reserve_f reserve, mpstream_alloc_f alloc,
+ mpstream_error_f error, void *error_ctx);
+
+/**
+ * Pass the number of bytes written since the start of the current
+ * chunk to alloc() and move the chunk start to the write position.
+ */
+void
+mpstream_flush(struct mpstream *stream);
+
+/**
+ * Return the current write position, first calling
+ * mpstream_reserve_slow() if size bytes do not fit before the end
+ * of the current chunk.
+ */
+char *
+mpstream_reserve(struct mpstream *stream, size_t size);
+
+/**
+ * Move the write position forward by size bytes. An assertion
+ * checks that the end of the current chunk is not passed.
+ */
+void
+mpstream_advance(struct mpstream *stream, size_t size);
+
+/*
+ * Encoders. Each one reserves room in the stream, writes the
+ * value with the msgpuck function of the matching name
+ * (mp_encode_array(), mp_encode_map(), ...) and advances the
+ * stream by the number of bytes written.
+ */
+
+/** Write an array header; size is passed to mp_encode_array(). */
+void
+mpstream_encode_array(struct mpstream *stream, uint32_t size);
+
+/** Write a map header; size is passed to mp_encode_map(). */
+void
+mpstream_encode_map(struct mpstream *stream, uint32_t size);
+
+/** Write an unsigned integer. */
+void
+mpstream_encode_uint(struct mpstream *stream, uint64_t num);
+
+/** Write a signed integer. */
+void
+mpstream_encode_int(struct mpstream *stream, int64_t num);
+
+/** Write a single-precision floating point value. */
+void
+mpstream_encode_float(struct mpstream *stream, float num);
+
+/** Write a double-precision floating point value. */
+void
+mpstream_encode_double(struct mpstream *stream, double num);
+
+/** Write len bytes starting at str as a string. */
+void
+mpstream_encode_strn(struct mpstream *stream, const char *str, uint32_t len);
+
+/** Write a NUL-terminated string; its length is taken with strlen(). */
+void
+mpstream_encode_str(struct mpstream *stream, const char *str);
+
+/** Write a nil value. */
+void
+mpstream_encode_nil(struct mpstream *stream);
+
+/** Write a boolean value. */
+void
+mpstream_encode_bool(struct mpstream *stream, bool val);
+
+#if defined(__cplusplus)
+} /* extern "C" */
+#endif /* defined(__cplusplus) */
+
+#endif /* TARANTOOL_LUA_MPSTREAM_H_INCLUDED */
--
2.7.4
More information about the Tarantool-patches
mailing list