Tarantool development patches archive
 help / color / mirror / Atom feed
From: Leonid Vasiliev <lvasiliev@tarantool.org>
To: alexander.turenko@tarantool.org
Cc: tarantool-patches@dev.tarantool.org
Subject: [Tarantool-patches] [PATCH 3/6] iproto: Add negotiation phase
Date: Tue, 24 Mar 2020 15:46:01 +0300	[thread overview]
Message-ID: <7982fc7b062b2424689a990de1f76ca2ff0e4f50.1585053743.git.lvasiliev@tarantool.org> (raw)
In-Reply-To: <cover.1585053742.git.lvasiliev@tarantool.org>
In-Reply-To: <cover.1585053742.git.lvasiliev@tarantool.org>

The negotiation phase has been added to IPROTO

For possibility to have a custom parameters of session the negotiation
phase has been added. This is necessary to enable the transmission of
an error in different formats(depending on the choice of the client).

@TarantoolBot document
Title: IPROTO: The negatiation phase
For backward compatibility of the data transmission format,
the negotiation phase has been added to IPROTO.
A new key (IPROTO_NEGOTIATION) has been added to IPROTO command codes.
NEGOTIATION BODY: CODE = 0x0E
+==========================+
|                          |
|  NEGOTIATION PARAMETERS  |
|                          |
+==========================+
           MP_MAP
Session negotiation parameters are a map with keys like ERROR_FORMAT_VERSION ...
The response is a map with all stated negotiation parameters.
So, for work with the new format of errors, it is necessary to perform the negotiation phase,
otherwise errors will be transmitted in the old format (by default).

Needed for #4398
---
 src/box/iproto.cc          | 20 +++++++++++
 src/box/iproto_constants.h | 11 ++++++
 src/box/lua/net_box.c      | 34 ++++++++++++++++++
 src/box/lua/net_box.lua    | 57 ++++++++++++++++++++++++++----
 src/box/session.cc         | 12 +++++++
 src/box/session.h          | 27 ++++++++++++++
 src/box/xrow.c             | 87 ++++++++++++++++++++++++++++++++++++++++++++++
 src/box/xrow.h             | 38 ++++++++++++++++++++
 8 files changed, 279 insertions(+), 7 deletions(-)

diff --git a/src/box/iproto.cc b/src/box/iproto.cc
index 9dad43b..2df41ba 100644
--- a/src/box/iproto.cc
+++ b/src/box/iproto.cc
@@ -197,6 +197,8 @@ struct iproto_msg
 		struct auth_request auth;
 		/* SQL request, if this is the EXECUTE/PREPARE request. */
 		struct sql_request sql;
+		/** Negotiation request */
+		struct negotiation_params neg_req;
 		/** In case of iproto parse error, saved diagnostics. */
 		struct diag diag;
 	};
@@ -1309,6 +1311,16 @@ iproto_msg_decode(struct iproto_msg *msg, const char **pos, const char *reqend,
 			goto error;
 		cmsg_init(&msg->base, misc_route);
 		break;
+	case IPROTO_NEGOTIATION: {
+		struct session *ses = msg->connection->session;
+		/* Copy current parameters to modify */
+		memcpy(&msg->neg_req, &ses->neg_param,
+		       sizeof(struct negotiation_params));
+		if (xrow_decode_negotiation(&msg->header, &msg->neg_req))
+			goto error;
+		cmsg_init(&msg->base, misc_route);
+		break;
+	}
 	default:
 		diag_set(ClientError, ER_UNKNOWN_REQUEST_TYPE,
 			 (uint32_t) type);
@@ -1714,6 +1726,14 @@ tx_process_misc(struct cmsg *m)
 			iproto_reply_vote_xc(out, &ballot, msg->header.sync,
 					     ::schema_version);
 			break;
+		case IPROTO_NEGOTIATION:
+			session_update_neg_parameters(con->session,
+						      &msg->neg_req);
+			iproto_reply_negotiation_xc(out,
+						    &con->session->neg_param,
+						    msg->header.sync,
+						    ::schema_version);
+			break;
 		default:
 			unreachable();
 		}
