[Tarantool-patches] [PATCH 3/6] iproto: Add negotiation phase
Leonid Vasiliev
lvasiliev at tarantool.org
Tue Mar 24 15:46:01 MSK 2020
The negotiation phase has been added to IPROTO
For possibility to have a custom parameters of session the negotiation
phase has been added. This is necessary to enable the transmission of
an error in different formats(depending on the choice of the client).
@TarantoolBot document
Title: IPROTO: The negatiation phase
For backward compatibility of the data transmission format,
the negotiation phase has been added to IPROTO.
A new key (IPROTO_NEGOTIATION) has been added to IPROTO command codes.
NEGOTIATION BODY: CODE = 0x0E
+==========================+
| |
| NEGOTIATION PARAMETERS |
| |
+==========================+
MP_MAP
Session negotiation parameters are a map with keys like ERROR_FORMAT_VERSION ...
The response is a map with all stated negotiation parameters.
So, for work with the new format of errors, it is necessary to perform the negotiation phase,
otherwise errors will be transmitted in the old format (by default).
Needed for #4398
---
src/box/iproto.cc | 20 +++++++++++
src/box/iproto_constants.h | 11 ++++++
src/box/lua/net_box.c | 34 ++++++++++++++++++
src/box/lua/net_box.lua | 57 ++++++++++++++++++++++++++----
src/box/session.cc | 12 +++++++
src/box/session.h | 27 ++++++++++++++
src/box/xrow.c | 87 ++++++++++++++++++++++++++++++++++++++++++++++
src/box/xrow.h | 38 ++++++++++++++++++++
8 files changed, 279 insertions(+), 7 deletions(-)
diff --git a/src/box/iproto.cc b/src/box/iproto.cc
index 9dad43b..2df41ba 100644
--- a/src/box/iproto.cc
+++ b/src/box/iproto.cc
@@ -197,6 +197,8 @@ struct iproto_msg
struct auth_request auth;
/* SQL request, if this is the EXECUTE/PREPARE request. */
struct sql_request sql;
+ /** Negotiation request */
+ struct negotiation_params neg_req;
/** In case of iproto parse error, saved diagnostics. */
struct diag diag;
};
@@ -1309,6 +1311,16 @@ iproto_msg_decode(struct iproto_msg *msg, const char **pos, const char *reqend,
goto error;
cmsg_init(&msg->base, misc_route);
break;
+ case IPROTO_NEGOTIATION: {
+ struct session *ses = msg->connection->session;
+ /* Copy current parameters to modify */
+ memcpy(&msg->neg_req, &ses->neg_param,
+ sizeof(struct negotiation_params));
+ if (xrow_decode_negotiation(&msg->header, &msg->neg_req))
+ goto error;
+ cmsg_init(&msg->base, misc_route);
+ break;
+ }
default:
diag_set(ClientError, ER_UNKNOWN_REQUEST_TYPE,
(uint32_t) type);
@@ -1714,6 +1726,14 @@ tx_process_misc(struct cmsg *m)
iproto_reply_vote_xc(out, &ballot, msg->header.sync,
::schema_version);
break;
+ case IPROTO_NEGOTIATION:
+ session_update_neg_parameters(con->session,
+ &msg->neg_req);
+ iproto_reply_negotiation_xc(out,
+ &con->session->neg_param,
+ msg->header.sync,
+ ::schema_version);
+ break;
default:
unreachable();
}
diff --git a/src/box/iproto_constants.h b/src/box/iproto_constants.h
index f9d413a..1851712 100644
--- a/src/box/iproto_constants.h
+++ b/src/box/iproto_constants.h
@@ -112,6 +112,7 @@ enum iproto_key {
IPROTO_METADATA = 0x32,
IPROTO_BIND_METADATA = 0x33,
IPROTO_BIND_COUNT = 0x34,
+ IPROTO_NEG_PARAM = 0x35,
/* Leave a gap between response keys and SQL keys. */
IPROTO_SQL_TEXT = 0x40,
@@ -215,6 +216,8 @@ enum iproto_type {
IPROTO_NOP = 12,
/** Prepare SQL statement. */
IPROTO_PREPARE = 13,
+ /** Negotiation of session parameters */
+ IPROTO_NEGOTIATION = 14,
/** The maximum typecode used for box.stat() */
IPROTO_TYPE_STAT_MAX,
@@ -467,6 +470,14 @@ vy_row_index_key_name(enum vy_row_index_key key)
return vy_row_index_key_strs[key];
}
+/**
+ * Negotiation protocol's keys
+ */
+enum neg_key {
+ /** Version of an error format */
+ ERROR_FORMAT_VERSION
+};
+
#if defined(__cplusplus)
} /* extern "C" */
#endif
diff --git a/src/box/lua/net_box.c b/src/box/lua/net_box.c
index c7bd016..5321e87 100644
--- a/src/box/lua/net_box.c
+++ b/src/box/lua/net_box.c
@@ -417,6 +417,39 @@ netbox_encode_upsert(lua_State *L)
}
static int
+netbox_encode_negotiation(lua_State *L)
+{
+ if (lua_gettop(L) < 3)
+ return luaL_error(L, "Usage: netbox.encode_negotiation(ibuf, sync, "
+ "opts)");
+
+
+ /* Check opts is table and parse it */
+ if (lua_istable(L, 3) == 0) {
+ return luaL_error(L, "Expected opts is table");
+ }
+
+ lua_getfield(L, 3, "error_format_ver");
+
+ int err_format_ver = ERR_FORMAT_DEF;
+ if (lua_isnumber(L, -1)) {
+ err_format_ver = lua_tonumber(L, -1);
+ }
+
+ struct mpstream stream;
+ size_t svp = netbox_prepare_request(L, &stream, IPROTO_NEGOTIATION);
+ netbox_encode_request(&stream, svp);
+
+ mpstream_encode_map(&stream, 1);
+ mpstream_encode_uint(&stream, ERROR_FORMAT_VERSION);
+ mpstream_encode_uint(&stream, err_format_ver);
+
+ netbox_encode_request(&stream, svp);
+
+ return 0;
+}
+
+static int
netbox_decode_greeting(lua_State *L)
{
struct greeting greeting;
@@ -901,6 +934,7 @@ luaopen_net_box(struct lua_State *L)
{ "encode_execute", netbox_encode_execute},
{ "encode_prepare", netbox_encode_prepare},
{ "encode_auth", netbox_encode_auth },
+ { "encode_negotiation", netbox_encode_negotiation },
{ "decode_greeting",netbox_decode_greeting },
{ "communicate", netbox_communicate },
{ "decode_select", netbox_decode_select },
diff --git a/src/box/lua/net_box.lua b/src/box/lua/net_box.lua
index 3f611c0..7fdad64 100644
--- a/src/box/lua/net_box.lua
+++ b/src/box/lua/net_box.lua
@@ -26,6 +26,7 @@ local check_primary_index = box.internal.check_primary_index
local communicate = internal.communicate
local encode_auth = internal.encode_auth
local encode_select = internal.encode_select
+local encode_negotiation = internal.encode_negotiation
local decode_greeting = internal.decode_greeting
local TIMEOUT_INFINITY = 500 * 365 * 86400
@@ -38,12 +39,13 @@ local IPROTO_STATUS_KEY = 0x00
local IPROTO_ERRNO_MASK = 0x7FFF
local IPROTO_SYNC_KEY = 0x01
local IPROTO_SCHEMA_VERSION_KEY = 0x05
-local IPROTO_METADATA_KEY = 0x32
-local IPROTO_SQL_INFO_KEY = 0x42
+local IPROTO_METADATA_KEY = 0x32
+local IPROTO_SQL_INFO_KEY = 0x42
local SQL_INFO_ROW_COUNT_KEY = 0
local IPROTO_FIELD_NAME_KEY = 0
local IPROTO_DATA_KEY = 0x30
local IPROTO_ERROR_KEY = 0x31
+local IPROTO_NEG_PARAM = 0x35
local IPROTO_GREETING_SIZE = 128
local IPROTO_CHUNK_KEY = 128
local IPROTO_OK_KEY = 0
@@ -57,6 +59,11 @@ local E_PROC_LUA = box.error.PROC_LUA
-- utility tables
local is_final_state = {closed = 1, error = 1}
+-- negotiations keys
+local neg_keys = {
+ ERROR_FORMAT_VERSION = 0
+}
+
local function decode_nil(raw_data, raw_data_end)
return nil, raw_data_end
end
@@ -83,6 +90,10 @@ local function decode_push(raw_data)
local response, raw_end = decode(raw_data)
return response[IPROTO_DATA_KEY][1], raw_end
end
+local function decode_negotiation(raw_data)
+ local response, raw_end = decode(raw_data)
+ return response[IPROTO_NEG_PARAM], raw_end
+end
local function version_id(major, minor, patch)
return bit.bor(bit.lshift(major, 16), bit.lshift(minor, 8), patch)
@@ -110,6 +121,7 @@ local method_encoder = {
min = internal.encode_select,
max = internal.encode_select,
count = internal.encode_call,
+ negotiation = internal.encode_negotiation,
-- inject raw data into connection, used by console and tests
inject = function(buf, id, bytes)
local ptr = buf:reserve(#bytes)
@@ -138,9 +150,12 @@ local method_decoder = {
count = decode_count,
inject = decode_data,
push = decode_push,
+ negotiation = decode_negotiation
}
-local function next_id(id) return band(id + 1, 0x7FFFFFFF) end
+local function next_id(id)
+ return band(id + 1, 0x7FFFFFFF)
+end
--
-- Connect to a remote server, do handshake.
@@ -435,7 +450,9 @@ local function create_transport(host, port, user, password, callback,
local protocol_sm
local function start()
- if state ~= 'initial' then return not is_final_state[state] end
+ if state ~= 'initial' then
+ return not is_final_state[state]
+ end
fiber.create(function()
local ok, err, timeout
worker_fiber = fiber_self()
@@ -804,7 +821,9 @@ local function create_transport(host, port, user, password, callback,
iproto_sm = function(schema_version)
local err, hdr, body_rpos, body_end = send_and_recv_iproto()
- if err then return error_sm(err, hdr) end
+ if err then
+ return error_sm(err, hdr)
+ end
dispatch_response_iproto(hdr, body_rpos, body_end)
local status = hdr[IPROTO_STATUS_KEY]
local response_schema_version = hdr[IPROTO_SCHEMA_VERSION_KEY]
@@ -821,7 +840,10 @@ local function create_transport(host, port, user, password, callback,
end
error_sm = function(err, msg)
- if connection then connection:close(); connection = nil end
+ if connection then
+ connection:close()
+ connection = nil
+ end
send_buf:recycle()
recv_buf:recycle()
if state ~= 'closed' then
@@ -924,8 +946,11 @@ local console_mt = {
local space_metatable, index_metatable
+
+
local function new_sm(host, port, opts, connection, greeting)
- local user, password = opts.user, opts.password; opts.password = nil
+ local user, password = opts.user, opts.password
+ opts.password = nil
local last_reconnect_error
local remote = {host = host, port = port, opts = opts, state = 'initial'}
local function callback(what, ...)
@@ -1004,6 +1029,24 @@ local function new_sm(host, port, opts, connection, greeting)
if opts.wait_connected ~= false then
remote._transport.wait_state('active', tonumber(opts.wait_connected))
end
+
+ -- Negotiation if needed
+ if opts.error_extended then
+ local peer_has_negotiation = version_at_least(remote.peer_version_id, 2, 4, 1)
+ if not peer_has_negotiation then
+ box.error(box.error.PROC_LUA, "Negotiations failed")
+ end
+ local neg_req = {error_format_ver = 1}
+ local neg_resp = remote:_request("negotiation", nil, nil, neg_req)
+ if neg_resp[neg_keys.ERROR_FORMAT_VERSION] ~= 1 then
+ remote._transport.stop()
+ box.error(box.error.PROC_LUA, "Negotiations failed")
+ end
+
+ remote._error_extended = true
+ end
+ -- Negotiation end
+
return remote
end
diff --git a/src/box/session.cc b/src/box/session.cc
index 8813182..c0224ca 100644
--- a/src/box/session.cc
+++ b/src/box/session.cc
@@ -37,6 +37,7 @@
#include "error.h"
#include "tt_static.h"
#include "sql_stmt_cache.h"
+#include "iproto_constants.h"
const char *session_type_strs[] = {
"background",
@@ -144,6 +145,9 @@ session_create(enum session_type type)
session->sql_default_engine = SQL_STORAGE_ENGINE_MEMTX;
session->sql_stmts = NULL;
+ /* Set default negotiation parameters */
+ session->neg_param.err_format_ver = ERR_FORMAT_DEF;
+
/* For on_connect triggers. */
credentials_create(&session->credentials, guest_user);
struct mh_i64ptr_node_t node;
@@ -373,3 +377,11 @@ generic_session_sync(struct session *session)
(void) session;
return 0;
}
+
+int
+session_update_neg_parameters(struct session *session,
+ const struct negotiation_params *params)
+{
+ session->neg_param.err_format_ver = params->err_format_ver;
+ return 0;
+}
diff --git a/src/box/session.h b/src/box/session.h
index 6dfc7cb..3b4dfb0 100644
--- a/src/box/session.h
+++ b/src/box/session.h
@@ -81,6 +81,24 @@ union session_meta {
};
/**
+ * An error transmission formats
+ */
+enum error_formats {
+ /** Default(old) format */
+ ERR_FORMAT_DEF,
+ /** Extended format */
+ ERR_FORMAT_EX
+};
+
+/**
+ * Parameters which may be changed at negotiation phase of session
+*/
+struct negotiation_params {
+ /** Version of a format for an error transmission */
+ uint8_t err_format_ver;
+};
+
+/**
* Abstraction of a single user session:
* for now, only provides accounting of established
* sessions and on-connect/on-disconnect event
@@ -110,6 +128,8 @@ struct session {
struct credentials credentials;
/** Trigger for fiber on_stop to cleanup created on-demand session */
struct trigger fiber_on_stop;
+ /** Negotiation parameters */
+ struct negotiation_params neg_param;
};
struct session_vtab {
@@ -364,6 +384,13 @@ generic_session_fd(struct session *session);
int64_t
generic_session_sync(struct session *session);
+/**
+ * Update negatiation parameters of the session
+*/
+int
+session_update_neg_parameters(struct session *session,
+ const struct negotiation_params *params);
+
#if defined(__cplusplus)
} /* extern "C" */
diff --git a/src/box/xrow.c b/src/box/xrow.c
index 5e3cb07..90dffea 100644
--- a/src/box/xrow.c
+++ b/src/box/xrow.c
@@ -479,6 +479,42 @@ iproto_reply_vote(struct obuf *out, const struct ballot *ballot,
}
int
+iproto_reply_negotiation(struct obuf *out,
+ const struct negotiation_params *neg_param,
+ uint64_t sync,
+ uint32_t schema_version)
+{
+ char *buf = NULL;
+ uint32_t buf_size = IPROTO_HEADER_LEN + mp_sizeof_map(1)
+ + mp_sizeof_uint(IPROTO_NEG_PARAM) + mp_sizeof_map(1)
+ + mp_sizeof_uint(ERROR_FORMAT_VERSION)
+ + mp_sizeof_uint(neg_param->err_format_ver);
+
+ buf = obuf_reserve(out, buf_size);
+ if (buf == NULL) {
+ diag_set(OutOfMemory, buf_size,
+ "obuf_alloc", "buf");
+ return -1;
+ };
+
+ char *data = buf + IPROTO_HEADER_LEN;
+ data = mp_encode_map(data, 1);
+ data = mp_encode_uint(data, IPROTO_NEG_PARAM);
+ data = mp_encode_map(data, 1);
+ data = mp_encode_uint(data, ERROR_FORMAT_VERSION);
+ data = mp_encode_uint(data, neg_param->err_format_ver);
+ assert(data == buf + buf_size);
+
+ iproto_header_encode(buf, IPROTO_OK, sync, schema_version,
+ buf_size - IPROTO_HEADER_LEN);
+
+ char *ptr = obuf_alloc(out, buf_size);
+ (void) ptr;
+ assert(ptr == buf);
+ return 0;
+}
+
+int
iproto_reply_error(struct obuf *out, const struct error *e, uint64_t sync,
uint32_t schema_version)
{
@@ -1518,3 +1554,54 @@ greeting_decode(const char *greetingbuf, struct greeting *greeting)
return 0;
}
+
+int
+xrow_decode_negotiation(const struct xrow_header *row,
+ struct negotiation_params *request)
+{
+ if (row->bodycnt == 0) {
+ diag_set(ClientError, ER_INVALID_MSGPACK,
+ "missing request body");
+ return -1;
+ }
+
+ assert(row->bodycnt == 1);
+ const char *data = (const char *) row->body[0].iov_base;
+
+ const char *end = data + row->body[0].iov_len;
+ assert((end - data) > 0);
+
+ if (mp_typeof(*data) != MP_MAP || mp_check_map(data, end) > 0) {
+error:
+ xrow_on_decode_err(row->body[0].iov_base,
+ end, ER_INVALID_MSGPACK,
+ "packet body");
+ return -1;
+ }
+
+ uint32_t map_size = mp_decode_map(&data);
+ for (uint32_t i = 0; i < map_size; ++i) {
+ if ((end - data) < 1 || mp_typeof(*data) != MP_UINT)
+ goto error;
+
+ uint64_t key = mp_decode_uint(&data);
+
+ switch (key) {
+ case ERROR_FORMAT_VERSION:
+ if (mp_typeof(*data) != MP_UINT)
+ goto error;
+ request->err_format_ver = mp_decode_uint(&data);
+ break;
+ default:
+ /* unknown key */
+ continue;
+ }
+ }
+ if (data != end) {
+ xrow_on_decode_err(row->body[0].iov_base, end,
+ ER_INVALID_MSGPACK, "packet end");
+ return -1;
+ }
+
+ return 0;
+}
diff --git a/src/box/xrow.h b/src/box/xrow.h
index 2a0a9c8..bbd0496 100644
--- a/src/box/xrow.h
+++ b/src/box/xrow.h
@@ -37,6 +37,7 @@
#include "uuid/tt_uuid.h"
#include "diag.h"
+#include "session.h"
#include "vclock.h"
#if defined(__cplusplus)
@@ -177,6 +178,17 @@ struct request {
};
/**
+ * Decode negotiation request
+ * @param row
+ * @param request
+ * @retval -1 on error, see diag
+ * @retval 0 success
+ */
+int
+xrow_decode_negotiation(const struct xrow_header *row,
+ struct negotiation_params *request);
+
+/**
* Create a JSON-like string representation of a request. */
const char *
request_str(const struct request *request);
@@ -566,6 +578,22 @@ int
iproto_reply_error(struct obuf *out, const struct error *e, uint64_t sync,
uint32_t schema_version);
+/**
+ * Write a negotiation reply packet to output buffer.
+ * @param out Buffer to write to.
+ * @param neg_param Current negotiation parameters.
+ * @param sync Request sync.
+ * @param schema_version Actual schema version.
+ *
+ * @retval 0 Success.
+ * @retval -1 Memory error.
+ */
+int
+iproto_reply_negotiation(struct obuf *out,
+ const struct negotiation_params *neg_param,
+ uint64_t sync,
+ uint32_t schema_version);
+
/** EXECUTE/PREPARE request. */
struct sql_request {
/** SQL statement text. */
@@ -936,6 +964,16 @@ iproto_reply_vote_xc(struct obuf *out, const struct ballot *ballot,
diag_raise();
}
+/** @copydoc iproto_reply_negotiation. */
+static inline void
+iproto_reply_negotiation_xc(struct obuf *out,
+ const struct negotiation_params *neg_param,
+ uint64_t sync, uint32_t schema_version)
+{
+ if (iproto_reply_negotiation(out, neg_param, sync, schema_version) != 0)
+ diag_raise();
+}
+
#endif
#endif /* TARANTOOL_XROW_H_INCLUDED */
--
2.7.4
More information about the Tarantool-patches
mailing list