From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from smtp18.mail.ru (smtp18.mail.ru [94.100.176.155]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by dev.tarantool.org (Postfix) with ESMTPS id 3CCE743E888 for ; Tue, 24 Mar 2020 15:46:14 +0300 (MSK) From: Leonid Vasiliev Date: Tue, 24 Mar 2020 15:46:01 +0300 Message-Id: <7982fc7b062b2424689a990de1f76ca2ff0e4f50.1585053743.git.lvasiliev@tarantool.org> In-Reply-To: References: In-Reply-To: References: Subject: [Tarantool-patches] [PATCH 3/6] iproto: Add negotiation phase List-Id: Tarantool development patches List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , To: alexander.turenko@tarantool.org Cc: tarantool-patches@dev.tarantool.org The negotiation phase has been added to IPROTO For possibility to have a custom parameters of session the negotiation phase has been added. This is necessary to enable the transmission of an error in different formats(depending on the choice of the client). @TarantoolBot document Title: IPROTO: The negatiation phase For backward compatibility of the data transmission format, the negotiation phase has been added to IPROTO. A new key (IPROTO_NEGOTIATION) has been added to IPROTO command codes. NEGOTIATION BODY: CODE = 0x0E +==========================+ | | | NEGOTIATION PARAMETERS | | | +==========================+ MP_MAP Session negotiation parameters are a map with keys like ERROR_FORMAT_VERSION ... The response is a map with all stated negotiation parameters. So, for work with the new format of errors, it is necessary to perform the negotiation phase, otherwise errors will be transmitted in the old format (by default). Needed for #4398 --- src/box/iproto.cc | 20 +++++++++++ src/box/iproto_constants.h | 11 ++++++ src/box/lua/net_box.c | 34 ++++++++++++++++++ src/box/lua/net_box.lua | 57 ++++++++++++++++++++++++++---- src/box/session.cc | 12 +++++++ src/box/session.h | 27 ++++++++++++++ src/box/xrow.c | 87 ++++++++++++++++++++++++++++++++++++++++++++++ src/box/xrow.h | 38 ++++++++++++++++++++ 8 files changed, 279 insertions(+), 7 deletions(-) diff --git a/src/box/iproto.cc b/src/box/iproto.cc index 9dad43b..2df41ba 100644 --- a/src/box/iproto.cc +++ b/src/box/iproto.cc @@ -197,6 +197,8 @@ struct iproto_msg struct auth_request auth; /* SQL request, if this is the EXECUTE/PREPARE request. */ struct sql_request sql; + /** Negotiation request */ + struct negotiation_params neg_req; /** In case of iproto parse error, saved diagnostics. */ struct diag diag; }; @@ -1309,6 +1311,16 @@ iproto_msg_decode(struct iproto_msg *msg, const char **pos, const char *reqend, goto error; cmsg_init(&msg->base, misc_route); break; + case IPROTO_NEGOTIATION: { + struct session *ses = msg->connection->session; + /* Copy current parameters to modify */ + memcpy(&msg->neg_req, &ses->neg_param, + sizeof(struct negotiation_params)); + if (xrow_decode_negotiation(&msg->header, &msg->neg_req)) + goto error; + cmsg_init(&msg->base, misc_route); + break; + } default: diag_set(ClientError, ER_UNKNOWN_REQUEST_TYPE, (uint32_t) type); @@ -1714,6 +1726,14 @@ tx_process_misc(struct cmsg *m) iproto_reply_vote_xc(out, &ballot, msg->header.sync, ::schema_version); break; + case IPROTO_NEGOTIATION: + session_update_neg_parameters(con->session, + &msg->neg_req); + iproto_reply_negotiation_xc(out, + &con->session->neg_param, + msg->header.sync, + ::schema_version); + break; default: unreachable(); } diff --git a/src/box/iproto_constants.h b/src/box/iproto_constants.h index f9d413a..1851712 100644 --- a/src/box/iproto_constants.h +++ b/src/box/iproto_constants.h @@ -112,6 +112,7 @@ enum iproto_key { IPROTO_METADATA = 0x32, IPROTO_BIND_METADATA = 0x33, IPROTO_BIND_COUNT = 0x34, + IPROTO_NEG_PARAM = 0x35, /* Leave a gap between response keys and SQL keys. */ IPROTO_SQL_TEXT = 0x40, @@ -215,6 +216,8 @@ enum iproto_type { IPROTO_NOP = 12, /** Prepare SQL statement. */ IPROTO_PREPARE = 13, + /** Negotiation of session parameters */ + IPROTO_NEGOTIATION = 14, /** The maximum typecode used for box.stat() */ IPROTO_TYPE_STAT_MAX, @@ -467,6 +470,14 @@ vy_row_index_key_name(enum vy_row_index_key key) return vy_row_index_key_strs[key]; } +/** + * Negotiation protocol's keys + */ +enum neg_key { + /** Version of an error format */ + ERROR_FORMAT_VERSION +}; + #if defined(__cplusplus) } /* extern "C" */ #endif diff --git a/src/box/lua/net_box.c b/src/box/lua/net_box.c index c7bd016..5321e87 100644 --- a/src/box/lua/net_box.c +++ b/src/box/lua/net_box.c @@ -417,6 +417,39 @@ netbox_encode_upsert(lua_State *L) } static int +netbox_encode_negotiation(lua_State *L) +{ + if (lua_gettop(L) < 3) + return luaL_error(L, "Usage: netbox.encode_negotiation(ibuf, sync, " + "opts)"); + + + /* Check opts is table and parse it */ + if (lua_istable(L, 3) == 0) { + return luaL_error(L, "Expected opts is table"); + } + + lua_getfield(L, 3, "error_format_ver"); + + int err_format_ver = ERR_FORMAT_DEF; + if (lua_isnumber(L, -1)) { + err_format_ver = lua_tonumber(L, -1); + } + + struct mpstream stream; + size_t svp = netbox_prepare_request(L, &stream, IPROTO_NEGOTIATION); + netbox_encode_request(&stream, svp); + + mpstream_encode_map(&stream, 1); + mpstream_encode_uint(&stream, ERROR_FORMAT_VERSION); + mpstream_encode_uint(&stream, err_format_ver); + + netbox_encode_request(&stream, svp); + + return 0; +} + +static int netbox_decode_greeting(lua_State *L) { struct greeting greeting; @@ -901,6 +934,7 @@ luaopen_net_box(struct lua_State *L) { "encode_execute", netbox_encode_execute}, { "encode_prepare", netbox_encode_prepare}, { "encode_auth", netbox_encode_auth }, + { "encode_negotiation", netbox_encode_negotiation }, { "decode_greeting",netbox_decode_greeting }, { "communicate", netbox_communicate }, { "decode_select", netbox_decode_select }, diff --git a/src/box/lua/net_box.lua b/src/box/lua/net_box.lua index 3f611c0..7fdad64 100644 --- a/src/box/lua/net_box.lua +++ b/src/box/lua/net_box.lua @@ -26,6 +26,7 @@ local check_primary_index = box.internal.check_primary_index local communicate = internal.communicate local encode_auth = internal.encode_auth local encode_select = internal.encode_select +local encode_negotiation = internal.encode_negotiation local decode_greeting = internal.decode_greeting local TIMEOUT_INFINITY = 500 * 365 * 86400 @@ -38,12 +39,13 @@ local IPROTO_STATUS_KEY = 0x00 local IPROTO_ERRNO_MASK = 0x7FFF local IPROTO_SYNC_KEY = 0x01 local IPROTO_SCHEMA_VERSION_KEY = 0x05 -local IPROTO_METADATA_KEY = 0x32 -local IPROTO_SQL_INFO_KEY = 0x42 +local IPROTO_METADATA_KEY = 0x32 +local IPROTO_SQL_INFO_KEY = 0x42 local SQL_INFO_ROW_COUNT_KEY = 0 local IPROTO_FIELD_NAME_KEY = 0 local IPROTO_DATA_KEY = 0x30 local IPROTO_ERROR_KEY = 0x31 +local IPROTO_NEG_PARAM = 0x35 local IPROTO_GREETING_SIZE = 128 local IPROTO_CHUNK_KEY = 128 local IPROTO_OK_KEY = 0 @@ -57,6 +59,11 @@ local E_PROC_LUA = box.error.PROC_LUA -- utility tables local is_final_state = {closed = 1, error = 1} +-- negotiations keys +local neg_keys = { + ERROR_FORMAT_VERSION = 0 +} + local function decode_nil(raw_data, raw_data_end) return nil, raw_data_end end @@ -83,6 +90,10 @@ local function decode_push(raw_data) local response, raw_end = decode(raw_data) return response[IPROTO_DATA_KEY][1], raw_end end +local function decode_negotiation(raw_data) + local response, raw_end = decode(raw_data) + return response[IPROTO_NEG_PARAM], raw_end +end local function version_id(major, minor, patch) return bit.bor(bit.lshift(major, 16), bit.lshift(minor, 8), patch) @@ -110,6 +121,7 @@ local method_encoder = { min = internal.encode_select, max = internal.encode_select, count = internal.encode_call, + negotiation = internal.encode_negotiation, -- inject raw data into connection, used by console and tests inject = function(buf, id, bytes) local ptr = buf:reserve(#bytes) @@ -138,9 +150,12 @@ local method_decoder = { count = decode_count, inject = decode_data, push = decode_push, + negotiation = decode_negotiation } -local function next_id(id) return band(id + 1, 0x7FFFFFFF) end +local function next_id(id) + return band(id + 1, 0x7FFFFFFF) +end -- -- Connect to a remote server, do handshake. @@ -435,7 +450,9 @@ local function create_transport(host, port, user, password, callback, local protocol_sm local function start() - if state ~= 'initial' then return not is_final_state[state] end + if state ~= 'initial' then + return not is_final_state[state] + end fiber.create(function() local ok, err, timeout worker_fiber = fiber_self() @@ -804,7 +821,9 @@ local function create_transport(host, port, user, password, callback, iproto_sm = function(schema_version) local err, hdr, body_rpos, body_end = send_and_recv_iproto() - if err then return error_sm(err, hdr) end + if err then + return error_sm(err, hdr) + end dispatch_response_iproto(hdr, body_rpos, body_end) local status = hdr[IPROTO_STATUS_KEY] local response_schema_version = hdr[IPROTO_SCHEMA_VERSION_KEY] @@ -821,7 +840,10 @@ local function create_transport(host, port, user, password, callback, end error_sm = function(err, msg) - if connection then connection:close(); connection = nil end + if connection then + connection:close() + connection = nil + end send_buf:recycle() recv_buf:recycle() if state ~= 'closed' then @@ -924,8 +946,11 @@ local console_mt = { local space_metatable, index_metatable + + local function new_sm(host, port, opts, connection, greeting) - local user, password = opts.user, opts.password; opts.password = nil + local user, password = opts.user, opts.password + opts.password = nil local last_reconnect_error local remote = {host = host, port = port, opts = opts, state = 'initial'} local function callback(what, ...) @@ -1004,6 +1029,24 @@ local function new_sm(host, port, opts, connection, greeting) if opts.wait_connected ~= false then remote._transport.wait_state('active', tonumber(opts.wait_connected)) end + + -- Negotiation if needed + if opts.error_extended then + local peer_has_negotiation = version_at_least(remote.peer_version_id, 2, 4, 1) + if not peer_has_negotiation then + box.error(box.error.PROC_LUA, "Negotiations failed") + end + local neg_req = {error_format_ver = 1} + local neg_resp = remote:_request("negotiation", nil, nil, neg_req) + if neg_resp[neg_keys.ERROR_FORMAT_VERSION] ~= 1 then + remote._transport.stop() + box.error(box.error.PROC_LUA, "Negotiations failed") + end + + remote._error_extended = true + end + -- Negotiation end + return remote end diff --git a/src/box/session.cc b/src/box/session.cc index 8813182..c0224ca 100644 --- a/src/box/session.cc +++ b/src/box/session.cc @@ -37,6 +37,7 @@ #include "error.h" #include "tt_static.h" #include "sql_stmt_cache.h" +#include "iproto_constants.h" const char *session_type_strs[] = { "background", @@ -144,6 +145,9 @@ session_create(enum session_type type) session->sql_default_engine = SQL_STORAGE_ENGINE_MEMTX; session->sql_stmts = NULL; + /* Set default negotiation parameters */ + session->neg_param.err_format_ver = ERR_FORMAT_DEF; + /* For on_connect triggers. */ credentials_create(&session->credentials, guest_user); struct mh_i64ptr_node_t node; @@ -373,3 +377,11 @@ generic_session_sync(struct session *session) (void) session; return 0; } + +int +session_update_neg_parameters(struct session *session, + const struct negotiation_params *params) +{ + session->neg_param.err_format_ver = params->err_format_ver; + return 0; +} diff --git a/src/box/session.h b/src/box/session.h index 6dfc7cb..3b4dfb0 100644 --- a/src/box/session.h +++ b/src/box/session.h @@ -81,6 +81,24 @@ union session_meta { }; /** + * An error transmission formats + */ +enum error_formats { + /** Default(old) format */ + ERR_FORMAT_DEF, + /** Extended format */ + ERR_FORMAT_EX +}; + +/** + * Parameters which may be changed at negotiation phase of session +*/ +struct negotiation_params { + /** Version of a format for an error transmission */ + uint8_t err_format_ver; +}; + +/** * Abstraction of a single user session: * for now, only provides accounting of established * sessions and on-connect/on-disconnect event @@ -110,6 +128,8 @@ struct session { struct credentials credentials; /** Trigger for fiber on_stop to cleanup created on-demand session */ struct trigger fiber_on_stop; + /** Negotiation parameters */ + struct negotiation_params neg_param; }; struct session_vtab { @@ -364,6 +384,13 @@ generic_session_fd(struct session *session); int64_t generic_session_sync(struct session *session); +/** + * Update negatiation parameters of the session +*/ +int +session_update_neg_parameters(struct session *session, + const struct negotiation_params *params); + #if defined(__cplusplus) } /* extern "C" */ diff --git a/src/box/xrow.c b/src/box/xrow.c index 5e3cb07..90dffea 100644 --- a/src/box/xrow.c +++ b/src/box/xrow.c @@ -479,6 +479,42 @@ iproto_reply_vote(struct obuf *out, const struct ballot *ballot, } int +iproto_reply_negotiation(struct obuf *out, + const struct negotiation_params *neg_param, + uint64_t sync, + uint32_t schema_version) +{ + char *buf = NULL; + uint32_t buf_size = IPROTO_HEADER_LEN + mp_sizeof_map(1) + + mp_sizeof_uint(IPROTO_NEG_PARAM) + mp_sizeof_map(1) + + mp_sizeof_uint(ERROR_FORMAT_VERSION) + + mp_sizeof_uint(neg_param->err_format_ver); + + buf = obuf_reserve(out, buf_size); + if (buf == NULL) { + diag_set(OutOfMemory, buf_size, + "obuf_alloc", "buf"); + return -1; + }; + + char *data = buf + IPROTO_HEADER_LEN; + data = mp_encode_map(data, 1); + data = mp_encode_uint(data, IPROTO_NEG_PARAM); + data = mp_encode_map(data, 1); + data = mp_encode_uint(data, ERROR_FORMAT_VERSION); + data = mp_encode_uint(data, neg_param->err_format_ver); + assert(data == buf + buf_size); + + iproto_header_encode(buf, IPROTO_OK, sync, schema_version, + buf_size - IPROTO_HEADER_LEN); + + char *ptr = obuf_alloc(out, buf_size); + (void) ptr; + assert(ptr == buf); + return 0; +} + +int iproto_reply_error(struct obuf *out, const struct error *e, uint64_t sync, uint32_t schema_version) { @@ -1518,3 +1554,54 @@ greeting_decode(const char *greetingbuf, struct greeting *greeting) return 0; } + +int +xrow_decode_negotiation(const struct xrow_header *row, + struct negotiation_params *request) +{ + if (row->bodycnt == 0) { + diag_set(ClientError, ER_INVALID_MSGPACK, + "missing request body"); + return -1; + } + + assert(row->bodycnt == 1); + const char *data = (const char *) row->body[0].iov_base; + + const char *end = data + row->body[0].iov_len; + assert((end - data) > 0); + + if (mp_typeof(*data) != MP_MAP || mp_check_map(data, end) > 0) { +error: + xrow_on_decode_err(row->body[0].iov_base, + end, ER_INVALID_MSGPACK, + "packet body"); + return -1; + } + + uint32_t map_size = mp_decode_map(&data); + for (uint32_t i = 0; i < map_size; ++i) { + if ((end - data) < 1 || mp_typeof(*data) != MP_UINT) + goto error; + + uint64_t key = mp_decode_uint(&data); + + switch (key) { + case ERROR_FORMAT_VERSION: + if (mp_typeof(*data) != MP_UINT) + goto error; + request->err_format_ver = mp_decode_uint(&data); + break; + default: + /* unknown key */ + continue; + } + } + if (data != end) { + xrow_on_decode_err(row->body[0].iov_base, end, + ER_INVALID_MSGPACK, "packet end"); + return -1; + } + + return 0; +} diff --git a/src/box/xrow.h b/src/box/xrow.h index 2a0a9c8..bbd0496 100644 --- a/src/box/xrow.h +++ b/src/box/xrow.h @@ -37,6 +37,7 @@ #include "uuid/tt_uuid.h" #include "diag.h" +#include "session.h" #include "vclock.h" #if defined(__cplusplus) @@ -177,6 +178,17 @@ struct request { }; /** + * Decode negotiation request + * @param row + * @param request + * @retval -1 on error, see diag + * @retval 0 success + */ +int +xrow_decode_negotiation(const struct xrow_header *row, + struct negotiation_params *request); + +/** * Create a JSON-like string representation of a request. */ const char * request_str(const struct request *request); @@ -566,6 +578,22 @@ int iproto_reply_error(struct obuf *out, const struct error *e, uint64_t sync, uint32_t schema_version); +/** + * Write a negotiation reply packet to output buffer. + * @param out Buffer to write to. + * @param neg_param Current negotiation parameters. + * @param sync Request sync. + * @param schema_version Actual schema version. + * + * @retval 0 Success. + * @retval -1 Memory error. + */ +int +iproto_reply_negotiation(struct obuf *out, + const struct negotiation_params *neg_param, + uint64_t sync, + uint32_t schema_version); + /** EXECUTE/PREPARE request. */ struct sql_request { /** SQL statement text. */ @@ -936,6 +964,16 @@ iproto_reply_vote_xc(struct obuf *out, const struct ballot *ballot, diag_raise(); } +/** @copydoc iproto_reply_negotiation. */ +static inline void +iproto_reply_negotiation_xc(struct obuf *out, + const struct negotiation_params *neg_param, + uint64_t sync, uint32_t schema_version) +{ + if (iproto_reply_negotiation(out, neg_param, sync, schema_version) != 0) + diag_raise(); +} + #endif #endif /* TARANTOOL_XROW_H_INCLUDED */ -- 2.7.4