diff --git a/src/box/iproto_constants.h b/src/box/iproto_constants.h
index f9d413a..1851712 100644
--- a/src/box/iproto_constants.h
+++ b/src/box/iproto_constants.h
@@ -112,6 +112,7 @@ enum iproto_key {
 	IPROTO_METADATA = 0x32,
 	IPROTO_BIND_METADATA = 0x33,
 	IPROTO_BIND_COUNT = 0x34,
+	IPROTO_NEG_PARAM = 0x35,
 
 	/* Leave a gap between response keys and SQL keys. */
 	IPROTO_SQL_TEXT = 0x40,
@@ -215,6 +216,8 @@ enum iproto_type {
 	IPROTO_NOP = 12,
 	/** Prepare SQL statement. */
 	IPROTO_PREPARE = 13,
+	/** Negotiation of session parameters */
+	IPROTO_NEGOTIATION = 14,
 	/** The maximum typecode used for box.stat() */
 	IPROTO_TYPE_STAT_MAX,
 
@@ -467,6 +470,14 @@ vy_row_index_key_name(enum vy_row_index_key key)
 	return vy_row_index_key_strs[key];
 }
 
+/**
+ * Negotiation protocol's keys
+ */
+enum neg_key {
+	/** Version of an error format */
+	ERROR_FORMAT_VERSION
+};
+
 #if defined(__cplusplus)
 } /* extern "C" */
 #endif
diff --git a/src/box/lua/net_box.c b/src/box/lua/net_box.c
index c7bd016..5321e87 100644
--- a/src/box/lua/net_box.c
+++ b/src/box/lua/net_box.c
@@ -417,6 +417,39 @@ netbox_encode_upsert(lua_State *L)
 }
 
 static int
