From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from localhost (localhost [127.0.0.1]) by turing.freelists.org (Avenir Technologies Mail Multiplex) with ESMTP id ACF702935C for ; Mon, 27 Aug 2018 07:11:09 -0400 (EDT) Received: from turing.freelists.org ([127.0.0.1]) by localhost (turing.freelists.org [127.0.0.1]) (amavisd-new, port 10024) with ESMTP id 0Vn7IqoGIPaE for ; Mon, 27 Aug 2018 07:11:09 -0400 (EDT) Received: from smtp1.mail.ru (smtp1.mail.ru [94.100.179.111]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by turing.freelists.org (Avenir Technologies Mail Multiplex) with ESMTPS id 009ED28E8E for ; Mon, 27 Aug 2018 07:11:08 -0400 (EDT) From: Kirill Shcherbatov Subject: [tarantool-patches] [PATCH v2 1/2] box: export mpstream methods to core Date: Mon, 27 Aug 2018 14:11:01 +0300 Message-Id: <426c34291ce7834cf224eaed82506564c066ff8e.1535367103.git.kshcherbatov@tarantool.org> In-Reply-To: References: In-Reply-To: References: Sender: tarantool-patches-bounce@freelists.org Errors-to: tarantool-patches-bounce@freelists.org Reply-To: tarantool-patches@freelists.org List-help: List-unsubscribe: List-software: Ecartis version 1.0.0 List-Id: tarantool-patches List-subscribe: List-owner: List-post: List-archive: To: tarantool-patches@freelists.org Cc: v.shpilevoy@tarantool.org, Kirill Shcherbatov As we are going to use mpstream not only in Lua, let's move this API to the Tarantool core. Part of #3545.
--- src/CMakeLists.txt | 1 + src/box/lua/call.c | 11 +-- src/box/lua/misc.cc | 1 + src/box/lua/net_box.c | 127 +++++++++++++++---------------- src/box/lua/tuple.c | 23 +++--- src/lua/msgpack.c | 166 ++++------------------------------------ src/lua/msgpack.h | 102 +------------------------ src/mpstream.c | 205 ++++++++++++++++++++++++++++++++++++++++++++++++++ src/mpstream.h | 124 ++++++++++++++++++++++++++++++ 9 files changed, 429 insertions(+), 331 deletions(-) create mode 100644 src/mpstream.c create mode 100644 src/mpstream.h diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 7c8b5cc..0407337 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -97,6 +97,7 @@ set (core_sources http_parser.c coll.c coll_def.c + mpstream.c ) if (TARGET_OS_NETBSD) diff --git a/src/box/lua/call.c b/src/box/lua/call.c index d20cbbb..1f20426 100644 --- a/src/box/lua/call.c +++ b/src/box/lua/call.c @@ -42,6 +42,7 @@ #include "small/obuf.h" #include "lua_sql.h" #include "trivia/util.h" +#include "mpstream.h" /** * A helper to find a Lua function by name and put it @@ -175,7 +176,7 @@ luamp_encode_call_16(lua_State *L, struct luaL_serializer *cfg, * ..., { scalar }, ...` */ lua_pushvalue(L, i); - luamp_encode_array(cfg, stream, 1); + mpstream_encode_array(stream, 1); luamp_encode_r(L, cfg, stream, &field, 0); lua_pop(L, 1); } else { @@ -202,7 +203,7 @@ luamp_encode_call_16(lua_State *L, struct luaL_serializer *cfg, * `return scalar` * `return map` */ - luamp_encode_array(cfg, stream, 1); + mpstream_encode_array(stream, 1); assert(lua_gettop(L) == 1); luamp_encode_r(L, cfg, stream, &root, 0); return 1; @@ -211,7 +212,7 @@ luamp_encode_call_16(lua_State *L, struct luaL_serializer *cfg, assert(root.type == MP_ARRAY); if (root.size == 0) { /* `return {}` => `{ box.tuple() }` */ - luamp_encode_array(cfg, stream, 0); + mpstream_encode_array(stream, 0); return 1; } @@ -230,7 +231,7 @@ luamp_encode_call_16(lua_State *L, struct luaL_serializer *cfg, * `return { scalar, ... 
} => * box.tuple.new(scalar, ...)` */ - luamp_encode_array(cfg, stream, root.size); + mpstream_encode_array(stream, root.size); /* * Encode the first field of tuple using * existing information from luaL_tofield @@ -250,7 +251,7 @@ luamp_encode_call_16(lua_State *L, struct luaL_serializer *cfg, * `return { tuple/array, ..., scalar, ... } => * { tuple/array, ..., { scalar }, ... }` */ - luamp_encode_array(cfg, stream, 1); + mpstream_encode_array(stream, 1); luamp_encode_r(L, cfg, stream, &field, 0); } else { /* `return { tuple/array, ..., tuple/array, ... }` */ diff --git a/src/box/lua/misc.cc b/src/box/lua/misc.cc index bc76065..8bd33ae 100644 --- a/src/box/lua/misc.cc +++ b/src/box/lua/misc.cc @@ -38,6 +38,7 @@ #include "box/box.h" #include "box/port.h" #include "box/lua/tuple.h" +#include "mpstream.h" /** {{{ Miscellaneous utils **/ diff --git a/src/box/lua/net_box.c b/src/box/lua/net_box.c index 308c9c7..a928a4c 100644 --- a/src/box/lua/net_box.c +++ b/src/box/lua/net_box.c @@ -47,6 +47,7 @@ #include "coio.h" #include "box/errcode.h" #include "lua/fiber.h" +#include "mpstream.h" #define cfg luaL_msgpack_default @@ -68,13 +69,13 @@ netbox_prepare_request(lua_State *L, struct mpstream *stream, uint32_t r_type) mpstream_advance(stream, fixheader_size); /* encode header */ - luamp_encode_map(cfg, stream, 2); + mpstream_encode_map(stream, 2); - luamp_encode_uint(cfg, stream, IPROTO_SYNC); - luamp_encode_uint(cfg, stream, sync); + mpstream_encode_uint(stream, IPROTO_SYNC); + mpstream_encode_uint(stream, sync); - luamp_encode_uint(cfg, stream, IPROTO_REQUEST_TYPE); - luamp_encode_uint(cfg, stream, r_type); + mpstream_encode_uint(stream, IPROTO_REQUEST_TYPE); + mpstream_encode_uint(stream, r_type); /* Caller should remember how many bytes was used in ibuf */ return used; @@ -139,16 +140,16 @@ netbox_encode_auth(lua_State *L) return luaL_error(L, "Invalid salt"); /* Adapted from xrow_encode_auth() */ - luamp_encode_map(cfg, &stream, password != NULL ? 
2 : 1); - luamp_encode_uint(cfg, &stream, IPROTO_USER_NAME); - luamp_encode_str(cfg, &stream, user, user_len); + mpstream_encode_map(&stream, password != NULL ? 2 : 1); + mpstream_encode_uint(&stream, IPROTO_USER_NAME); + mpstream_encode_strn(&stream, user, user_len); if (password != NULL) { /* password can be omitted */ char scramble[SCRAMBLE_SIZE]; scramble_prepare(scramble, salt, password, password_len); - luamp_encode_uint(cfg, &stream, IPROTO_TUPLE); - luamp_encode_array(cfg, &stream, 2); - luamp_encode_str(cfg, &stream, "chap-sha1", strlen("chap-sha1")); - luamp_encode_str(cfg, &stream, scramble, SCRAMBLE_SIZE); + mpstream_encode_uint(&stream, IPROTO_TUPLE); + mpstream_encode_array(&stream, 2); + mpstream_encode_str(&stream, "chap-sha1"); + mpstream_encode_strn(&stream, scramble, SCRAMBLE_SIZE); } netbox_encode_request(&stream, svp); @@ -166,16 +167,16 @@ netbox_encode_call_impl(lua_State *L, enum iproto_type type) struct mpstream stream; size_t svp = netbox_prepare_request(L, &stream, type); - luamp_encode_map(cfg, &stream, 2); + mpstream_encode_map(&stream, 2); /* encode proc name */ size_t name_len; const char *name = lua_tolstring(L, 3, &name_len); - luamp_encode_uint(cfg, &stream, IPROTO_FUNCTION_NAME); - luamp_encode_str(cfg, &stream, name, name_len); + mpstream_encode_uint(&stream, IPROTO_FUNCTION_NAME); + mpstream_encode_strn(&stream, name, name_len); /* encode args */ - luamp_encode_uint(cfg, &stream, IPROTO_TUPLE); + mpstream_encode_uint(&stream, IPROTO_TUPLE); luamp_encode_tuple(L, cfg, &stream, 4); netbox_encode_request(&stream, svp); @@ -205,16 +206,16 @@ netbox_encode_eval(lua_State *L) struct mpstream stream; size_t svp = netbox_prepare_request(L, &stream, IPROTO_EVAL); - luamp_encode_map(cfg, &stream, 2); + mpstream_encode_map(&stream, 2); /* encode expr */ size_t expr_len; const char *expr = lua_tolstring(L, 3, &expr_len); - luamp_encode_uint(cfg, &stream, IPROTO_EXPR); - luamp_encode_str(cfg, &stream, expr, expr_len); + 
mpstream_encode_uint(&stream, IPROTO_EXPR); + mpstream_encode_strn(&stream, expr, expr_len); /* encode args */ - luamp_encode_uint(cfg, &stream, IPROTO_TUPLE); + mpstream_encode_uint(&stream, IPROTO_TUPLE); luamp_encode_tuple(L, cfg, &stream, 4); netbox_encode_request(&stream, svp); @@ -233,7 +234,7 @@ netbox_encode_select(lua_State *L) struct mpstream stream; size_t svp = netbox_prepare_request(L, &stream, IPROTO_SELECT); - luamp_encode_map(cfg, &stream, 6); + mpstream_encode_map(&stream, 6); uint32_t space_id = lua_tonumber(L, 3); uint32_t index_id = lua_tonumber(L, 4); @@ -242,27 +243,27 @@ netbox_encode_select(lua_State *L) uint32_t limit = lua_tonumber(L, 7); /* encode space_id */ - luamp_encode_uint(cfg, &stream, IPROTO_SPACE_ID); - luamp_encode_uint(cfg, &stream, space_id); + mpstream_encode_uint(&stream, IPROTO_SPACE_ID); + mpstream_encode_uint(&stream, space_id); /* encode index_id */ - luamp_encode_uint(cfg, &stream, IPROTO_INDEX_ID); - luamp_encode_uint(cfg, &stream, index_id); + mpstream_encode_uint(&stream, IPROTO_INDEX_ID); + mpstream_encode_uint(&stream, index_id); /* encode iterator */ - luamp_encode_uint(cfg, &stream, IPROTO_ITERATOR); - luamp_encode_uint(cfg, &stream, iterator); + mpstream_encode_uint(&stream, IPROTO_ITERATOR); + mpstream_encode_uint(&stream, iterator); /* encode offset */ - luamp_encode_uint(cfg, &stream, IPROTO_OFFSET); - luamp_encode_uint(cfg, &stream, offset); + mpstream_encode_uint(&stream, IPROTO_OFFSET); + mpstream_encode_uint(&stream, offset); /* encode limit */ - luamp_encode_uint(cfg, &stream, IPROTO_LIMIT); - luamp_encode_uint(cfg, &stream, limit); + mpstream_encode_uint(&stream, IPROTO_LIMIT); + mpstream_encode_uint(&stream, limit); /* encode key */ - luamp_encode_uint(cfg, &stream, IPROTO_KEY); + mpstream_encode_uint(&stream, IPROTO_KEY); luamp_convert_key(L, cfg, &stream, 8); netbox_encode_request(&stream, svp); @@ -279,15 +280,15 @@ netbox_encode_insert_or_replace(lua_State *L, uint32_t reqtype) struct mpstream 
stream; size_t svp = netbox_prepare_request(L, &stream, reqtype); - luamp_encode_map(cfg, &stream, 2); + mpstream_encode_map(&stream, 2); /* encode space_id */ uint32_t space_id = lua_tonumber(L, 3); - luamp_encode_uint(cfg, &stream, IPROTO_SPACE_ID); - luamp_encode_uint(cfg, &stream, space_id); + mpstream_encode_uint(&stream, IPROTO_SPACE_ID); + mpstream_encode_uint(&stream, space_id); /* encode args */ - luamp_encode_uint(cfg, &stream, IPROTO_TUPLE); + mpstream_encode_uint(&stream, IPROTO_TUPLE); luamp_encode_tuple(L, cfg, &stream, 4); netbox_encode_request(&stream, svp); @@ -317,20 +318,20 @@ netbox_encode_delete(lua_State *L) struct mpstream stream; size_t svp = netbox_prepare_request(L, &stream, IPROTO_DELETE); - luamp_encode_map(cfg, &stream, 3); + mpstream_encode_map(&stream, 3); /* encode space_id */ uint32_t space_id = lua_tonumber(L, 3); - luamp_encode_uint(cfg, &stream, IPROTO_SPACE_ID); - luamp_encode_uint(cfg, &stream, space_id); + mpstream_encode_uint(&stream, IPROTO_SPACE_ID); + mpstream_encode_uint(&stream, space_id); /* encode space_id */ uint32_t index_id = lua_tonumber(L, 4); - luamp_encode_uint(cfg, &stream, IPROTO_INDEX_ID); - luamp_encode_uint(cfg, &stream, index_id); + mpstream_encode_uint(&stream, IPROTO_INDEX_ID); + mpstream_encode_uint(&stream, index_id); /* encode key */ - luamp_encode_uint(cfg, &stream, IPROTO_KEY); + mpstream_encode_uint(&stream, IPROTO_KEY); luamp_convert_key(L, cfg, &stream, 5); netbox_encode_request(&stream, svp); @@ -348,30 +349,30 @@ netbox_encode_update(lua_State *L) struct mpstream stream; size_t svp = netbox_prepare_request(L, &stream, IPROTO_UPDATE); - luamp_encode_map(cfg, &stream, 5); + mpstream_encode_map(&stream, 5); /* encode space_id */ uint32_t space_id = lua_tonumber(L, 3); - luamp_encode_uint(cfg, &stream, IPROTO_SPACE_ID); - luamp_encode_uint(cfg, &stream, space_id); + mpstream_encode_uint(&stream, IPROTO_SPACE_ID); + mpstream_encode_uint(&stream, space_id); /* encode index_id */ uint32_t index_id = 
lua_tonumber(L, 4); - luamp_encode_uint(cfg, &stream, IPROTO_INDEX_ID); - luamp_encode_uint(cfg, &stream, index_id); + mpstream_encode_uint(&stream, IPROTO_INDEX_ID); + mpstream_encode_uint(&stream, index_id); /* encode index_id */ - luamp_encode_uint(cfg, &stream, IPROTO_INDEX_BASE); - luamp_encode_uint(cfg, &stream, 1); + mpstream_encode_uint(&stream, IPROTO_INDEX_BASE); + mpstream_encode_uint(&stream, 1); /* encode in reverse order for speedup - see luamp_encode() code */ /* encode ops */ - luamp_encode_uint(cfg, &stream, IPROTO_TUPLE); + mpstream_encode_uint(&stream, IPROTO_TUPLE); luamp_encode_tuple(L, cfg, &stream, 6); lua_pop(L, 1); /* ops */ /* encode key */ - luamp_encode_uint(cfg, &stream, IPROTO_KEY); + mpstream_encode_uint(&stream, IPROTO_KEY); luamp_convert_key(L, cfg, &stream, 5); netbox_encode_request(&stream, svp); @@ -389,25 +390,25 @@ netbox_encode_upsert(lua_State *L) struct mpstream stream; size_t svp = netbox_prepare_request(L, &stream, IPROTO_UPSERT); - luamp_encode_map(cfg, &stream, 4); + mpstream_encode_map(&stream, 4); /* encode space_id */ uint32_t space_id = lua_tonumber(L, 3); - luamp_encode_uint(cfg, &stream, IPROTO_SPACE_ID); - luamp_encode_uint(cfg, &stream, space_id); + mpstream_encode_uint(&stream, IPROTO_SPACE_ID); + mpstream_encode_uint(&stream, space_id); /* encode index_base */ - luamp_encode_uint(cfg, &stream, IPROTO_INDEX_BASE); - luamp_encode_uint(cfg, &stream, 1); + mpstream_encode_uint(&stream, IPROTO_INDEX_BASE); + mpstream_encode_uint(&stream, 1); /* encode in reverse order for speedup - see luamp_encode() code */ /* encode ops */ - luamp_encode_uint(cfg, &stream, IPROTO_OPS); + mpstream_encode_uint(&stream, IPROTO_OPS); luamp_encode_tuple(L, cfg, &stream, 5); lua_pop(L, 1); /* ops */ /* encode tuple */ - luamp_encode_uint(cfg, &stream, IPROTO_TUPLE); + mpstream_encode_uint(&stream, IPROTO_TUPLE); luamp_encode_tuple(L, cfg, &stream, 4); netbox_encode_request(&stream, svp); @@ -566,17 +567,17 @@ 
netbox_encode_execute(lua_State *L) struct mpstream stream; size_t svp = netbox_prepare_request(L, &stream, IPROTO_EXECUTE); - luamp_encode_map(cfg, &stream, 3); + mpstream_encode_map(&stream, 3); size_t len; const char *query = lua_tolstring(L, 3, &len); - luamp_encode_uint(cfg, &stream, IPROTO_SQL_TEXT); - luamp_encode_str(cfg, &stream, query, len); + mpstream_encode_uint(&stream, IPROTO_SQL_TEXT); + mpstream_encode_strn(&stream, query, len); - luamp_encode_uint(cfg, &stream, IPROTO_SQL_BIND); + mpstream_encode_uint(&stream, IPROTO_SQL_BIND); luamp_encode_tuple(L, cfg, &stream, 4); - luamp_encode_uint(cfg, &stream, IPROTO_OPTIONS); + mpstream_encode_uint(&stream, IPROTO_OPTIONS); luamp_encode_tuple(L, cfg, &stream, 5); netbox_encode_request(&stream, svp); diff --git a/src/box/lua/tuple.c b/src/box/lua/tuple.c index 22fe696..126466c 100644 --- a/src/box/lua/tuple.c +++ b/src/box/lua/tuple.c @@ -42,6 +42,7 @@ #include "box/tuple_convert.h" #include "box/errcode.h" #include "json/path.h" +#include "mpstream.h" /** {{{ box.tuple Lua library * @@ -111,7 +112,7 @@ lbox_tuple_new(lua_State *L) luamp_encode_tuple(L, luaL_msgpack_default, &stream, 1); } else { /* Backward-compatible format: box.tuple.new(1, 2, 3). 
*/ - luamp_encode_array(luaL_msgpack_default, &stream, argc); + mpstream_encode_array(&stream, argc); for (int k = 1; k <= argc; ++k) { luamp_encode(L, luaL_msgpack_default, &stream, k); } @@ -228,9 +229,9 @@ luamp_convert_key(struct lua_State *L, struct luaL_serializer *cfg, luamp_encode_r(L, cfg, stream, &field, 0); lua_pop(L, 1); } else if (field.type == MP_NIL) { - luamp_encode_array(cfg, stream, 0); + mpstream_encode_array(stream, 0); } else { - luamp_encode_array(cfg, stream, 1); + mpstream_encode_array(stream, 1); lua_pushvalue(L, index); luamp_encode_r(L, cfg, stream, &field, 0); lua_pop(L, 1); @@ -393,18 +394,18 @@ lbox_tuple_transform(struct lua_State *L) /* * Prepare UPDATE expression */ - luamp_encode_array(luaL_msgpack_default, &stream, op_cnt); + mpstream_encode_array(&stream, op_cnt); if (len > 0) { - luamp_encode_array(luaL_msgpack_default, &stream, 3); - luamp_encode_str(luaL_msgpack_default, &stream, "#", 1); - luamp_encode_uint(luaL_msgpack_default, &stream, offset); - luamp_encode_uint(luaL_msgpack_default, &stream, len); + mpstream_encode_array(&stream, 3); + mpstream_encode_str(&stream, "#"); + mpstream_encode_uint(&stream, offset); + mpstream_encode_uint(&stream, len); } for (int i = argc ; i > 3; i--) { - luamp_encode_array(luaL_msgpack_default, &stream, 3); - luamp_encode_str(luaL_msgpack_default, &stream, "!", 1); - luamp_encode_uint(luaL_msgpack_default, &stream, offset); + mpstream_encode_array(&stream, 3); + mpstream_encode_str(&stream, "!"); + mpstream_encode_uint(&stream, offset); luamp_encode(L, luaL_msgpack_default, &stream, i); } mpstream_flush(&stream); diff --git a/src/lua/msgpack.c b/src/lua/msgpack.c index acd860a..b470060 100644 --- a/src/lua/msgpack.c +++ b/src/lua/msgpack.c @@ -29,7 +29,7 @@ * SUCH DAMAGE. 
*/ #include "lua/msgpack.h" - +#include "mpstream.h" #include "lua/utils.h" #if defined(LUAJIT) @@ -50,46 +50,6 @@ luamp_error(void *error_ctx) luaL_error(L, diag_last_error(diag_get())->errmsg); } -void -mpstream_init(struct mpstream *stream, void *ctx, - luamp_reserve_f reserve, luamp_alloc_f alloc, - luamp_error_f error, void *error_ctx) -{ - stream->ctx = ctx; - stream->reserve = reserve; - stream->alloc = alloc; - stream->error = error; - stream->error_ctx = error_ctx; - mpstream_reset(stream); -} - -void -mpstream_reserve_slow(struct mpstream *stream, size_t size) -{ - stream->alloc(stream->ctx, stream->pos - stream->buf); - stream->buf = (char *) stream->reserve(stream->ctx, &size); - if (stream->buf == NULL) { - diag_set(OutOfMemory, size, "mpstream", "reserve"); - stream->error(stream->error_ctx); - } - stream->pos = stream->buf; - stream->end = stream->pos + size; -} - -void -mpstream_reset(struct mpstream *stream) -{ - size_t size = 0; - stream->buf = (char *) stream->reserve(stream->ctx, &size); - if (stream->buf == NULL) { - diag_set(OutOfMemory, size, "mpstream", "reset"); - stream->error(stream->error_ctx); - } - stream->pos = stream->buf; - stream->end = stream->pos + size; -} - - static uint32_t CTID_CHAR_PTR; static uint32_t CTID_STRUCT_IBUF; @@ -107,104 +67,6 @@ static luamp_encode_extension_f luamp_encode_extension = static luamp_decode_extension_f luamp_decode_extension = luamp_decode_extension_default; -void -luamp_encode_array(struct luaL_serializer *cfg, struct mpstream *stream, - uint32_t size) -{ - (void) cfg; - assert(mp_sizeof_array(size) <= 5); - char *data = mpstream_reserve(stream, 5); - char *pos = mp_encode_array(data, size); - mpstream_advance(stream, pos - data); -} - -void -luamp_encode_map(struct luaL_serializer *cfg, struct mpstream *stream, - uint32_t size) -{ - (void) cfg; - assert(mp_sizeof_map(size) <= 5); - char *data = mpstream_reserve(stream, 5); - char *pos = mp_encode_map(data, size); - mpstream_advance(stream, pos - 
data); -} - -void -luamp_encode_uint(struct luaL_serializer *cfg, struct mpstream *stream, - uint64_t num) -{ - (void) cfg; - assert(mp_sizeof_uint(num) <= 9); - char *data = mpstream_reserve(stream, 9); - char *pos = mp_encode_uint(data, num); - mpstream_advance(stream, pos - data); -} - -void -luamp_encode_int(struct luaL_serializer *cfg, struct mpstream *stream, - int64_t num) -{ - (void) cfg; - assert(mp_sizeof_int(num) <= 9); - char *data = mpstream_reserve(stream, 9); - char *pos = mp_encode_int(data, num); - mpstream_advance(stream, pos - data); -} - -void -luamp_encode_float(struct luaL_serializer *cfg, struct mpstream *stream, - float num) -{ - (void) cfg; - assert(mp_sizeof_float(num) <= 5); - char *data = mpstream_reserve(stream, 5); - char *pos = mp_encode_float(data, num); - mpstream_advance(stream, pos - data); -} - -void -luamp_encode_double(struct luaL_serializer *cfg, struct mpstream *stream, - double num) -{ - (void) cfg; - assert(mp_sizeof_double(num) <= 9); - char *data = mpstream_reserve(stream, 9); - char *pos = mp_encode_double(data, num); - mpstream_advance(stream, pos - data); -} - -void -luamp_encode_str(struct luaL_serializer *cfg, struct mpstream *stream, - const char *str, uint32_t len) -{ - (void) cfg; - assert(mp_sizeof_str(len) <= 5 + len); - char *data = mpstream_reserve(stream, 5 + len); - char *pos = mp_encode_str(data, str, len); - mpstream_advance(stream, pos - data); -} - -void -luamp_encode_nil(struct luaL_serializer *cfg, struct mpstream *stream) -{ - (void) cfg; - assert(mp_sizeof_nil() <= 1); - char *data = mpstream_reserve(stream, 1); - char *pos = mp_encode_nil(data); - mpstream_advance(stream, pos - data); -} - -void -luamp_encode_bool(struct luaL_serializer *cfg, struct mpstream *stream, - bool val) -{ - (void) cfg; - assert(mp_sizeof_bool(val) <= 1); - char *data = mpstream_reserve(stream, 1); - char *pos = mp_encode_bool(data, val); - mpstream_advance(stream, pos - data); -} - static enum mp_type 
luamp_encode_extension_default(struct lua_State *L, int idx, struct mpstream *stream) @@ -254,38 +116,36 @@ luamp_encode_r(struct lua_State *L, struct luaL_serializer *cfg, restart: /* used by MP_EXT */ switch (field->type) { case MP_UINT: - luamp_encode_uint(cfg, stream, field->ival); + mpstream_encode_uint(stream, field->ival); return MP_UINT; case MP_STR: - luamp_encode_str(cfg, stream, field->sval.data, - field->sval.len); + mpstream_encode_strn(stream, field->sval.data, field->sval.len); return MP_STR; case MP_BIN: - luamp_encode_str(cfg, stream, field->sval.data, - field->sval.len); + mpstream_encode_strn(stream, field->sval.data, field->sval.len); return MP_BIN; case MP_INT: - luamp_encode_int(cfg, stream, field->ival); + mpstream_encode_int(stream, field->ival); return MP_INT; case MP_FLOAT: - luamp_encode_float(cfg, stream, field->fval); + mpstream_encode_float(stream, field->fval); return MP_FLOAT; case MP_DOUBLE: - luamp_encode_double(cfg, stream, field->dval); + mpstream_encode_double(stream, field->dval); return MP_DOUBLE; case MP_BOOL: - luamp_encode_bool(cfg, stream, field->bval); + mpstream_encode_bool(stream, field->bval); return MP_BOOL; case MP_NIL: - luamp_encode_nil(cfg, stream); + mpstream_encode_nil(stream); return MP_NIL; case MP_MAP: /* Map */ if (level >= cfg->encode_max_depth) { - luamp_encode_nil(cfg, stream); /* Limit nested maps */ + mpstream_encode_nil(stream); /* Limit nested maps */ return MP_NIL; } - luamp_encode_map(cfg, stream, field->size); + mpstream_encode_map(stream, field->size); lua_pushnil(L); /* first key */ while (lua_next(L, top) != 0) { lua_pushvalue(L, -2); /* push a copy of key to top */ @@ -301,11 +161,11 @@ restart: /* used by MP_EXT */ case MP_ARRAY: /* Array */ if (level >= cfg->encode_max_depth) { - luamp_encode_nil(cfg, stream); /* Limit nested arrays */ + mpstream_encode_nil(stream); /* Limit nested arrays */ return MP_NIL; } uint32_t size = field->size; - luamp_encode_array(cfg, stream, size); + 
mpstream_encode_array(stream, size); for (uint32_t i = 0; i < size; i++) { lua_rawgeti(L, top, i + 1); luaL_tofield(L, cfg, top + 1, field); diff --git a/src/lua/msgpack.h b/src/lua/msgpack.h index bacf3e0..d0ade30 100644 --- a/src/lua/msgpack.h +++ b/src/lua/msgpack.h @@ -1,7 +1,7 @@ #ifndef TARANTOOL_LUA_MSGPACK_H_INCLUDED #define TARANTOOL_LUA_MSGPACK_H_INCLUDED /* - * Copyright 2010-2015, Tarantool AUTHORS, please see AUTHORS file. + * Copyright 2010-2018, Tarantool AUTHORS, please see AUTHORS file. * * Redistribution and use in source and binary forms, with or * without modification, are permitted provided that the following @@ -42,6 +42,8 @@ extern "C" { #include struct luaL_serializer; +struct mpstream; + /** * Default instance of msgpack serializer (msgpack = require('msgpack')). * This instance is used by all box's Lua/C API bindings (e.g. space:replace()). @@ -51,111 +53,13 @@ struct luaL_serializer; extern struct luaL_serializer *luaL_msgpack_default; /** - * A streaming API so that it's possible to encode to any output - * stream. - */ - -/** - * Ask the allocator to reserve at least size bytes. It can reserve - * more, and update *size with the new size. - */ -typedef void *(*luamp_reserve_f)(void *ctx, size_t *size); - -/** Actually use the bytes. */ -typedef void *(*luamp_alloc_f)(void *ctx, size_t size); - -/** Actually use the bytes. */ -typedef void (*luamp_error_f)(void *error_ctx); - -struct mpstream { - /** - * When pos >= end, or required size doesn't fit in - * pos..end range alloc() is called to advance the stream - * and reserve() to get a new chunk. 
- */ - char *buf, *pos, *end; - void *ctx; - luamp_reserve_f reserve; - luamp_alloc_f alloc; - luamp_error_f error; - void *error_ctx; -}; - -/** * luaL_error() */ void luamp_error(void *); -void -mpstream_init(struct mpstream *stream, void *ctx, - luamp_reserve_f reserve, luamp_alloc_f alloc, - luamp_error_f error, void *error_ctx); - -void -mpstream_reset(struct mpstream *stream); - -void -mpstream_reserve_slow(struct mpstream *stream, size_t size); - -static inline void -mpstream_flush(struct mpstream *stream) -{ - stream->alloc(stream->ctx, stream->pos - stream->buf); - stream->buf = stream->pos; -} - -static inline char * -mpstream_reserve(struct mpstream *stream, size_t size) -{ - if (stream->pos + size > stream->end) - mpstream_reserve_slow(stream, size); - return stream->pos; -} - -static inline void -mpstream_advance(struct mpstream *stream, size_t size) -{ - assert(stream->pos + size <= stream->end); - stream->pos += size; -} - enum { LUAMP_ALLOC_FACTOR = 256 }; -void -luamp_encode_array(struct luaL_serializer *cfg, struct mpstream *stream, - uint32_t size); - -void -luamp_encode_map(struct luaL_serializer *cfg, struct mpstream *stream, uint32_t size); - -void -luamp_encode_uint(struct luaL_serializer *cfg, struct mpstream *stream, - uint64_t num); - -void -luamp_encode_int(struct luaL_serializer *cfg, struct mpstream *stream, - int64_t num); - -void -luamp_encode_float(struct luaL_serializer *cfg, struct mpstream *stream, - float num); - -void -luamp_encode_double(struct luaL_serializer *cfg, struct mpstream *stream, - double num); - -void -luamp_encode_str(struct luaL_serializer *cfg, struct mpstream *stream, - const char *str, uint32_t len); - -void -luamp_encode_nil(struct luaL_serializer *cfg, struct mpstream *stream); - -void -luamp_encode_bool(struct luaL_serializer *cfg, struct mpstream *stream, - bool val); - /* low-level function needed for execute_lua_call() */ enum mp_type luamp_encode_r(struct lua_State *L, struct luaL_serializer *cfg, diff 
--git a/src/mpstream.c b/src/mpstream.c new file mode 100644 index 0000000..23d27f9 --- /dev/null +++ b/src/mpstream.c @@ -0,0 +1,205 @@ +/* + * Copyright 2010-2018, Tarantool AUTHORS, please see AUTHORS file. + * + * Redistribution and use in source and binary forms, with or + * without modification, are permitted provided that the following + * conditions are met: + * + * 1. Redistributions of source code must retain the above + * copyright notice, this list of conditions and the + * following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following + * disclaimer in the documentation and/or other materials + * provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY ``AS IS'' AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED + * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL + * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR + * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF + * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF + * SUCH DAMAGE. 
+ */ + +#include "mpstream.h" +#include +#include +#include "msgpuck.h" + +void +mpstream_reserve_slow(struct mpstream *stream, size_t size) +{ + stream->alloc(stream->ctx, stream->pos - stream->buf); + stream->buf = (char *) stream->reserve(stream->ctx, &size); + if (stream->buf == NULL) { + diag_set(OutOfMemory, size, "mpstream", "reserve"); + stream->error(stream->error_ctx); + } + stream->pos = stream->buf; + stream->end = stream->pos + size; +} + +void +mpstream_reset(struct mpstream *stream) +{ + size_t size = 0; + stream->buf = (char *) stream->reserve(stream->ctx, &size); + if (stream->buf == NULL) { + diag_set(OutOfMemory, size, "mpstream", "reset"); + stream->error(stream->error_ctx); + } + stream->pos = stream->buf; + stream->end = stream->pos + size; +} + +/** + * A streaming API so that it's possible to encode to any output + * stream. + */ +void +mpstream_init(struct mpstream *stream, void *ctx, + mpstream_reserve_f reserve, mpstream_alloc_f alloc, + mpstream_error_f error, void *error_ctx) +{ + stream->ctx = ctx; + stream->reserve = reserve; + stream->alloc = alloc; + stream->error = error; + stream->error_ctx = error_ctx; + mpstream_reset(stream); +} + +void +mpstream_flush(struct mpstream *stream) +{ + stream->alloc(stream->ctx, stream->pos - stream->buf); + stream->buf = stream->pos; +} + +char * +mpstream_reserve(struct mpstream *stream, size_t size) +{ + if (stream->pos + size > stream->end) + mpstream_reserve_slow(stream, size); + return stream->pos; +} + +void +mpstream_advance(struct mpstream *stream, size_t size) +{ + assert(stream->pos + size <= stream->end); + stream->pos += size; +} + +void +mpstream_encode_array(struct mpstream *stream, uint32_t size) +{ + assert(mp_sizeof_array(size) <= 5); + char *data = mpstream_reserve(stream, 5); + if (data == NULL) + return; + char *pos = mp_encode_array(data, size); + mpstream_advance(stream, pos - data); +} + +void +mpstream_encode_map(struct mpstream *stream, uint32_t size) +{ + 
assert(mp_sizeof_map(size) <= 5); + char *data = mpstream_reserve(stream, 5); + if (data == NULL) + return; + char *pos = mp_encode_map(data, size); + mpstream_advance(stream, pos - data); +} + +void +mpstream_encode_uint(struct mpstream *stream, uint64_t num) +{ + assert(mp_sizeof_uint(num) <= 9); + char *data = mpstream_reserve(stream, 9); + if (data == NULL) + return; + char *pos = mp_encode_uint(data, num); + mpstream_advance(stream, pos - data); +} + +void +mpstream_encode_int(struct mpstream *stream, int64_t num) +{ + assert(mp_sizeof_int(num) <= 9); + char *data = mpstream_reserve(stream, 9); + if (data == NULL) + return; + char *pos = mp_encode_int(data, num); + mpstream_advance(stream, pos - data); +} + +void +mpstream_encode_float(struct mpstream *stream, float num) +{ + assert(mp_sizeof_float(num) <= 5); + char *data = mpstream_reserve(stream, 5); + if (data == NULL) + return; + char *pos = mp_encode_float(data, num); + mpstream_advance(stream, pos - data); +} + +void +mpstream_encode_double(struct mpstream *stream, double num) +{ + assert(mp_sizeof_double(num) <= 9); + char *data = mpstream_reserve(stream, 9); + if (data == NULL) /* reserve failed: must not encode through a NULL buffer */ + return; + char *pos = mp_encode_double(data, num); + mpstream_advance(stream, pos - data); +} + +void +mpstream_encode_strn(struct mpstream *stream, const char *str, uint32_t len) +{ + assert(mp_sizeof_str(len) <= 5 + len); + char *data = mpstream_reserve(stream, 5 + len); + if (data == NULL) + return; + char *pos = mp_encode_str(data, str, len); + mpstream_advance(stream, pos - data); +} + +void +mpstream_encode_str(struct mpstream *stream, const char *str) +{ + mpstream_encode_strn(stream, str, strlen(str)); +} + +void +mpstream_encode_nil(struct mpstream *stream) +{ + assert(mp_sizeof_nil() <= 1); + char *data = mpstream_reserve(stream, 1); + if (data == NULL) + return; + char *pos = mp_encode_nil(data); + mpstream_advance(stream, pos - data); +} + +void +mpstream_encode_bool(struct mpstream *stream, bool val) +{ +
assert(mp_sizeof_bool(val) <= 1); + char *data = mpstream_reserve(stream, 1); + if (data == NULL) + return; + char *pos = mp_encode_bool(data, val); + mpstream_advance(stream, pos - data); +} diff --git a/src/mpstream.h b/src/mpstream.h new file mode 100644 index 0000000..789bd74 --- /dev/null +++ b/src/mpstream.h @@ -0,0 +1,124 @@ +#ifndef TARANTOOL_LUA_MPSTREAM_H_INCLUDED +#define TARANTOOL_LUA_MPSTREAM_H_INCLUDED +/* + * Copyright 2010-2015, Tarantool AUTHORS, please see AUTHORS file. + * + * Redistribution and use in source and binary forms, with or + * without modification, are permitted provided that the following + * conditions are met: + * + * 1. Redistributions of source code must retain the above + * copyright notice, this list of conditions and the + * following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following + * disclaimer in the documentation and/or other materials + * provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY ``AS IS'' AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED + * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL + * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR + * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF + * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF + * SUCH DAMAGE. + */ + +#include "diag.h" + +#if defined(__cplusplus) +extern "C" { +#endif /* defined(__cplusplus) */ + +/** +* Ask the allocator to reserve at least size bytes. It can reserve +* more, and update *size with the new size. 
+*/ +typedef void *(*mpstream_reserve_f)(void *ctx, size_t *size); + +/** Actually use the bytes. */ +typedef void *(*mpstream_alloc_f)(void *ctx, size_t size); + +/** Report a failure, e.g. when reserve() cannot provide a chunk. */ +typedef void (*mpstream_error_f)(void *error_ctx); + +struct mpstream { + /** + * When pos >= end, or required size doesn't fit in + * pos..end range alloc() is called to advance the stream + * and reserve() to get a new chunk. + */ + char *buf, *pos, *end; + void *ctx; + mpstream_reserve_f reserve; + mpstream_alloc_f alloc; + mpstream_error_f error; + void *error_ctx; +}; + +void +mpstream_reserve_slow(struct mpstream *stream, size_t size); + +void +mpstream_reset(struct mpstream *stream); + +/** + * A streaming API so that it's possible to encode to any output + * stream. + */ +void +mpstream_init(struct mpstream *stream, void *ctx, + mpstream_reserve_f reserve, mpstream_alloc_f alloc, + mpstream_error_f error, void *error_ctx); + +void +mpstream_flush(struct mpstream *stream); + +char * +mpstream_reserve(struct mpstream *stream, size_t size); + +void +mpstream_advance(struct mpstream *stream, size_t size); + +void +mpstream_encode_array(struct mpstream *stream, uint32_t size); + +void +mpstream_encode_map(struct mpstream *stream, uint32_t size); + +void +mpstream_encode_uint(struct mpstream *stream, uint64_t num); + +void +mpstream_encode_int(struct mpstream *stream, int64_t num); + +void +mpstream_encode_float(struct mpstream *stream, float num); + +void +mpstream_encode_double(struct mpstream *stream, double num); + +void +mpstream_encode_strn(struct mpstream *stream, const char *str, uint32_t len); + +void +mpstream_encode_str(struct mpstream *stream, const char *str); + +void +mpstream_encode_nil(struct mpstream *stream); + +void +mpstream_encode_bool(struct mpstream *stream, bool val); + +#if defined(__cplusplus) +} /* extern "C" */ +#endif /* defined(__cplusplus) */ + +#endif /* TARANTOOL_LUA_MPSTREAM_H_INCLUDED */ -- 2.7.4