+netbox_encode_negotiation(lua_State *L)
+{
+	if (lua_gettop(L) < 3)
+		return luaL_error(L, "Usage: netbox.encode_negotiation(ibuf, sync, "
+				     "opts)");
+
+
+	/* Check opts is table and parse it */
+	if (lua_istable(L, 3) == 0) {
+		return luaL_error(L, "Expected opts is table");
+	}
+
+	lua_getfield(L, 3, "error_format_ver");
+
+	int err_format_ver = ERR_FORMAT_DEF;
+	if (lua_isnumber(L, -1)) {
+		err_format_ver = lua_tonumber(L, -1);
+	}
+
+	struct mpstream stream;
+	size_t svp = netbox_prepare_request(L, &stream, IPROTO_NEGOTIATION);
+	netbox_encode_request(&stream, svp);
+
+	mpstream_encode_map(&stream, 1);
+	mpstream_encode_uint(&stream, ERROR_FORMAT_VERSION);
+	mpstream_encode_uint(&stream, err_format_ver);
+
+	netbox_encode_request(&stream, svp);
+
+	return 0;
+}
+
+static int
 netbox_decode_greeting(lua_State *L)
 {
 	struct greeting greeting;
@@ -901,6 +934,7 @@ luaopen_net_box(struct lua_State *L)
 		{ "encode_execute", netbox_encode_execute},
 		{ "encode_prepare", netbox_encode_prepare},
 		{ "encode_auth",    netbox_encode_auth },
+		{ "encode_negotiation", netbox_encode_negotiation },
 		{ "decode_greeting",netbox_decode_greeting },
 		{ "communicate",    netbox_communicate },
 		{ "decode_select",  netbox_decode_select },
diff --git a/src/box/lua/net_box.lua b/src/box/lua/net_box.lua
index 3f611c0..7fdad64 100644
--- a/src/box/lua/net_box.lua
+++ b/src/box/lua/net_box.lua
@@ -26,6 +26,7 @@ local check_primary_index = box.internal.check_primary_index
 local communicate     = internal.communicate
 local encode_auth     = internal.encode_auth
 local encode_select   = internal.encode_select
+local encode_negotiation = internal.encode_negotiation
 local decode_greeting = internal.decode_greeting
 
 local TIMEOUT_INFINITY = 500 * 365 * 86400
@@ -38,12 +39,13 @@ local IPROTO_STATUS_KEY    = 0x00
 local IPROTO_ERRNO_MASK    = 0x7FFF
 local IPROTO_SYNC_KEY      = 0x01
 local IPROTO_SCHEMA_VERSION_KEY = 0x05
-local IPROTO_METADATA_KEY = 0x32
-local IPROTO_SQL_INFO_KEY = 0x42
+local IPROTO_METADATA_KEY  = 0x32
+local IPROTO_SQL_INFO_KEY  = 0x42
 local SQL_INFO_ROW_COUNT_KEY = 0
 local IPROTO_FIELD_NAME_KEY = 0
 local IPROTO_DATA_KEY      = 0x30
 local IPROTO_ERROR_KEY     = 0x31
+local IPROTO_NEG_PARAM     = 0x35
 local IPROTO_GREETING_SIZE = 128
 local IPROTO_CHUNK_KEY     = 128
 local IPROTO_OK_KEY        = 0
@@ -57,6 +59,11 @@ local E_PROC_LUA             = box.error.PROC_LUA
 -- utility tables
 local is_final_state         = {closed = 1, error = 1}
 
+-- negotiations keys
+local neg_keys = {
+    ERROR_FORMAT_VERSION = 0
+}
+
 local function decode_nil(raw_data, raw_data_end)
     return nil, raw_data_end
 end
@@ -83,6 +90,10 @@ local function decode_push(raw_data)
     local response, raw_end = decode(raw_data)
     return response[IPROTO_DATA_KEY][1], raw_end
 end
+local function decode_negotiation(raw_data)
+    local response, raw_end = decode(raw_data)
+    return response[IPROTO_NEG_PARAM], raw_end
+end
 
 local function version_id(major, minor, patch)
     return bit.bor(bit.lshift(major, 16), bit.lshift(minor, 8), patch)
@@ -110,6 +121,7 @@ local method_encoder = {
     min     = internal.encode_select,
     max     = internal.encode_select,
     count   = internal.encode_call,
+    negotiation = internal.encode_negotiation,
     -- inject raw data into connection, used by console and tests
     inject = function(buf, id, bytes)
         local ptr = buf:reserve(#bytes)
@@ -138,9 +150,12 @@ local method_decoder = {
     count   = decode_count,
     inject  = decode_data,
     push    = decode_push,
+    negotiation = decode_negotiation
 }
 
-local function next_id(id) return band(id + 1, 0x7FFFFFFF) end
+local function next_id(id)
+    return band(id + 1, 0x7FFFFFFF)
+end
 
 --
 -- Connect to a remote server, do handshake.
@@ -435,7 +450,9 @@ local function create_transport(host, port, user, password, callback,
     local protocol_sm
 
     local function start()
-        if state ~= 'initial' then return not is_final_state[state] end
+        if state ~= 'initial' then
+            return not is_final_state[state]
+        end
         fiber.create(function()
             local ok, err, timeout
             worker_fiber = fiber_self()
@@ -804,7 +821,9 @@ local function create_transport(host, port, user, password, callback,
 
     iproto_sm = function(schema_version)
         local err, hdr, body_rpos, body_end = send_and_recv_iproto()
-        if err then return error_sm(err, hdr) end
+        if err then
+            return error_sm(err, hdr)
+        end
         dispatch_response_iproto(hdr, body_rpos, body_end)
         local status = hdr[IPROTO_STATUS_KEY]
         local response_schema_version = hdr[IPROTO_SCHEMA_VERSION_KEY]
@@ -821,7 +840,10 @@ local function create_transport(host, port, user, password, callback,
     end
 
     error_sm = function(err, msg)
-        if connection then connection:close(); connection = nil end
+        if connection then
+            connection:close()
+            connection = nil
+        end
         send_buf:recycle()
         recv_buf:recycle()
         if state ~= 'closed' then
@@ -924,8 +946,11 @@ local console_mt = {
 
 local space_metatable, index_metatable
 
+
+
 local function new_sm(host, port, opts, connection, greeting)
-    local user, password = opts.user, opts.password; opts.password = nil
+    local user, password = opts.user, opts.password
+    opts.password = nil
     local last_reconnect_error
     local remote = {host = host, port = port, opts = opts, state = 'initial'}
     local function callback(what, ...)
@@ -1004,6 +1029,24 @@ local function new_sm(host, port, opts, connection, greeting)
     if opts.wait_connected ~= false then
         remote._transport.wait_state('active', tonumber(opts.wait_connected))
     end
+
+    -- Negotiation if needed
+    if opts.error_extended then
+        local peer_has_negotiation = version_at_least(remote.peer_version_id, 2, 4, 1)
+        if not peer_has_negotiation then
+            box.error(box.error.PROC_LUA, "Negotiations failed")
+        end
+        local neg_req = {error_format_ver = 1}
+        local neg_resp = remote:_request("negotiation", nil, nil, neg_req)
+        if neg_resp[neg_keys.ERROR_FORMAT_VERSION] ~= 1 then
+            remote._transport.stop()
+            box.error(box.error.PROC_LUA, "Negotiations failed")
+        end
+
+        remote._error_extended = true
+    end
+    -- Negotiation end
+
     return remote
 end
 
diff --git a/src/box/session.cc b/src/box/session.cc
index 8813182..c0224ca 100644
--- a/src/box/session.cc
+++ b/src/box/session.cc
@@ -37,6 +37,7 @@
 #include "error.h"
 #include "tt_static.h"
 #include "sql_stmt_cache.h"
+#include "iproto_constants.h"
 
 const char *session_type_strs[] = {
 	"background",
@@ -144,6 +145,9 @@ session_create(enum session_type type)
 	session->sql_default_engine = SQL_STORAGE_ENGINE_MEMTX;
 	session->sql_stmts = NULL;
 
+	/* Set default negotiation parameters */
+	session->neg_param.err_format_ver = ERR_FORMAT_DEF;
+
 	/* For on_connect triggers. */
 	credentials_create(&session->credentials, guest_user);
 	struct mh_i64ptr_node_t node;
@@ -373,3 +377,11 @@ generic_session_sync(struct session *session)
 	(void) session;
 	return 0;
 }
+
+int
+session_update_neg_parameters(struct session *session,
+			      const struct negotiation_params *params)
+{
+	session->neg_param.err_format_ver = params->err_format_ver;
+	return 0;
+}
diff --git a/src/box/session.h b/src/box/session.h
index 6dfc7cb..3b4dfb0 100644
--- a/src/box/session.h
+++ b/src/box/session.h
@@ -81,6 +81,24 @@ union session_meta {
 };
 
 /**
+ * An error transmission formats
+ */
+enum error_formats {
+	/** Default(old) format */
+	ERR_FORMAT_DEF,
+	/** Extended format */
+	ERR_FORMAT_EX
+};
+
+/**
+ * Parameters which may be changed at negotiation phase of session
+*/
+struct negotiation_params {
+	/** Version of a format for an error transmission */
+	uint8_t err_format_ver;
+};
+
+/**
  * Abstraction of a single user session:
  * for now, only provides accounting of established
  * sessions and on-connect/on-disconnect event
@@ -110,6 +128,8 @@ struct session {
 	struct credentials credentials;
 	/** Trigger for fiber on_stop to cleanup created on-demand session */
 	struct trigger fiber_on_stop;
+	/** Negotiation parameters */
+	struct negotiation_params neg_param;
 };
 
 struct session_vtab {
@@ -364,6 +384,13 @@ generic_session_fd(struct session *session);
 int64_t
 generic_session_sync(struct session *session);
 
+/**
+ * Update negatiation parameters of the session
+*/
+int
+session_update_neg_parameters(struct session *session,
+			      const struct negotiation_params *params);
+
 #if defined(__cplusplus)
 } /* extern "C" */
 
diff --git a/src/box/xrow.c b/src/box/xrow.c
index 5e3cb07..90dffea 100644
--- a/src/box/xrow.c
+++ b/src/box/xrow.c
@@ -479,6 +479,42 @@ iproto_reply_vote(struct obuf *out, const struct ballot *ballot,
 }
 
 int
+iproto_reply_negotiation(struct obuf *out,
+			 const struct negotiation_params *neg_param,
+			 uint64_t sync,
+			 uint32_t schema_version)
+{
+	char *buf = NULL;
+	uint32_t buf_size = IPROTO_HEADER_LEN + mp_sizeof_map(1)
+		+ mp_sizeof_uint(IPROTO_NEG_PARAM) + mp_sizeof_map(1)
+		+ mp_sizeof_uint(ERROR_FORMAT_VERSION)
+		+ mp_sizeof_uint(neg_param->err_format_ver);
+
+	buf = obuf_reserve(out, buf_size);
+	if (buf == NULL) {
+		diag_set(OutOfMemory, buf_size,
+			 "obuf_alloc", "buf");
+		return -1;
+	};
+
+	char *data = buf + IPROTO_HEADER_LEN;
+	data = mp_encode_map(data, 1);
+	data = mp_encode_uint(data, IPROTO_NEG_PARAM);
+	data = mp_encode_map(data, 1);
+	data = mp_encode_uint(data, ERROR_FORMAT_VERSION);
+	data = mp_encode_uint(data, neg_param->err_format_ver);
+	assert(data == buf + buf_size);
+
+	iproto_header_encode(buf, IPROTO_OK, sync, schema_version,
+			     buf_size - IPROTO_HEADER_LEN);
+
+	char *ptr = obuf_alloc(out, buf_size);
+	(void) ptr;
+	assert(ptr == buf);
+	return 0;
+}
+
+int
 iproto_reply_error(struct obuf *out, const struct error *e, uint64_t sync,
 		   uint32_t schema_version)
 {
@@ -1518,3 +1554,54 @@ greeting_decode(const char *greetingbuf, struct greeting *greeting)
 
 	return 0;
 }
+
+int
+xrow_decode_negotiation(const struct xrow_header *row,
+			struct negotiation_params *request)
+{
+	if (row->bodycnt == 0) {
+		diag_set(ClientError, ER_INVALID_MSGPACK,
+			 "missing request body");
+		return -1;
+	}
+
+	assert(row->bodycnt == 1);
+	const char *data = (const char *) row->body[0].iov_base;
+
+	const char *end = data + row->body[0].iov_len;
+	assert((end - data) > 0);
+
+	if (mp_typeof(*data) != MP_MAP || mp_check_map(data, end) > 0) {
+error:
+		xrow_on_decode_err(row->body[0].iov_base,
+				   end, ER_INVALID_MSGPACK,
+				   "packet body");
+		return -1;
+	}
+
+	uint32_t map_size = mp_decode_map(&data);
+	for (uint32_t i = 0; i < map_size; ++i) {
+		if ((end - data) < 1 || mp_typeof(*data) != MP_UINT)
+			goto error;
+
+		uint64_t key = mp_decode_uint(&data);
+
+		switch (key) {
+		case ERROR_FORMAT_VERSION:
+			if (mp_typeof(*data) != MP_UINT)
+				goto error;
+			request->err_format_ver = mp_decode_uint(&data);
+			break;
+		default:
+			/* unknown key */
+			continue;
+		}
+	}
+	if (data != end) {
+		xrow_on_decode_err(row->body[0].iov_base, end,
+				   ER_INVALID_MSGPACK, "packet end");
+		return -1;
+	}
+
+	return 0;
+}
diff --git a/src/box/xrow.h b/src/box/xrow.h
index 2a0a9c8..bbd0496 100644
--- a/src/box/xrow.h
+++ b/src/box/xrow.h
@@ -37,6 +37,7 @@
 
 #include "uuid/tt_uuid.h"
 #include "diag.h"
+#include "session.h"
 #include "vclock.h"
 
 #if defined(__cplusplus)
@@ -177,6 +178,17 @@ struct request {
 };
 
 /**
+ * Decode negotiation request
+ * @param row
+ * @param request
+ * @retval -1 on error, see diag
+ * @retval 0 success
+ */
+int
+xrow_decode_negotiation(const struct xrow_header *row,
+			struct negotiation_params *request);
+
+/**
  * Create a JSON-like string representation of a request. */
 const char *
 request_str(const struct request *request);
@@ -566,6 +578,22 @@ int
 iproto_reply_error(struct obuf *out, const struct error *e, uint64_t sync,
 		   uint32_t schema_version);
 
+/**
+ * Write a negotiation reply packet to output buffer.
+ * @param out Buffer to write to.
+ * @param neg_param Current negotiation parameters.
+ * @param sync Request sync.
+ * @param schema_version Actual schema version.
+ *
+ * @retval  0 Success.
+ * @retval -1 Memory error.
+ */
+int
+iproto_reply_negotiation(struct obuf *out,
+			 const struct negotiation_params *neg_param,
+			 uint64_t sync,
+			 uint32_t schema_version);
+
 /** EXECUTE/PREPARE request. */
 struct sql_request {
 	/** SQL statement text. */
@@ -936,6 +964,16 @@ iproto_reply_vote_xc(struct obuf *out, const struct ballot *ballot,
 		diag_raise();
 }
 
+/** @copydoc iproto_reply_negotiation. */
+static inline void
+iproto_reply_negotiation_xc(struct obuf *out,
+			    const struct negotiation_params *neg_param,
+			    uint64_t sync, uint32_t schema_version)
+{
+	if (iproto_reply_negotiation(out, neg_param, sync, schema_version) != 0)
+		diag_raise();
+}
+
 #endif
 
 #endif /* TARANTOOL_XROW_H_INCLUDED */
-- 
2.7.4

  parent reply	other threads:[~2020-03-24 12:46 UTC|newest]

Thread overview: 30+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2020-03-24 12:45 [Tarantool-patches] [PATCH 0/6] Extending error functionality Leonid Vasiliev
2020-03-24 12:45 ` [Tarantool-patches] [PATCH 1/6] error: Add a Lua backtrace to error Leonid Vasiliev
2020-04-05 22:14   ` Vladislav Shpilevoy
2020-04-08 13:56     ` Igor Munkin
2020-03-24 12:46 ` [Tarantool-patches] [PATCH 2/6] error: Add the custom error type Leonid Vasiliev
2020-04-05 22:14   ` Vladislav Shpilevoy
2020-03-24 12:46 ` Leonid Vasiliev [this message]
2020-03-24 20:02   ` [Tarantool-patches] [PATCH 3/6] iproto: Add negotiation phase Konstantin Osipov
2020-03-25  7:35     ` lvasiliev
2020-03-25  8:42       ` Konstantin Osipov
2020-03-25 10:56         ` Eugene Leonovich
2020-03-25 11:13           ` Konstantin Osipov
2020-03-26 11:37           ` lvasiliev
2020-03-26 11:18         ` lvasiliev
2020-03-26 12:16           ` Konstantin Osipov
2020-03-26 12:54             ` Kirill Yukhin
2020-03-26 13:19               ` Konstantin Osipov
2020-03-26 13:31                 ` Konstantin Osipov
2020-03-26 21:13       ` Alexander Turenko
2020-03-26 21:53         ` Alexander Turenko
2020-03-27  8:28         ` Konstantin Osipov
2020-03-26 23:35       ` Alexander Turenko
2020-03-27  8:39         ` Konstantin Osipov
2020-03-24 12:46 ` [Tarantool-patches] [PATCH 4/6] error: Add extended error transfer format Leonid Vasiliev
2020-03-24 12:46 ` [Tarantool-patches] [PATCH 5/6] error: Add test for extended error Leonid Vasiliev
2020-03-24 12:46 ` [Tarantool-patches] [PATCH 6/6] error: Transmit an error through IPROTO_OK as object Leonid Vasiliev
2020-03-27 23:11 ` [Tarantool-patches] [PATCH 0/6] Extending error functionality lvasiliev
2020-03-28 13:54   ` Alexander Turenko
2020-03-30 10:48     ` lvasiliev
2020-04-01 15:35 ` Alexander Turenko

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=7982fc7b062b2424689a990de1f76ca2ff0e4f50.1585053743.git.lvasiliev@tarantool.org \
    --to=lvasiliev@tarantool.org \
    --cc=alexander.turenko@tarantool.org \
    --cc=tarantool-patches@dev.tarantool.org \
    --subject='Re: [Tarantool-patches] [PATCH 3/6] iproto: Add negotiation phase' \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox