[Tarantool-patches] [PATCH 3/3] tuple: rework updates to improve code extendibility
Vladislav Shpilevoy
v.shpilevoy at tarantool.org
Fri Nov 1 02:43:07 MSK 2019
Before the patch update was implemented as a set of operations
applicable for arrays only. It was ok until field names and JSON
paths appearance, because tuple is array on the top level.
But now there are four reasons to allow more complex updates of
tuple field internals via JSON paths:
* tuple field access by JSON path is allowed so for consistency
JSON paths should be allowed in updates as well;
* JSON indexes are supported. JSON update should be able to
change an indexed field without rewriting half of a tuple;
* Tarantool is going to support documents in storage so JSON
path updates is one more step forward;
* JSON updates are going to be faster than get + in-memory
Lua/connector update + replace (or update of a whole tuple
field).
The patch prepares current update code to be extended by updates
of map, so named isolated 'bar' updates, and multilevel updates.
The concept is to build a tree of update objects. Each updates a
scalar terminal value (leaf of that tree), or a complex object
like array or map. In the latter case these objects contain >= 2
children updates.
This organization allows to split update implementation into
several independent files-modules for each update type. Next
commits will introduce them one by one.
Needed for #1261
---
src/box/CMakeLists.txt | 2 +
src/box/xrow_update/xrow_update.c | 1120 +----------------------
src/box/xrow_update/xrow_update_array.c | 302 ++++++
src/box/xrow_update/xrow_update_field.c | 665 ++++++++++++++
src/box/xrow_update/xrow_update_field.h | 442 +++++++++
test/box/tuple.result | 4 +-
6 files changed, 1463 insertions(+), 1072 deletions(-)
create mode 100644 src/box/xrow_update/xrow_update_array.c
create mode 100644 src/box/xrow_update/xrow_update_field.c
create mode 100644 src/box/xrow_update/xrow_update_field.h
diff --git a/src/box/CMakeLists.txt b/src/box/CMakeLists.txt
index 351ec69f5..e451f7f19 100644
--- a/src/box/CMakeLists.txt
+++ b/src/box/CMakeLists.txt
@@ -41,6 +41,8 @@ add_library(tuple STATIC
field_map.c
tuple_format.c
xrow_update/xrow_update.c
+ xrow_update/xrow_update_field.c
+ xrow_update/xrow_update_array.c
tuple_compare.cc
tuple_extract_key.cc
tuple_hash.cc
diff --git a/src/box/xrow_update/xrow_update.c b/src/box/xrow_update/xrow_update.c
index 099f56785..e558b1892 100644
--- a/src/box/xrow_update/xrow_update.c
+++ b/src/box/xrow_update/xrow_update.c
@@ -29,37 +29,29 @@
* SUCH DAMAGE.
*/
#include "xrow_update.h"
-#include <stdbool.h>
-
-#include "say.h"
#include "box/error.h"
#include "diag.h"
-#include "trivia/util.h"
-#include "third_party/queue.h"
#include <msgpuck/msgpuck.h>
-#include <bit/int96.h>
#include "box/column_mask.h"
-#include "mp_extension_types.h"
-#include "mp_decimal.h"
#include "fiber.h"
-#include "box/tuple_dictionary.h"
-#include "box/tuple_format.h"
-#include "tt_static.h"
+#include "xrow_update_field.h"
-/** UPDATE request implementation.
- * UPDATE request is represented by a sequence of operations, each
+/**
+ * UPDATE is represented by a sequence of operations, each
* working with a single field. There also are operations which
* add or remove fields. Only one operation on the same field
* is allowed.
+ * Field is any part of a tuple: top-level array's field, leaf of
+ * a complex tuple with lots of maps and arrays inside, a whole
+ * map/array inside a tuple.
*
* Supported field change operations are: SET, ADD, SUBTRACT;
* bitwise AND, XOR and OR; SPLICE.
+ * Supported tuple change operations are: SET, DELETE, INSERT.
*
- * Supported tuple change operations are: SET, DELETE, INSERT,
- * PUSH and POP.
* If the number of fields in a tuple is altered by an operation,
* field index of all following operations is evaluated against the
- * new tuple.
+ * new tuple. It applies to internal tuple's arrays too.
*
* Despite the allowed complexity, a typical use case for UPDATE
* is when the operation count is much less than field count in
@@ -68,20 +60,28 @@
* With the common case in mind, UPDATE tries to minimize
* the amount of unnecessary temporary tuple copies.
*
- * First, operations are parsed and initialized. Then, the
- * resulting tuple length is calculated. A new tuple is allocated.
- * Finally, operations are applied sequentially, each copying data
- * from the old tuple to the new tuple.
+ * First, operations are parsed and initialized. Then they are
+ * applied one by one to a tuple. Each operation may change an
+ * already located field in a tuple, or may split parent of the
+ * field into subtrees. After all operations are applied, the
+ * result is a tree of updated, new, and non-changed fields.
+ * The tree needs to be flattened into MessagePack format. For
+ * that a resulting tuple length is calculated. Memory for the new
+ * tuple is allocated in one contiguous chunk. Then the update
+ * tree is stored into the chunk as a result tuple.
*
+ * Note, that the result tree didn't allocate anything until a
+ * result was stored. It was referencing old tuple's memory.
* With this approach, cost of UPDATE is proportional to O(tuple
* length) + O(C * log C), where C is the number of operations in
* the request, and data is copied from the old tuple to the new
* one only once.
*
- * As long as INSERT, DELETE, PUSH and POP change the relative
- * field order, an auxiliary data structure is necessary to look
- * up fields in the "old" tuple by field number. Such field
- * index is built on demand, using "rope" data structure.
+ * As long as INSERT and DELETE change the relative field order in
+ * arrays and maps, these fields are represented as special
+ * structures optimized for updates to provide fast search and
+ * don't realloc anything. It is 'rope' data structure for array,
+ * and a simpler key-value list sorted by update time for map.
*
* A rope is a binary tree designed to store long strings built
* from pieces. Each tree node points to a substring of a large
@@ -97,1044 +97,28 @@
* deleted one.
*/
-static inline void *
-xrow_update_alloc(struct region *region, size_t size)
-{
- void *ptr = region_aligned_alloc(region, size, alignof(uint64_t));
- if (ptr == NULL)
- diag_set(OutOfMemory, size, "region_aligned_alloc",
- "update internals");
- return ptr;
-}
-
/** Update internal state */
struct xrow_update
{
- struct xrow_update_rope *rope;
+ /** Operations array. */
struct xrow_update_op *ops;
+ /** Length of ops. */
uint32_t op_count;
- int index_base; /* 0 for C and 1 for Lua */
- /** A bitmask of all columns modified by this update */
- uint64_t column_mask;
-};
-
-/** Argument of SET (and INSERT) operation. */
-struct xrow_update_arg_set {
- uint32_t length;
- const char *value;
-};
-
-/** Argument of DELETE operation. */
-struct xrow_update_arg_del {
- uint32_t count;
-};
-
-/**
- * MsgPack format code of an arithmetic argument or result.
- * MsgPack codes are not used to simplify type calculation.
- */
-enum xrow_update_arith_type {
- XUPDATE_TYPE_DECIMAL = 0, /* MP_EXT + MP_DECIMAL */
- XUPDATE_TYPE_DOUBLE = 1, /* MP_DOUBLE */
- XUPDATE_TYPE_FLOAT = 2, /* MP_FLOAT */
- XUPDATE_TYPE_INT = 3 /* MP_INT/MP_UINT */
-};
-
-/**
- * Argument (left and right) and result of ADD, SUBTRACT.
- *
- * To perform an arithmetic operation, update first loads
- * left and right arguments into corresponding value objects,
- * then performs arithmetics on types of arguments, thus
- * calculating the type of the result, and then
- * performs the requested operation according to the calculated
- * type rules.
- *
- * The rules are as follows:
- * - when one of the argument types is double, the result is
- * double
- * - when one of the argument types is float, the result is
- * float
- * - for integer arguments, the result type code depends on
- * the range in which falls the result of the operation.
- * If the result is in negative range, it's MP_INT, otherwise
- * it's MP_UINT. If the result is out of bounds of (-2^63,
- * 2^64), and exception is raised for overflow.
- */
-struct xrow_update_arg_arith {
- enum xrow_update_arith_type type;
- union {
- double dbl;
- float flt;
- struct int96_num int96;
- decimal_t dec;
- };
-};
-
-/** Argument of AND, XOR, OR operations. */
-struct xrow_update_arg_bit {
- uint64_t val;
-};
-
-/** Argument of SPLICE. */
-struct xrow_update_arg_splice {
- int32_t offset; /** splice position */
- int32_t cut_length; /** cut this many bytes. */
- const char *paste; /** paste what? */
- uint32_t paste_length; /** paste this many bytes. */
-
- /** Offset of the tail in the old field */
- int32_t tail_offset;
- /** Size of the tail. */
- int32_t tail_length;
-};
-
-union xrow_update_arg {
- struct xrow_update_arg_set set;
- struct xrow_update_arg_del del;
- struct xrow_update_arg_arith arith;
- struct xrow_update_arg_bit bit;
- struct xrow_update_arg_splice splice;
-};
-
-struct xrow_update_field;
-struct xrow_update_op;
-
-static struct xrow_update_field *
-xrow_update_field_split(struct region *region, struct xrow_update_field *data,
- size_t size, size_t offset);
-
-#define ROPE_SPLIT_F xrow_update_field_split
-#define ROPE_ALLOC_F xrow_update_alloc
-#define rope_data_t struct xrow_update_field *
-#define rope_ctx_t struct region *
-#define rope_name xrow_update
-
-#include "salad/rope.h"
-
-typedef int (*xrow_update_op_do_f)(struct xrow_update *update,
- struct xrow_update_op *op);
-typedef int (*xrow_update_op_read_arg_f)(int index_base,
- struct xrow_update_op *op,
- const char **expr);
-typedef void (*xrow_update_op_store_f)(union xrow_update_arg *arg,
- const char *in, char *out);
-
-/** A set of functions and properties to initialize and do an op. */
-struct xrow_update_op_meta {
- xrow_update_op_read_arg_f read_arg;
- xrow_update_op_do_f do_op;
- xrow_update_op_store_f store;
- /* Argument count */
- uint32_t args;
-};
-
-/** A single UPDATE operation. */
-struct xrow_update_op {
- const struct xrow_update_op_meta *meta;
- union xrow_update_arg arg;
- /* Subject field no. */
- int32_t field_no;
- uint32_t new_field_len;
- uint8_t opcode;
-};
-
-static inline const char *
-xrow_update_op_field_str(const struct xrow_update_op *op)
-{
- if (op->field_no >= 0)
- return tt_sprintf("%d", op->field_no + TUPLE_INDEX_BASE);
- else
- return tt_sprintf("%d", op->field_no);
-}
-
-static inline int
-xrow_update_err_arg_type(const struct xrow_update_op *op,
- const char *needed_type)
-{
- diag_set(ClientError, ER_UPDATE_ARG_TYPE, op->opcode,
- xrow_update_op_field_str(op), needed_type);
- return -1;
-}
-
-static inline int
-xrow_update_err_int_overflow(const struct xrow_update_op *op)
-{
- diag_set(ClientError, ER_UPDATE_INTEGER_OVERFLOW, op->opcode,
- xrow_update_op_field_str(op));
- return -1;
-}
-
-static inline int
-xrow_update_err_decimal_overflow(const struct xrow_update_op *op)
-{
- diag_set(ClientError, ER_UPDATE_DECIMAL_OVERFLOW, op->opcode,
- xrow_update_op_field_str(op));
- return -1;
-}
-
-static inline int
-xrow_update_err_splice_bound(const struct xrow_update_op *op)
-{
- diag_set(ClientError, ER_UPDATE_SPLICE, xrow_update_op_field_str(op),
- "offset is out of bound");
- return -1;
-}
-
-static inline int
-xrow_update_err_no_such_field(const struct xrow_update_op *op)
-{
- diag_set(ClientError, ER_NO_SUCH_FIELD_NO, op->field_no >= 0 ?
- TUPLE_INDEX_BASE + op->field_no : op->field_no);
- return -1;
-}
-
-static inline int
-xrow_update_err(const struct xrow_update_op *op, const char *reason)
-{
- diag_set(ClientError, ER_UPDATE_FIELD, xrow_update_op_field_str(op),
- reason);
- return -1;
-}
-
-static inline int
-xrow_update_err_double(const struct xrow_update_op *op)
-{
- return xrow_update_err(op, "double update of the same field");
-}
-
-/**
- * We can have more than one operation on the same field.
- * A descriptor of one changed field.
- */
-struct xrow_update_field {
- /** UPDATE operation against the first field in the range. */
- struct xrow_update_op *op;
- /** Points at start of field *data* in the old tuple. */
- const char *old;
- /** End of the old field. */
- const char *tail;
/**
- * Length of the "tail" in the old tuple from end
- * of old data to the beginning of the field in the
- * next update_field structure.
+ * Index base for MessagePack update operations. If update
+ * is from Lua, then the base is 1. Otherwise 0. That
+ * field exists because Lua uses 1-based array indexing,
+ * and Lua-to-MessagePack encoder keeps this indexing when
+ * encodes operations array. Index base allows not to
+ * re-encode each Lua update with 0-based indexes.
*/
- uint32_t tail_len;
-};
-
-static void
-xrow_update_field_init(struct xrow_update_field *field, const char *old,
- uint32_t old_len, uint32_t tail_len)
-{
- field->op = NULL;
- field->old = old;
- field->tail = old + old_len;
- field->tail_len = tail_len;
-}
-
-/* {{{ read_arg helpers */
-
-/** Read a field index or any other integer field. */
-static inline int
-xrow_update_mp_read_int32(struct xrow_update_op *op, const char **expr,
- int32_t *ret)
-{
- if (mp_read_int32(expr, ret) == 0)
- return 0;
- return xrow_update_err_arg_type(op, "an integer");
-}
-
-static inline int
-xrow_update_mp_read_uint(struct xrow_update_op *op, const char **expr,
- uint64_t *ret)
-{
- if (mp_typeof(**expr) == MP_UINT) {
- *ret = mp_decode_uint(expr);
- return 0;
- }
- return xrow_update_err_arg_type(op, "a positive integer");
-}
-
-/**
- * Load an argument of an arithmetic operation either from tuple
- * or from the UPDATE command.
- */
-static inline int
-xrow_mp_read_arg_arith(struct xrow_update_op *op, const char **expr,
- struct xrow_update_arg_arith *ret)
-{
- if (mp_typeof(**expr) == MP_UINT) {
- ret->type = XUPDATE_TYPE_INT;
- int96_set_unsigned(&ret->int96, mp_decode_uint(expr));
- } else if (mp_typeof(**expr) == MP_INT) {
- ret->type = XUPDATE_TYPE_INT;
- int96_set_signed(&ret->int96, mp_decode_int(expr));
- } else if (mp_typeof(**expr) == MP_DOUBLE) {
- ret->type = XUPDATE_TYPE_DOUBLE;
- ret->dbl = mp_decode_double(expr);
- } else if (mp_typeof(**expr) == MP_FLOAT) {
- ret->type = XUPDATE_TYPE_FLOAT;
- ret->flt = mp_decode_float(expr);
- } else if (mp_typeof(**expr) == MP_EXT) {
- int8_t ext_type;
- uint32_t len = mp_decode_extl(expr, &ext_type);
- switch (ext_type) {
- case MP_DECIMAL:
- ret->type = XUPDATE_TYPE_DECIMAL;
- decimal_unpack(expr, len, &ret->dec);
- break;
- default:
- goto err;
- }
- } else {
-err:
- return xrow_update_err_arg_type(op, "a number");
- }
- return 0;
-}
-
-static inline int
-xrow_update_mp_read_str(struct xrow_update_op *op, const char **expr,
- uint32_t *len, const char **ret)
-{
- if (mp_typeof(**expr) == MP_STR) {
- *ret = mp_decode_str(expr, len); /* value */
- return 0;
- }
- return xrow_update_err_arg_type(op, "a string");
-}
-
-/* }}} read_arg helpers */
-
-/* {{{ read_arg */
-
-static int
-xrow_update_read_arg_set(int index_base, struct xrow_update_op *op,
- const char **expr)
-{
- (void)index_base;
- op->arg.set.value = *expr;
- mp_next(expr);
- op->arg.set.length = (uint32_t) (*expr - op->arg.set.value);
- return 0;
-}
-
-static int
-xrow_update_read_arg_insert(int index_base, struct xrow_update_op *op,
- const char **expr)
-{
- return xrow_update_read_arg_set(index_base, op, expr);
-}
-
-static int
-xrow_update_read_arg_delete(int index_base, struct xrow_update_op *op,
- const char **expr)
-{
- (void) index_base;
- if (mp_typeof(**expr) == MP_UINT) {
- op->arg.del.count = (uint32_t) mp_decode_uint(expr);
- if (op->arg.del.count != 0)
- return 0;
- return xrow_update_err(op, "cannot delete 0 fields");
- }
- return xrow_update_err_arg_type(op, "a positive integer");
-}
-
-static int
-xrow_update_read_arg_arith(int index_base, struct xrow_update_op *op,
- const char **expr)
-{
- (void) index_base;
- return xrow_mp_read_arg_arith(op, expr, &op->arg.arith);
-}
-
-static int
-xrow_update_read_arg_bit(int index_base, struct xrow_update_op *op,
- const char **expr)
-{
- (void) index_base;
- struct xrow_update_arg_bit *arg = &op->arg.bit;
- return xrow_update_mp_read_uint(op, expr, &arg->val);
-}
-
-static int
-xrow_update_read_arg_splice(int index_base, struct xrow_update_op *op,
- const char **expr)
-{
- struct xrow_update_arg_splice *arg = &op->arg.splice;
- if (xrow_update_mp_read_int32(op, expr, &arg->offset) != 0)
- return -1;
- if (arg->offset >= 0) {
- if (arg->offset - index_base < 0)
- return xrow_update_err_splice_bound(op);
- arg->offset -= index_base;
- }
- /* cut length */
- if (xrow_update_mp_read_int32(op, expr, &arg->cut_length) != 0)
- return -1;
- /* value */
- return xrow_update_mp_read_str(op, expr, &arg->paste_length,
- &arg->paste);
-}
-
-/* }}} read_arg */
-
-/* {{{ do_op helpers */
-
-static inline int
-xrow_update_op_adjust_field_no(struct xrow_update_op *op, int32_t field_max)
-{
- if (op->field_no >= 0) {
- if (op->field_no < field_max)
- return 0;
- } else if (op->field_no + field_max >= 0) {
- op->field_no += field_max;
- return 0;
- }
- return xrow_update_err_no_such_field(op);
-}
-
-static inline double
-xrow_update_arg_arith_to_double(struct xrow_update_arg_arith arg)
-{
- if (arg.type == XUPDATE_TYPE_DOUBLE) {
- return arg.dbl;
- } else if (arg.type == XUPDATE_TYPE_FLOAT) {
- return arg.flt;
- } else {
- assert(arg.type == XUPDATE_TYPE_INT);
- if (int96_is_uint64(&arg.int96)) {
- return int96_extract_uint64(&arg.int96);
- } else {
- assert(int96_is_neg_int64(&arg.int96));
- return int96_extract_neg_int64(&arg.int96);
- }
- }
-}
-
-static inline decimal_t *
-xrow_update_arg_arith_to_decimal(struct xrow_update_arg_arith arg,
- decimal_t *dec)
-{
- decimal_t *ret;
- if (arg.type == XUPDATE_TYPE_DECIMAL) {
- *dec = arg.dec;
- return dec;
- } else if (arg.type == XUPDATE_TYPE_DOUBLE) {
- ret = decimal_from_double(dec, arg.dbl);
- } else if (arg.type == XUPDATE_TYPE_FLOAT) {
- ret = decimal_from_double(dec, arg.flt);
- } else {
- assert(arg.type == XUPDATE_TYPE_INT);
- if (int96_is_uint64(&arg.int96)) {
- uint64_t val = int96_extract_uint64(&arg.int96);
- ret = decimal_from_uint64(dec, val);
- } else {
- assert(int96_is_neg_int64(&arg.int96));
- int64_t val = int96_extract_neg_int64(&arg.int96);
- ret = decimal_from_int64(dec, val);
- }
- }
-
- return ret;
-}
-
-/** Return the MsgPack size of an arithmetic operation result. */
-static inline uint32_t
-xrow_update_arg_arith_sizeof(struct xrow_update_arg_arith arg)
-{
- if (arg.type == XUPDATE_TYPE_INT) {
- if (int96_is_uint64(&arg.int96)) {
- uint64_t val = int96_extract_uint64(&arg.int96);
- return mp_sizeof_uint(val);
- } else {
- int64_t val = int96_extract_neg_int64(&arg.int96);
- return mp_sizeof_int(val);
- }
- } else if (arg.type == XUPDATE_TYPE_DOUBLE) {
- return mp_sizeof_double(arg.dbl);
- } else if (arg.type == XUPDATE_TYPE_FLOAT) {
- return mp_sizeof_float(arg.flt);
- } else {
- assert(arg.type == XUPDATE_TYPE_DECIMAL);
- return mp_sizeof_decimal(&arg.dec);
- }
-}
-
-static inline int
-xrow_update_arith_make(struct xrow_update_op *op,
- struct xrow_update_arg_arith arg,
- struct xrow_update_arg_arith *ret)
-{
- struct xrow_update_arg_arith arg1 = arg;
- struct xrow_update_arg_arith arg2 = op->arg.arith;
- enum xrow_update_arith_type lowest_type = arg1.type;
- char opcode = op->opcode;
- if (arg1.type > arg2.type)
- lowest_type = arg2.type;
-
- if (lowest_type == XUPDATE_TYPE_INT) {
- switch(opcode) {
- case '+':
- int96_add(&arg1.int96, &arg2.int96);
- break;
- case '-':
- int96_invert(&arg2.int96);
- int96_add(&arg1.int96, &arg2.int96);
- break;
- default:
- unreachable(); /* checked by update_read_ops */
- break;
- }
- if (!int96_is_uint64(&arg1.int96) &&
- !int96_is_neg_int64(&arg1.int96))
- return xrow_update_err_int_overflow(op);
- *ret = arg1;
- return 0;
- } else if (lowest_type >= XUPDATE_TYPE_DOUBLE) {
- /* At least one of operands is double or float */
- double a = xrow_update_arg_arith_to_double(arg1);
- double b = xrow_update_arg_arith_to_double(arg2);
- double c;
- switch(opcode) {
- case '+': c = a + b; break;
- case '-': c = a - b; break;
- default:
- unreachable();
- break;
- }
- if (lowest_type == XUPDATE_TYPE_DOUBLE) {
- /* result is DOUBLE */
- ret->type = XUPDATE_TYPE_DOUBLE;
- ret->dbl = c;
- } else {
- /* result is FLOAT */
- assert(lowest_type == XUPDATE_TYPE_FLOAT);
- ret->type = XUPDATE_TYPE_FLOAT;
- ret->flt = (float)c;
- }
- } else {
- /* At least one of the operands is decimal. */
- decimal_t a, b, c;
- if (! xrow_update_arg_arith_to_decimal(arg1, &a) ||
- ! xrow_update_arg_arith_to_decimal(arg2, &b)) {
- return xrow_update_err_arg_type(op, "a number "\
- "convertible to "\
- "decimal.");
- }
- switch(opcode) {
- case '+':
- if (decimal_add(&c, &a, &b) == NULL)
- return xrow_update_err_decimal_overflow(op);
- break;
- case '-':
- if (decimal_sub(&c, &a, &b) == NULL)
- return xrow_update_err_decimal_overflow(op);
- break;
- default:
- unreachable();
- break;
- }
- ret->type = XUPDATE_TYPE_DECIMAL;
- ret->dec = c;
- }
- return 0;
-}
-
-/* }}} do_op helpers */
-
-/* {{{ do_op */
-
-static int
-xrow_update_op_do_insert(struct xrow_update *update, struct xrow_update_op *op)
-{
- uint32_t size = xrow_update_rope_size(update->rope);
- if (xrow_update_op_adjust_field_no(op, size + 1) != 0)
- return -1;
- struct xrow_update_field *field = (struct xrow_update_field *)
- xrow_update_alloc(&fiber()->gc, sizeof(*field));
- if (field == NULL)
- return -1;
- xrow_update_field_init(field, op->arg.set.value, op->arg.set.length, 0);
- return xrow_update_rope_insert(update->rope, op->field_no, field, 1);
-}
-
-static int
-xrow_update_op_do_set(struct xrow_update *update, struct xrow_update_op *op)
-{
- /* intepret '=' for n +1 field as insert */
- if (op->field_no == (int32_t) xrow_update_rope_size(update->rope))
- return xrow_update_op_do_insert(update, op);
-
- uint32_t size = xrow_update_rope_size(update->rope);
- if (xrow_update_op_adjust_field_no(op, size) != 0)
- return -1;
- struct xrow_update_field *field =
- xrow_update_rope_extract(update->rope, op->field_no);
- if (field == NULL)
- return -1;
- /* Ignore the previous op, if any. */
- field->op = op;
- op->new_field_len = op->arg.set.length;
- return 0;
-}
-
-static int
-xrow_update_op_do_delete(struct xrow_update *update, struct xrow_update_op *op)
-{
- uint32_t size = xrow_update_rope_size(update->rope);
- if (xrow_update_op_adjust_field_no(op, size) != 0)
- return -1;
- uint32_t delete_count = op->arg.del.count;
-
- if ((uint64_t) op->field_no + delete_count > size)
- delete_count = size - op->field_no;
-
- assert(delete_count > 0);
- for (uint32_t u = 0; u < delete_count; u++)
- xrow_update_rope_erase(update->rope, op->field_no);
- return 0;
-}
-
-static int
-xrow_update_op_do_arith(struct xrow_update *update, struct xrow_update_op *op)
-{
- uint32_t size = xrow_update_rope_size(update->rope);
- if (xrow_update_op_adjust_field_no(op, size) != 0)
- return -1;
-
- struct xrow_update_field *field =
- xrow_update_rope_extract(update->rope, op->field_no);
- if (field == NULL)
- return -1;
- if (field->op != NULL)
- return xrow_update_err_double(op);
- const char *old = field->old;
- struct xrow_update_arg_arith left_arg;
- if (xrow_mp_read_arg_arith(op, &old, &left_arg) != 0)
- return -1;
-
- if (xrow_update_arith_make(op, left_arg, &op->arg.arith) != 0)
- return -1;
- field->op = op;
- op->new_field_len = xrow_update_arg_arith_sizeof(op->arg.arith);
- return 0;
-}
-
-static int
-xrow_update_op_do_bit(struct xrow_update *update, struct xrow_update_op *op)
-{
- uint32_t size = xrow_update_rope_size(update->rope);
- if (xrow_update_op_adjust_field_no(op, size) != 0)
- return -1;
- struct xrow_update_field *field =
- xrow_update_rope_extract(update->rope, op->field_no);
- if (field == NULL)
- return -1;
- struct xrow_update_arg_bit *arg = &op->arg.bit;
- if (field->op != NULL)
- return xrow_update_err_double(op);
- const char *old = field->old;
- uint64_t val = 0;
- if (xrow_update_mp_read_uint(op, &old, &val) != 0)
- return -1;
- switch (op->opcode) {
- case '&':
- arg->val &= val;
- break;
- case '^':
- arg->val ^= val;
- break;
- case '|':
- arg->val |= val;
- break;
- default:
- unreachable(); /* checked by update_read_ops */
- }
- field->op = op;
- op->new_field_len = mp_sizeof_uint(arg->val);
- return 0;
-}
-
-static int
-xrow_update_op_do_splice(struct xrow_update *update, struct xrow_update_op *op)
-{
- uint32_t size = xrow_update_rope_size(update->rope);
- if (xrow_update_op_adjust_field_no(op, size) != 0)
- return -1;
- struct xrow_update_field *field =
- xrow_update_rope_extract(update->rope, op->field_no);
- if (field == NULL)
- return -1;
- if (field->op != NULL)
- return xrow_update_err_double(op);
-
- struct xrow_update_arg_splice *arg = &op->arg.splice;
-
- const char *in = field->old;
- int32_t str_len = 0;
- if (xrow_update_mp_read_str(op, &in, (uint32_t *) &str_len, &in) != 0)
- return -1;
-
- if (arg->offset < 0) {
- if (-arg->offset > str_len + 1)
- return xrow_update_err_splice_bound(op);
- arg->offset = arg->offset + str_len + 1;
- } else if (arg->offset > str_len) {
- arg->offset = str_len;
- }
-
- assert(arg->offset >= 0 && arg->offset <= str_len);
-
- if (arg->cut_length < 0) {
- if (-arg->cut_length > (str_len - arg->offset))
- arg->cut_length = 0;
- else
- arg->cut_length += str_len - arg->offset;
- } else if (arg->cut_length > str_len - arg->offset) {
- arg->cut_length = str_len - arg->offset;
- }
-
- assert(arg->offset <= str_len);
-
- /* Fill tail part */
- arg->tail_offset = arg->offset + arg->cut_length;
- arg->tail_length = str_len - arg->tail_offset;
-
-
- field->op = op;
- /* Record the new field length (maximal). */
- op->new_field_len = mp_sizeof_str(arg->offset + arg->paste_length +
- arg->tail_length);
- return 0;
-}
-
-/* }}} do_op */
-
-/* {{{ store_op */
-
-static void
-xrow_update_op_store_set(struct xrow_update_arg_set *arg, const char *in,
- char *out)
-{
- (void)in;
- memcpy(out, arg->value, arg->length);
-}
-
-static void
-xrow_update_op_store_insert(struct xrow_update_arg_set *arg, const char *in,
- char *out)
-{
- (void)in;
- memcpy(out, arg->value, arg->length);
-}
-
-static void
-xrow_update_op_store_arith(struct xrow_update_arg_arith *arg, const char *in,
- char *out)
-{
- (void)in;
- if (arg->type == XUPDATE_TYPE_INT) {
- if (int96_is_uint64(&arg->int96)) {
- mp_encode_uint(out, int96_extract_uint64(&arg->int96));
- } else {
- assert(int96_is_neg_int64(&arg->int96));
- mp_encode_int(out, int96_extract_neg_int64(&arg->int96));
- }
- } else if (arg->type == XUPDATE_TYPE_DOUBLE) {
- mp_encode_double(out, arg->dbl);
- } else if (arg->type == XUPDATE_TYPE_FLOAT) {
- mp_encode_float(out, arg->flt);
- } else {
- assert (arg->type == XUPDATE_TYPE_DECIMAL);
- mp_encode_decimal(out, &arg->dec);
- }
-}
-
-static void
-xrow_update_op_store_bit(struct xrow_update_arg_bit *arg, const char *in,
- char *out)
-{
- (void)in;
- mp_encode_uint(out, arg->val);
-}
-
-static void
-xrow_update_op_store_splice(struct xrow_update_arg_splice *arg, const char *in,
- char *out)
-{
- uint32_t new_str_len = arg->offset + arg->paste_length
- + arg->tail_length;
-
- (void) mp_decode_strl(&in);
-
- out = mp_encode_strl(out, new_str_len);
- memcpy(out, in, arg->offset); /* copy field head. */
- out = out + arg->offset;
- memcpy(out, arg->paste, arg->paste_length); /* copy the paste */
- out = out + arg->paste_length;
- memcpy(out, in + arg->tail_offset, arg->tail_length); /* copy tail */
-}
-
-/* }}} store_op */
-
-static const struct xrow_update_op_meta op_set = {
- xrow_update_read_arg_set, xrow_update_op_do_set,
- (xrow_update_op_store_f) xrow_update_op_store_set, 3
-};
-static const struct xrow_update_op_meta op_insert = {
- xrow_update_read_arg_insert, xrow_update_op_do_insert,
- (xrow_update_op_store_f) xrow_update_op_store_insert, 3
-};
-static const struct xrow_update_op_meta op_arith = {
- xrow_update_read_arg_arith, xrow_update_op_do_arith,
- (xrow_update_op_store_f) xrow_update_op_store_arith, 3
-};
-static const struct xrow_update_op_meta op_bit = {
- xrow_update_read_arg_bit, xrow_update_op_do_bit,
- (xrow_update_op_store_f) xrow_update_op_store_bit, 3
-};
-static const struct xrow_update_op_meta op_splice = {
- xrow_update_read_arg_splice, xrow_update_op_do_splice,
- (xrow_update_op_store_f) xrow_update_op_store_splice, 5
-};
-static const struct xrow_update_op_meta op_delete = {
- xrow_update_read_arg_delete, xrow_update_op_do_delete,
- (xrow_update_op_store_f) NULL, 3
+ int index_base;
+ /** A bitmask of all columns modified by this update. */
+ uint64_t column_mask;
+ /** First level of update tree. It is always array. */
+ struct xrow_update_field root;
};
-/** Split a range of fields in two, allocating update_field
- * context for the new range.
- */
-static struct xrow_update_field *
-xrow_update_field_split(struct region *region, struct xrow_update_field *prev,
- size_t size, size_t offset)
-{
- (void) size;
- struct xrow_update_field *next = (struct xrow_update_field *)
- xrow_update_alloc(region, sizeof(*next));
- if (next == NULL)
- return NULL;
- assert(offset > 0 && prev->tail_len > 0);
-
- const char *field = prev->tail;
- const char *end = field + prev->tail_len;
-
- for (uint32_t i = 1; i < offset; i++) {
- mp_next(&field);
- }
-
- prev->tail_len = field - prev->tail;
- const char *f = field;
- mp_next(&f);
- uint32_t field_len = f - field;
-
- xrow_update_field_init(next, field, field_len, end - field - field_len);
- return next;
-}
-
-/**
- * We found a tuple to do the update on. Prepare a rope
- * to perform operations on.
- * @param tuple_data MessagePack array without the array header.
- * @param tuple_data_end End of the @tuple_data.
- * @param field_count Field count in @tuple_data.
- *
- * @retval 0 Success.
- * @retval -1 Error.
- */
-struct xrow_update_rope *
-xrow_update_rope_new_by_tuple(const char *tuple_data,
- const char *tuple_data_end, uint32_t field_count)
-{
- struct region *region = &fiber()->gc;
- struct xrow_update_rope *rope = xrow_update_rope_new(region);
- if (rope == NULL)
- return NULL;
- /* Initialize the rope with the old tuple. */
- struct xrow_update_field *first = (struct xrow_update_field *)
- xrow_update_alloc(region, sizeof(*first));
- if (first == NULL)
- return NULL;
- const char *field = tuple_data;
- const char *end = tuple_data_end;
- if (field == end)
- return rope;
-
- /* Add first field to rope */
- mp_next(&tuple_data);
- uint32_t field_len = tuple_data - field;
- xrow_update_field_init(first, field, field_len,
- end - field - field_len);
-
- return xrow_update_rope_append(rope, first, field_count) != 0 ?
- NULL : rope;
-}
-
-static uint32_t
-xrow_update_calc_tuple_length(struct xrow_update *update)
-{
- uint32_t res = mp_sizeof_array(xrow_update_rope_size(update->rope));
- struct xrow_update_rope_iter it;
- struct xrow_update_rope_node *node;
-
- xrow_update_rope_iter_create(&it, update->rope);
- for (node = xrow_update_rope_iter_start(&it); node;
- node = xrow_update_rope_iter_next(&it)) {
- struct xrow_update_field *field =
- xrow_update_rope_leaf_data(node);
- uint32_t field_len = (field->op ? field->op->new_field_len :
- (uint32_t)(field->tail - field->old));
- res += field_len + field->tail_len;
- }
-
- return res;
-}
-
-static uint32_t
-xrow_update_write_tuple(struct xrow_update *update, char *buffer,
- char *buffer_end)
-{
- char *new_data = buffer;
- new_data = mp_encode_array(new_data,
- xrow_update_rope_size(update->rope));
-
- (void) buffer_end;
-
- uint32_t total_field_count = 0;
-
- struct xrow_update_rope_iter it;
- struct xrow_update_rope_node *node;
-
- xrow_update_rope_iter_create(&it, update->rope);
- for (node = xrow_update_rope_iter_start(&it); node;
- node = xrow_update_rope_iter_next(&it)) {
- struct xrow_update_field *field =
- xrow_update_rope_leaf_data(node);
- uint32_t field_count = xrow_update_rope_leaf_size(node);
- const char *old_field = field->old;
- struct xrow_update_op *op = field->op;
- if (op) {
- op->meta->store(&op->arg, old_field, new_data);
- new_data += op->new_field_len;
- } else {
- uint32_t field_len = field->tail - field->old;
- memcpy(new_data, old_field, field_len);
- new_data += field_len;
- }
- /* Copy tail_len from the old tuple. */
- assert(field->tail_len == 0 || field_count > 1);
- if (field_count > 1) {
- memcpy(new_data, field->tail, field->tail_len);
- new_data += field->tail_len;
- }
- total_field_count += field_count;
- }
-
- assert(xrow_update_rope_size(update->rope) == total_field_count);
- assert(new_data <= buffer_end);
- return new_data - buffer; /* real_tuple_size */
-}
-
-static const struct xrow_update_op_meta *
-xrow_update_op_by(char opcode)
-{
- switch (opcode) {
- case '=':
- return &op_set;
- case '+':
- case '-':
- return &op_arith;
- case '&':
- case '|':
- case '^':
- return &op_bit;
- case ':':
- return &op_splice;
- case '#':
- return &op_delete;
- case '!':
- return &op_insert;
- default:
- diag_set(ClientError, ER_UNKNOWN_UPDATE_OP);
- return NULL;
- }
-}
-
-/**
- * Decode an update operation from MessagePack.
- * @param[out] op Update operation.
- * @param index_base Field numbers base: 0 or 1.
- * @param dict Dictionary to lookup field number by a name.
- * @param expr MessagePack.
- *
- * @retval 0 Success.
- * @retval -1 Client error.
- */
-static inline int
-xrow_update_op_decode(struct xrow_update_op *op, int index_base,
- struct tuple_dictionary *dict, const char **expr)
-{
- if (mp_typeof(**expr) != MP_ARRAY) {
- diag_set(ClientError, ER_ILLEGAL_PARAMS, "update operation "
- "must be an array {op,..}");
- return -1;
- }
- uint32_t len, args = mp_decode_array(expr);
- if (args < 1) {
- diag_set(ClientError, ER_ILLEGAL_PARAMS, "update operation "\
- "must be an array {op,..}, got empty array");
- return -1;
- }
- if (mp_typeof(**expr) != MP_STR) {
- diag_set(ClientError, ER_ILLEGAL_PARAMS,
- "update operation name must be a string");
- return -1;
- }
- op->opcode = *mp_decode_str(expr, &len);
- op->meta = xrow_update_op_by(op->opcode);
- if (op->meta == NULL)
- return -1;
- if (args != op->meta->args) {
- diag_set(ClientError, ER_UNKNOWN_UPDATE_OP);
- return -1;
- }
- int32_t field_no = 0;
- switch(mp_typeof(**expr)) {
- case MP_INT:
- case MP_UINT: {
- if (xrow_update_mp_read_int32(op, expr, &field_no) != 0)
- return -1;
- if (field_no - index_base >= 0) {
- op->field_no = field_no - index_base;
- } else if (field_no < 0) {
- op->field_no = field_no;
- } else {
- diag_set(ClientError, ER_NO_SUCH_FIELD_NO, field_no);
- return -1;
- }
- break;
- }
- case MP_STR: {
- const char *path = mp_decode_str(expr, &len);
- uint32_t field_no, hash = field_name_hash(path, len);
- if (tuple_fieldno_by_name(dict, path, len, hash,
- &field_no) != 0) {
- diag_set(ClientError, ER_NO_SUCH_FIELD_NAME,
- tt_cstr(path, len));
- return -1;
- }
- op->field_no = (int32_t) field_no;
- break;
- }
- default:
- diag_set(ClientError, ER_ILLEGAL_PARAMS,
- "field id must be a number or a string");
- return -1;
- }
- return op->meta->read_arg(index_base, op, expr);
-}
-
/**
* Read and check update operations and fill column mask.
*
@@ -1285,14 +269,13 @@ static int
xrow_update_do_ops(struct xrow_update *update, const char *old_data,
const char *old_data_end, uint32_t part_count)
{
- update->rope = xrow_update_rope_new_by_tuple(old_data, old_data_end,
- part_count);
- if (update->rope == NULL)
+ if (xrow_update_array_create(&update->root, old_data, old_data_end,
+ part_count) != 0)
return -1;
struct xrow_update_op *op = update->ops;
struct xrow_update_op *ops_end = op + update->op_count;
for (; op < ops_end; op++) {
- if (op->meta->do_op(update, op))
+ if (op->meta->do_op(op, &update->root) != 0)
return -1;
}
return 0;
@@ -1308,14 +291,13 @@ xrow_upsert_do_ops(struct xrow_update *update, const char *old_data,
const char *old_data_end, uint32_t part_count,
bool suppress_error)
{
- update->rope = xrow_update_rope_new_by_tuple(old_data, old_data_end,
- part_count);
- if (update->rope == NULL)
+ if (xrow_update_array_create(&update->root, old_data, old_data_end,
+ part_count) != 0)
return -1;
struct xrow_update_op *op = update->ops;
struct xrow_update_op *ops_end = op + update->op_count;
for (; op < ops_end; op++) {
- if (op->meta->do_op(update, op) == 0)
+ if (op->meta->do_op(op, &update->root) == 0)
continue;
struct error *e = diag_last_error(diag_get());
if (e->type != &type_ClientError)
@@ -1332,24 +314,21 @@ static void
xrow_update_init(struct xrow_update *update, int index_base)
{
memset(update, 0, sizeof(*update));
- /*
- * Base field offset, e.g. 0 for C and 1 for Lua. Used only for
- * error messages. All fields numbers must be zero-based!
- */
update->index_base = index_base;
}
static const char *
xrow_update_finish(struct xrow_update *update, uint32_t *p_tuple_len)
{
- uint32_t tuple_len = xrow_update_calc_tuple_length(update);
+ uint32_t tuple_len = xrow_update_array_sizeof(&update->root);
char *buffer = (char *) region_alloc(&fiber()->gc, tuple_len);
if (buffer == NULL) {
diag_set(OutOfMemory, tuple_len, "region_alloc", "buffer");
return NULL;
}
- *p_tuple_len = xrow_update_write_tuple(update, buffer,
+ *p_tuple_len = xrow_update_array_store(&update->root, buffer,
buffer + tuple_len);
+ assert(*p_tuple_len == tuple_len);
return buffer;
}
@@ -1493,8 +472,9 @@ xrow_upsert_squash(const char *expr1, const char *expr1_end,
op[0]->opcode = '+';
int96_invert(&op[0]->arg.arith.int96);
}
- struct xrow_update_arg_arith res;
- if (xrow_update_arith_make(op[1], op[0]->arg.arith, &res) != 0)
+ struct xrow_update_op res;
+ if (xrow_update_arith_make(op[1], op[0]->arg.arith,
+ &res.arg.arith) != 0)
return NULL;
res_ops = mp_encode_array(res_ops, 3);
res_ops = mp_encode_str(res_ops,
@@ -1503,7 +483,7 @@ xrow_upsert_squash(const char *expr1, const char *expr1_end,
op[0]->field_no +
update[0].index_base);
xrow_update_op_store_arith(&res, NULL, res_ops);
- res_ops += xrow_update_arg_arith_sizeof(res);
+ res_ops += xrow_update_arg_arith_sizeof(&res.arg.arith);
mp_next(&expr[0]);
mp_next(&expr[1]);
op_no[0]++;
diff --git a/src/box/xrow_update/xrow_update_array.c b/src/box/xrow_update/xrow_update_array.c
new file mode 100644
index 000000000..b5f443cd0
--- /dev/null
+++ b/src/box/xrow_update/xrow_update_array.c
@@ -0,0 +1,302 @@
+/*
+ * Copyright 2010-2019, Tarantool AUTHORS, please see AUTHORS file.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ * copyright notice, this list of conditions and the
+ * following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials
+ * provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+ * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+ * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
+#include "xrow_update_field.h"
+#include "msgpuck.h"
+#include "fiber.h"
+
+/**
+ * Make field index non-negative and check for the field
+ * existence.
+ */
+static inline int
+xrow_update_op_adjust_field_no(struct xrow_update_op *op, int32_t field_count)
+{
+ if (op->field_no >= 0) {
+ if (op->field_no < field_count)
+ return 0;
+ } else if (op->field_no + field_count >= 0) {
+ op->field_no += field_count;
+ return 0;
+ }
+ return xrow_update_err_no_such_field(op);
+}
+
+/**
+ * Updated array is divided into array items. Each item is a range
+ * of fields. Item updates first field of the range and stores
+ * size of others to save them with no changes into a new tuple
+ * later. It allows on update of a single field in an array store
+ * at most 2 objects - item for the previous fields, and item for
+ * this one + its tail. This is how rope data structure works - a
+ * binary tree designed for big contiguous object updates.
+ */
+struct xrow_update_array_item {
+ /** First field in the range, contains an update. */
+ struct xrow_update_field field;
+ /** Size of other fields in the range. */
+ uint32_t tail_size;
+};
+
+/** Initialize an array item. */
+static inline void
+xrow_update_array_item_create(struct xrow_update_array_item *item,
+ enum xrow_update_type type, const char *data,
+ uint32_t size, uint32_t tail_size)
+{
+ item->field.type = type;
+ item->field.data = data;
+ item->field.size = size;
+ item->tail_size = tail_size;
+}
+
+/** Rope allocator for nodes, paths, items etc. */
+static inline void *
+xrow_update_alloc(struct region *region, size_t size)
+{
+ void *ptr = region_aligned_alloc(region, size, alignof(uint64_t));
+ if (ptr == NULL) {
+ diag_set(OutOfMemory, size, "region_aligned_alloc",
+ "xrow update internals");
+ }
+ return ptr;
+}
+
+/** Split a range of fields in two. */
+static struct xrow_update_array_item *
+xrow_update_array_item_split(struct region *region,
+ struct xrow_update_array_item *prev, size_t size,
+ size_t offset)
+{
+ (void) size;
+ struct xrow_update_array_item *next = (struct xrow_update_array_item *)
+ xrow_update_alloc(region, sizeof(*next));
+ if (next == NULL)
+ return NULL;
+ assert(offset > 0 && prev->tail_size > 0);
+
+ const char *tail = prev->field.data + prev->field.size;
+ const char *field = tail;
+ const char *range_end = tail + prev->tail_size;
+
+ for (uint32_t i = 1; i < offset; ++i)
+ mp_next(&field);
+
+ prev->tail_size = field - tail;
+ const char *field_end = field;
+ mp_next(&field_end);
+ xrow_update_array_item_create(next, XUPDATE_NOP, field,
+ field_end - field, range_end - field_end);
+ return next;
+}
+
+#define ROPE_SPLIT_F xrow_update_array_item_split
+#define ROPE_ALLOC_F xrow_update_alloc
+#define rope_data_t struct xrow_update_array_item *
+#define rope_ctx_t struct region *
+#define rope_name xrow_update
+
+#include "salad/rope.h"
+
+/**
+ * Extract from the array an item whose range starts from the
+ * field affected by @a op.
+ */
+static inline struct xrow_update_array_item *
+xrow_update_array_extract_item(struct xrow_update_field *field,
+ struct xrow_update_op *op)
+{
+ assert(field->type == XUPDATE_ARRAY);
+ struct xrow_update_rope *rope = field->array.rope;
+ uint32_t size = xrow_update_rope_size(rope);
+ if (xrow_update_op_adjust_field_no(op, size) == 0)
+ return xrow_update_rope_extract(rope, op->field_no);
+ return NULL;
+}
+
+int
+xrow_update_array_create(struct xrow_update_field *field, const char *data,
+ const char *data_end, uint32_t field_count)
+{
+ field->type = XUPDATE_ARRAY;
+ field->data = data;
+ field->size = data_end - data;
+ struct region *region = &fiber()->gc;
+ field->array.rope = xrow_update_rope_new(region);
+ if (field->array.rope == NULL)
+ return -1;
+ struct xrow_update_array_item *item = (struct xrow_update_array_item *)
+ xrow_update_alloc(region, sizeof(*item));
+ if (item == NULL)
+ return -1;
+ if (data == data_end)
+ return 0;
+ /*
+ * Initial item consists of one range - the whole array.
+ */
+ const char *begin = data;
+ mp_next(&data);
+ xrow_update_array_item_create(item, XUPDATE_NOP, begin, data - begin,
+ data_end - data);
+ return xrow_update_rope_append(field->array.rope, item, field_count);
+}
+
+uint32_t
+xrow_update_array_sizeof(struct xrow_update_field *field)
+{
+ assert(field->type == XUPDATE_ARRAY);
+ struct xrow_update_rope_iter it;
+ xrow_update_rope_iter_create(&it, field->array.rope);
+
+ uint32_t size = xrow_update_rope_size(field->array.rope);
+ uint32_t res = mp_sizeof_array(size);
+ struct xrow_update_rope_node *node = xrow_update_rope_iter_start(&it);
+ for (; node != NULL; node = xrow_update_rope_iter_next(&it)) {
+ struct xrow_update_array_item *item =
+ xrow_update_rope_leaf_data(node);
+ res += xrow_update_field_sizeof(&item->field) + item->tail_size;
+ }
+ return res;
+}
+
+uint32_t
+xrow_update_array_store(struct xrow_update_field *field, char *out,
+ char *out_end)
+{
+ assert(field->type == XUPDATE_ARRAY);
+ char *out_begin = out;
+ out = mp_encode_array(out, xrow_update_rope_size(field->array.rope));
+ uint32_t total_field_count = 0;
+ struct xrow_update_rope_iter it;
+ xrow_update_rope_iter_create(&it, field->array.rope);
+ struct xrow_update_rope_node *node = xrow_update_rope_iter_start(&it);
+ for (; node != NULL; node = xrow_update_rope_iter_next(&it)) {
+ struct xrow_update_array_item *item =
+ xrow_update_rope_leaf_data(node);
+ uint32_t field_count = xrow_update_rope_leaf_size(node);
+ out += xrow_update_field_store(&item->field, out, out_end);
+ assert(item->tail_size == 0 || field_count > 1);
+ memcpy(out, item->field.data + item->field.size,
+ item->tail_size);
+ out += item->tail_size;
+ total_field_count += field_count;
+ }
+ (void) total_field_count;
+ assert(xrow_update_rope_size(field->array.rope) == total_field_count);
+ assert(out <= out_end);
+ return out - out_begin;
+}
+
+int
+xrow_update_op_do_array_insert(struct xrow_update_op *op,
+ struct xrow_update_field *field)
+{
+ assert(field->type == XUPDATE_ARRAY);
+ struct xrow_update_rope *rope = field->array.rope;
+ uint32_t size = xrow_update_rope_size(rope);
+ if (xrow_update_op_adjust_field_no(op, size + 1) != 0)
+ return -1;
+
+ struct xrow_update_array_item *item = (struct xrow_update_array_item *)
+ xrow_update_alloc(rope->ctx, sizeof(*item));
+ if (item == NULL)
+ return -1;
+ xrow_update_array_item_create(item, XUPDATE_NOP, op->arg.set.value,
+ op->arg.set.length, 0);
+ return xrow_update_rope_insert(rope, op->field_no, item, 1);
+}
+
+int
+xrow_update_op_do_array_set(struct xrow_update_op *op,
+ struct xrow_update_field *field)
+{
+ assert(field->type == XUPDATE_ARRAY);
+ struct xrow_update_rope *rope = field->array.rope;
+ /* Interpret '=' for n + 1 field as insert. */
+ if (op->field_no == (int32_t) xrow_update_rope_size(rope))
+ return xrow_update_op_do_array_insert(op, field);
+
+ struct xrow_update_array_item *item =
+ xrow_update_array_extract_item(field, op);
+ if (item == NULL)
+ return -1;
+ op->new_field_len = op->arg.set.length;
+ /*
+ * Ignore the previous op, if any. It is not correct,
+ * strictly speaking, since only one update is allowed per
+ * field. But it was allowed since the beginning, and
+ * should be supported now for compatibility.
+ */
+ item->field.type = XUPDATE_SCALAR;
+ item->field.scalar.op = op;
+ return 0;
+}
+
+int
+xrow_update_op_do_array_delete(struct xrow_update_op *op,
+ struct xrow_update_field *field)
+{
+ assert(field->type == XUPDATE_ARRAY);
+ struct xrow_update_rope *rope = field->array.rope;
+ uint32_t size = xrow_update_rope_size(rope);
+ if (xrow_update_op_adjust_field_no(op, size) != 0)
+ return -1;
+ uint32_t delete_count = op->arg.del.count;
+ if ((uint64_t) op->field_no + delete_count > size)
+ delete_count = size - op->field_no;
+ assert(delete_count > 0);
+ for (uint32_t u = delete_count; u != 0; --u)
+ xrow_update_rope_erase(rope, op->field_no);
+ return 0;
+}
+
+#define DO_SCALAR_OP_GENERIC(op_type) \
+int \
+xrow_update_op_do_array_##op_type(struct xrow_update_op *op, \
+ struct xrow_update_field *field) \
+{ \
+ struct xrow_update_array_item *item = \
+ xrow_update_array_extract_item(field, op); \
+ if (item == NULL) \
+ return -1; \
+ if (item->field.type != XUPDATE_NOP) \
+ return xrow_update_err_double(op); \
+ if (xrow_update_op_do_##op_type(op, item->field.data) != 0) \
+ return -1; \
+ item->field.type = XUPDATE_SCALAR; \
+ item->field.scalar.op = op; \
+ return 0; \
+}
+
+DO_SCALAR_OP_GENERIC(arith)
+
+DO_SCALAR_OP_GENERIC(bit)
+
+DO_SCALAR_OP_GENERIC(splice)
diff --git a/src/box/xrow_update/xrow_update_field.c b/src/box/xrow_update/xrow_update_field.c
new file mode 100644
index 000000000..ae05f4b0f
--- /dev/null
+++ b/src/box/xrow_update/xrow_update_field.c
@@ -0,0 +1,665 @@
+/*
+ * Copyright 2010-2019, Tarantool AUTHORS, please see AUTHORS file.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ * copyright notice, this list of conditions and the
+ * following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials
+ * provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+ * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+ * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
+#include "xrow_update_field.h"
+#include "box/tuple_format.h"
+#include "mp_extension_types.h"
+
+/* {{{ Error helpers. */
+
+/** Take a string identifier of a field being updated by @a op. */
+static inline const char *
+xrow_update_op_field_str(const struct xrow_update_op *op)
+{
+ if (op->field_no >= 0)
+ return tt_sprintf("%d", op->field_no + TUPLE_INDEX_BASE);
+ else
+ return tt_sprintf("%d", op->field_no);
+}
+
+static inline int
+xrow_update_err_arg_type(const struct xrow_update_op *op,
+ const char *needed_type)
+{
+ diag_set(ClientError, ER_UPDATE_ARG_TYPE, op->opcode,
+ xrow_update_op_field_str(op), needed_type);
+ return -1;
+}
+
+static inline int
+xrow_update_err_int_overflow(const struct xrow_update_op *op)
+{
+ diag_set(ClientError, ER_UPDATE_INTEGER_OVERFLOW, op->opcode,
+ xrow_update_op_field_str(op));
+ return -1;
+}
+
+static inline int
+xrow_update_err_decimal_overflow(const struct xrow_update_op *op)
+{
+ diag_set(ClientError, ER_UPDATE_DECIMAL_OVERFLOW, op->opcode,
+ xrow_update_op_field_str(op));
+ return -1;
+}
+
+static inline int
+xrow_update_err_splice_bound(const struct xrow_update_op *op)
+{
+ diag_set(ClientError, ER_UPDATE_SPLICE, xrow_update_op_field_str(op),
+ "offset is out of bound");
+ return -1;
+}
+
+int
+xrow_update_err_no_such_field(const struct xrow_update_op *op)
+{
+ diag_set(ClientError, ER_NO_SUCH_FIELD_NO, op->field_no >= 0 ?
+ TUPLE_INDEX_BASE + op->field_no : op->field_no);
+ return -1;
+}
+
+int
+xrow_update_err(const struct xrow_update_op *op, const char *reason)
+{
+ diag_set(ClientError, ER_UPDATE_FIELD, xrow_update_op_field_str(op),
+ reason);
+ return -1;
+}
+
+/* }}} Error helpers. */
+
+uint32_t
+xrow_update_field_sizeof(struct xrow_update_field *field)
+{
+ switch (field->type) {
+ case XUPDATE_NOP:
+ return field->size;
+ case XUPDATE_SCALAR:
+ return field->scalar.op->new_field_len;
+ case XUPDATE_ARRAY:
+ return xrow_update_array_sizeof(field);
+ default:
+ unreachable();
+ }
+ return 0;
+}
+
+uint32_t
+xrow_update_field_store(struct xrow_update_field *field, char *out,
+ char *out_end)
+{
+ struct xrow_update_op *op;
+ uint32_t size;
+ switch(field->type) {
+ case XUPDATE_NOP:
+ assert(out_end - out >= field->size);
+ memcpy(out, field->data, field->size);
+ return field->size;
+ case XUPDATE_SCALAR:
+ op = field->scalar.op;
+ size = op->new_field_len;
+ assert(out_end - out >= size);
+ op->meta->store(op, field->data, out);
+ return size;
+ case XUPDATE_ARRAY:
+ return xrow_update_array_store(field, out, out_end);
+ default:
+ unreachable();
+ }
+ return 0;
+}
+
+/* {{{ read_arg helpers. */
+
+static inline int
+xrow_update_mp_read_int32(struct xrow_update_op *op, const char **expr,
+ int32_t *ret)
+{
+ if (mp_read_int32(expr, ret) == 0)
+ return 0;
+ return xrow_update_err_arg_type(op, "an integer");
+}
+
+static inline int
+xrow_update_mp_read_uint(struct xrow_update_op *op, const char **expr,
+ uint64_t *ret)
+{
+ if (mp_typeof(**expr) == MP_UINT) {
+ *ret = mp_decode_uint(expr);
+ return 0;
+ }
+ return xrow_update_err_arg_type(op, "a positive integer");
+}
+
+static inline int
+xrow_mp_read_arg_arith(struct xrow_update_op *op, const char **expr,
+ struct xrow_update_arg_arith *ret)
+{
+ int8_t ext_type;
+ uint32_t len;
+ switch(mp_typeof(**expr)) {
+ case MP_UINT:
+ ret->type = XUPDATE_TYPE_INT;
+ int96_set_unsigned(&ret->int96, mp_decode_uint(expr));
+ return 0;
+ case MP_INT:
+ ret->type = XUPDATE_TYPE_INT;
+ int96_set_signed(&ret->int96, mp_decode_int(expr));
+ return 0;
+ case MP_DOUBLE:
+ ret->type = XUPDATE_TYPE_DOUBLE;
+ ret->dbl = mp_decode_double(expr);
+ return 0;
+ case MP_FLOAT:
+ ret->type = XUPDATE_TYPE_FLOAT;
+ ret->flt = mp_decode_float(expr);
+ return 0;
+ case MP_EXT:
+ len = mp_decode_extl(expr, &ext_type);
+ if (ext_type == MP_DECIMAL) {
+ ret->type = XUPDATE_TYPE_DECIMAL;
+ decimal_unpack(expr, len, &ret->dec);
+ return 0;
+ }
+ FALLTHROUGH;
+ default:
+ return xrow_update_err_arg_type(op, "a number");
+ }
+}
+
+static inline int
+xrow_update_mp_read_str(struct xrow_update_op *op, const char **expr,
+ uint32_t *len, const char **ret)
+{
+ if (mp_typeof(**expr) == MP_STR) {
+ *ret = mp_decode_str(expr, len);
+ return 0;
+ }
+ return xrow_update_err_arg_type(op, "a string");
+}
+
+/* }}} read_arg helpers. */
+
+/* {{{ read_arg */
+
+static int
+xrow_update_read_arg_set(struct xrow_update_op *op, const char **expr,
+ int index_base)
+{
+ (void) index_base;
+ op->arg.set.value = *expr;
+ mp_next(expr);
+ op->arg.set.length = (uint32_t) (*expr - op->arg.set.value);
+ return 0;
+}
+
+static int
+xrow_update_read_arg_delete(struct xrow_update_op *op, const char **expr,
+ int index_base)
+{
+ (void) index_base;
+ if (mp_typeof(**expr) == MP_UINT) {
+ op->arg.del.count = (uint32_t) mp_decode_uint(expr);
+ if (op->arg.del.count != 0)
+ return 0;
+ return xrow_update_err(op, "cannot delete 0 fields");
+ }
+ return xrow_update_err_arg_type(op, "a positive integer");
+}
+
+static int
+xrow_update_read_arg_arith(struct xrow_update_op *op, const char **expr,
+ int index_base)
+{
+ (void) index_base;
+ return xrow_mp_read_arg_arith(op, expr, &op->arg.arith);
+}
+
+static int
+xrow_update_read_arg_bit(struct xrow_update_op *op, const char **expr,
+ int index_base)
+{
+ (void) index_base;
+ return xrow_update_mp_read_uint(op, expr, &op->arg.bit.val);
+}
+
+static int
+xrow_update_read_arg_splice(struct xrow_update_op *op, const char **expr,
+ int index_base)
+{
+ struct xrow_update_arg_splice *arg = &op->arg.splice;
+ if (xrow_update_mp_read_int32(op, expr, &arg->offset))
+ return -1;
+ if (arg->offset >= 0) {
+ if (arg->offset - index_base < 0)
+ return xrow_update_err_splice_bound(op);
+ arg->offset -= index_base;
+ }
+ if (xrow_update_mp_read_int32(op, expr, &arg->cut_length) != 0)
+ return -1;
+ return xrow_update_mp_read_str(op, expr, &arg->paste_length,
+ &arg->paste);
+}
+
+/* }}} read_arg */
+
+/* {{{ do_op helpers. */
+
+static inline double
+xrow_update_arg_arith_to_double(struct xrow_update_arg_arith arg)
+{
+ if (arg.type == XUPDATE_TYPE_DOUBLE) {
+ return arg.dbl;
+ } else if (arg.type == XUPDATE_TYPE_FLOAT) {
+ return arg.flt;
+ } else {
+ assert(arg.type == XUPDATE_TYPE_INT);
+ if (int96_is_uint64(&arg.int96)) {
+ return int96_extract_uint64(&arg.int96);
+ } else {
+ assert(int96_is_neg_int64(&arg.int96));
+ return int96_extract_neg_int64(&arg.int96);
+ }
+ }
+}
+
+static inline decimal_t *
+xrow_update_arg_arith_to_decimal(struct xrow_update_arg_arith arg,
+ decimal_t *dec)
+{
+ if (arg.type == XUPDATE_TYPE_DECIMAL) {
+ *dec = arg.dec;
+ return dec;
+ } else if (arg.type == XUPDATE_TYPE_DOUBLE) {
+ return decimal_from_double(dec, arg.dbl);
+ } else if (arg.type == XUPDATE_TYPE_FLOAT) {
+ return decimal_from_double(dec, arg.flt);
+ } else {
+ assert(arg.type == XUPDATE_TYPE_INT);
+ if (int96_is_uint64(&arg.int96)) {
+ uint64_t val = int96_extract_uint64(&arg.int96);
+ return decimal_from_uint64(dec, val);
+ } else {
+ assert(int96_is_neg_int64(&arg.int96));
+ int64_t val = int96_extract_neg_int64(&arg.int96);
+ return decimal_from_int64(dec, val);
+ }
+ }
+}
+
+uint32_t
+xrow_update_arg_arith_sizeof(const struct xrow_update_arg_arith *arg)
+{
+ switch (arg->type) {
+ case XUPDATE_TYPE_INT:
+ if (int96_is_uint64(&arg->int96)) {
+ uint64_t val = int96_extract_uint64(&arg->int96);
+ return mp_sizeof_uint(val);
+ } else {
+ int64_t val = int96_extract_neg_int64(&arg->int96);
+ return mp_sizeof_int(val);
+ }
+ break;
+ case XUPDATE_TYPE_DOUBLE:
+ return mp_sizeof_double(arg->dbl);
+ case XUPDATE_TYPE_FLOAT:
+ return mp_sizeof_float(arg->flt);
+ default:
+ assert(arg->type == XUPDATE_TYPE_DECIMAL);
+ return mp_sizeof_decimal(&arg->dec);
+ }
+}
+
+int
+xrow_update_arith_make(struct xrow_update_op *op,
+ struct xrow_update_arg_arith arg,
+ struct xrow_update_arg_arith *ret)
+{
+ struct xrow_update_arg_arith arg1 = arg;
+ struct xrow_update_arg_arith arg2 = op->arg.arith;
+ enum xrow_update_arith_type lowest_type = arg1.type;
+ char opcode = op->opcode;
+ if (arg1.type > arg2.type)
+ lowest_type = arg2.type;
+
+ if (lowest_type == XUPDATE_TYPE_INT) {
+ switch(opcode) {
+ case '+':
+ int96_add(&arg1.int96, &arg2.int96);
+ break;
+ case '-':
+ int96_invert(&arg2.int96);
+ int96_add(&arg1.int96, &arg2.int96);
+ break;
+ default:
+ unreachable();
+ break;
+ }
+ if (!int96_is_uint64(&arg1.int96) &&
+ !int96_is_neg_int64(&arg1.int96))
+ return xrow_update_err_int_overflow(op);
+ *ret = arg1;
+ } else if (lowest_type >= XUPDATE_TYPE_DOUBLE) {
+ double a = xrow_update_arg_arith_to_double(arg1);
+ double b = xrow_update_arg_arith_to_double(arg2);
+ double c;
+ switch(opcode) {
+ case '+':
+ c = a + b;
+ break;
+ case '-':
+ c = a - b;
+ break;
+ default:
+ unreachable();
+ break;
+ }
+ if (lowest_type == XUPDATE_TYPE_DOUBLE) {
+ ret->type = XUPDATE_TYPE_DOUBLE;
+ ret->dbl = c;
+ } else {
+ assert(lowest_type == XUPDATE_TYPE_FLOAT);
+ ret->type = XUPDATE_TYPE_FLOAT;
+ ret->flt = (float)c;
+ }
+ } else {
+ decimal_t a, b, c;
+ if (! xrow_update_arg_arith_to_decimal(arg1, &a) ||
+ ! xrow_update_arg_arith_to_decimal(arg2, &b)) {
+ return xrow_update_err_arg_type(op, "a number "\
+ "convertible to "\
+ "decimal");
+ }
+ switch(opcode) {
+ case '+':
+ if (decimal_add(&c, &a, &b) == NULL)
+ return xrow_update_err_decimal_overflow(op);
+ break;
+ case '-':
+ if (decimal_sub(&c, &a, &b) == NULL)
+ return xrow_update_err_decimal_overflow(op);
+ break;
+ default:
+ unreachable();
+ break;
+ }
+ ret->type = XUPDATE_TYPE_DECIMAL;
+ ret->dec = c;
+ }
+ return 0;
+}
+
+int
+xrow_update_op_do_arith(struct xrow_update_op *op, const char *old)
+{
+ struct xrow_update_arg_arith left_arg;
+ if (xrow_mp_read_arg_arith(op, &old, &left_arg) != 0 ||
+ xrow_update_arith_make(op, left_arg, &op->arg.arith) != 0)
+ return -1;
+ op->new_field_len = xrow_update_arg_arith_sizeof(&op->arg.arith);
+ return 0;
+}
+
+int
+xrow_update_op_do_bit(struct xrow_update_op *op, const char *old)
+{
+ uint64_t val = 0;
+ if (xrow_update_mp_read_uint(op, &old, &val) != 0)
+ return -1;
+ struct xrow_update_arg_bit *arg = &op->arg.bit;
+ switch (op->opcode) {
+ case '&':
+ arg->val &= val;
+ break;
+ case '^':
+ arg->val ^= val;
+ break;
+ case '|':
+ arg->val |= val;
+ break;
+ default:
+ unreachable();
+ }
+ op->new_field_len = mp_sizeof_uint(arg->val);
+ return 0;
+}
+
+int
+xrow_update_op_do_splice(struct xrow_update_op *op, const char *old)
+{
+ struct xrow_update_arg_splice *arg = &op->arg.splice;
+ int32_t str_len = 0;
+ if (xrow_update_mp_read_str(op, &old, (uint32_t *) &str_len, &old) != 0)
+ return -1;
+
+ if (arg->offset < 0) {
+ if (-arg->offset > str_len + 1)
+ return xrow_update_err_splice_bound(op);
+ arg->offset += str_len + 1;
+ } else if (arg->offset > str_len) {
+ arg->offset = str_len;
+ }
+ assert(arg->offset >= 0 && arg->offset <= str_len);
+ if (arg->cut_length < 0) {
+ if (-arg->cut_length > (str_len - arg->offset))
+ arg->cut_length = 0;
+ else
+ arg->cut_length += str_len - arg->offset;
+ } else if (arg->cut_length > str_len - arg->offset) {
+ arg->cut_length = str_len - arg->offset;
+ }
+ assert(arg->offset <= str_len);
+
+ arg->tail_offset = arg->offset + arg->cut_length;
+ arg->tail_length = str_len - arg->tail_offset;
+ op->new_field_len = mp_sizeof_str(arg->offset + arg->paste_length +
+ arg->tail_length);
+ return 0;
+}
+
+/* }}} do_op helpers. */
+
+/* {{{ store_op */
+
+static void
+xrow_update_op_store_set(struct xrow_update_op *op, const char *in, char *out)
+{
+ (void) in;
+ memcpy(out, op->arg.set.value, op->arg.set.length);
+}
+
+void
+xrow_update_op_store_arith(struct xrow_update_op *op, const char *in, char *out)
+{
+ (void) in;
+ struct xrow_update_arg_arith *arg = &op->arg.arith;
+ switch (arg->type) {
+ case XUPDATE_TYPE_INT:
+ if (int96_is_uint64(&arg->int96)) {
+ mp_encode_uint(out, int96_extract_uint64(&arg->int96));
+ } else {
+ assert(int96_is_neg_int64(&arg->int96));
+ mp_encode_int(out, int96_extract_neg_int64(&arg->int96));
+ }
+ break;
+ case XUPDATE_TYPE_DOUBLE:
+ mp_encode_double(out, arg->dbl);
+ break;
+ case XUPDATE_TYPE_FLOAT:
+ mp_encode_float(out, arg->flt);
+ break;
+ default:
+ assert(arg->type == XUPDATE_TYPE_DECIMAL);
+ mp_encode_decimal(out, &arg->dec);
+ break;
+ }
+}
+
+static void
+xrow_update_op_store_bit(struct xrow_update_op *op, const char *in, char *out)
+{
+ (void) in;
+ mp_encode_uint(out, op->arg.bit.val);
+}
+
+static void
+xrow_update_op_store_splice(struct xrow_update_op *op, const char *in,
+ char *out)
+{
+ struct xrow_update_arg_splice *arg = &op->arg.splice;
+ uint32_t new_str_len = arg->offset + arg->paste_length +
+ arg->tail_length;
+ (void) mp_decode_strl(&in);
+ out = mp_encode_strl(out, new_str_len);
+ /* Copy field head. */
+ memcpy(out, in, arg->offset);
+ out = out + arg->offset;
+ /* Copy the paste. */
+ memcpy(out, arg->paste, arg->paste_length);
+ out = out + arg->paste_length;
+ /* Copy tail. */
+ memcpy(out, in + arg->tail_offset, arg->tail_length);
+}
+
+/* }}} store_op */
+
+static const struct xrow_update_op_meta op_set = {
+ xrow_update_read_arg_set, xrow_update_op_do_field_set,
+ (xrow_update_op_store_f) xrow_update_op_store_set, 3
+};
+static const struct xrow_update_op_meta op_insert = {
+ xrow_update_read_arg_set, xrow_update_op_do_field_insert,
+ (xrow_update_op_store_f) xrow_update_op_store_set, 3
+};
+static const struct xrow_update_op_meta op_arith = {
+ xrow_update_read_arg_arith, xrow_update_op_do_field_arith,
+ (xrow_update_op_store_f) xrow_update_op_store_arith, 3
+};
+static const struct xrow_update_op_meta op_bit = {
+ xrow_update_read_arg_bit, xrow_update_op_do_field_bit,
+ (xrow_update_op_store_f) xrow_update_op_store_bit, 3
+};
+static const struct xrow_update_op_meta op_splice = {
+ xrow_update_read_arg_splice, xrow_update_op_do_field_splice,
+ (xrow_update_op_store_f) xrow_update_op_store_splice, 5
+};
+static const struct xrow_update_op_meta op_delete = {
+ xrow_update_read_arg_delete, xrow_update_op_do_field_delete,
+ (xrow_update_op_store_f) NULL, 3
+};
+
+static inline const struct xrow_update_op_meta *
+xrow_update_op_by(char opcode)
+{
+ switch (opcode) {
+ case '=':
+ return &op_set;
+ case '+':
+ case '-':
+ return &op_arith;
+ case '&':
+ case '|':
+ case '^':
+ return &op_bit;
+ case ':':
+ return &op_splice;
+ case '#':
+ return &op_delete;
+ case '!':
+ return &op_insert;
+ default:
+ diag_set(ClientError, ER_UNKNOWN_UPDATE_OP);
+ return NULL;
+ }
+}
+
+int
+xrow_update_op_decode(struct xrow_update_op *op, int index_base,
+ struct tuple_dictionary *dict, const char **expr)
+{
+ if (mp_typeof(**expr) != MP_ARRAY) {
+ diag_set(ClientError, ER_ILLEGAL_PARAMS, "update operation "
+ "must be an array {op,..}");
+ return -1;
+ }
+ uint32_t len, arg_count = mp_decode_array(expr);
+ if (arg_count < 1) {
+ diag_set(ClientError, ER_ILLEGAL_PARAMS, "update operation "\
+ "must be an array {op,..}, got empty array");
+ return -1;
+ }
+ if (mp_typeof(**expr) != MP_STR) {
+ diag_set(ClientError, ER_ILLEGAL_PARAMS,
+ "update operation name must be a string");
+ return -1;
+ }
+ op->opcode = *mp_decode_str(expr, &len);
+ op->meta = xrow_update_op_by(op->opcode);
+ if (op->meta == NULL)
+ return -1;
+ if (arg_count != op->meta->arg_count) {
+ diag_set(ClientError, ER_UNKNOWN_UPDATE_OP);
+ return -1;
+ }
+ int32_t field_no = 0;
+ switch(mp_typeof(**expr)) {
+ case MP_INT:
+ case MP_UINT: {
+ if (xrow_update_mp_read_int32(op, expr, &field_no) != 0)
+ return -1;
+ if (field_no - index_base >= 0) {
+ op->field_no = field_no - index_base;
+ } else if (field_no < 0) {
+ op->field_no = field_no;
+ } else {
+ diag_set(ClientError, ER_NO_SUCH_FIELD_NO, field_no);
+ return -1;
+ }
+ break;
+ }
+ case MP_STR: {
+ const char *path = mp_decode_str(expr, &len);
+ uint32_t field_no, hash = field_name_hash(path, len);
+ if (tuple_fieldno_by_name(dict, path, len, hash,
+ &field_no) == 0) {
+ op->field_no = (int32_t) field_no;
+ break;
+ }
+ diag_set(ClientError, ER_NO_SUCH_FIELD_NAME,
+ tt_cstr(path, len));
+ return -1;
+ }
+ default:
+ diag_set(ClientError, ER_ILLEGAL_PARAMS,
+ "field id must be a number or a string");
+ return -1;
+ }
+ return op->meta->read_arg(op, expr, index_base);
+}
diff --git a/src/box/xrow_update/xrow_update_field.h b/src/box/xrow_update/xrow_update_field.h
new file mode 100644
index 000000000..4dfa90952
--- /dev/null
+++ b/src/box/xrow_update/xrow_update_field.h
@@ -0,0 +1,442 @@
+#ifndef TARANTOOL_BOX_TUPLE_UPDATE_FIELD_H
+#define TARANTOOL_BOX_TUPLE_UPDATE_FIELD_H
+/*
+ * Copyright 2010-2019, Tarantool AUTHORS, please see AUTHORS file.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ * copyright notice, this list of conditions and the
+ * following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials
+ * provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+ * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+ * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
+#include "trivia/util.h"
+#include "tt_static.h"
+#include <stdint.h>
+#include "bit/int96.h"
+#include "mp_decimal.h"
+
+/**
+ * This file is a link between all the update operations for all
+ * the field types. It functions like a router, when an update
+ * operation is being parsed step by step, going down the update
+ * tree. For example, when an update operation goes through an
+ * array, a map, another array, and ends with a scalar operation,
+ * at the end of each step the operation goes to the next one via
+ * functions of this file: xrow_update_array.c ->
+ * xrow_update_field.c -> xrow_update_map.c ->
+ * xrow_update_field.c -> xrow_update_array.c -> ... . The
+ * functions, doing the routing, are
+ * xrow_update_op_do_field_<operation_type>(), see them below.
+ */
+
+struct xrow_update_rope;
+struct xrow_update_field;
+struct xrow_update_op;
+struct tuple_dictionary;
+
+/* {{{ xrow_update_op */
+
+/** Argument of SET (and INSERT) operation. */
+struct xrow_update_arg_set {
+ uint32_t length;
+ const char *value;
+};
+
+/** Argument of DELETE operation. */
+struct xrow_update_arg_del {
+ uint32_t count;
+};
+
+/**
+ * MsgPack format code of an arithmetic argument or result.
+ * MsgPack codes are not used to simplify type calculation.
+ */
+enum xrow_update_arith_type {
+ XUPDATE_TYPE_DECIMAL = 0, /* MP_EXT + MP_DECIMAL */
+ XUPDATE_TYPE_DOUBLE = 1, /* MP_DOUBLE */
+ XUPDATE_TYPE_FLOAT = 2, /* MP_FLOAT */
+ XUPDATE_TYPE_INT = 3 /* MP_INT/MP_UINT */
+};
+
+/**
+ * Argument (left and right) and result of ADD, SUBTRACT.
+ *
+ * To perform an arithmetic operation, update first loads left
+ * and right arguments into corresponding value objects, then
+ * performs arithmetics on types of arguments, thus calculating
+ * the type of the result, and then performs the requested
+ * operation according to the calculated type rules.
+ *
+ * The rules are as follows:
+ * - when one of the argument types is double, the result is
+ * double;
+ * - when one of the argument types is float, the result is float;
+ * - when one of the arguments is a decimal, the result is decimal
+ * too;
+ * - for integer arguments, the result type code depends on the
+ * range in which falls the result of the operation. If the
+ * result is in negative range, it's MP_INT, otherwise it's
+ * MP_UINT. If the result is out of bounds of (-2^63, 2^64), an
+ * exception is raised for overflow.
+ */
+struct xrow_update_arg_arith {
+ enum xrow_update_arith_type type;
+ union {
+ double dbl;
+ float flt;
+ struct int96_num int96;
+ decimal_t dec;
+ };
+};
+
+/** Argument of AND, XOR, OR operations. */
+struct xrow_update_arg_bit {
+ uint64_t val;
+};
+
+/** Argument of SPLICE. */
+struct xrow_update_arg_splice {
+ /** Splice position. */
+ int32_t offset;
+ /** Byte count to delete. */
+ int32_t cut_length;
+ /** New content. */
+ const char *paste;
+ /** New content length. */
+ uint32_t paste_length;
+
+ /** Offset of the tail in the old field. */
+ int32_t tail_offset;
+ /** Size of the tail. */
+ int32_t tail_length;
+};
+
+/** Update operation argument. */
+union xrow_update_arg {
+ struct xrow_update_arg_set set;
+ struct xrow_update_arg_del del;
+ struct xrow_update_arg_arith arith;
+ struct xrow_update_arg_bit bit;
+ struct xrow_update_arg_splice splice;
+};
+
+typedef int
+(*xrow_update_op_read_arg_f)(struct xrow_update_op *op, const char **expr,
+ int index_base);
+
+typedef int
+(*xrow_update_op_do_f)(struct xrow_update_op *op,
+ struct xrow_update_field *field);
+
+typedef void
+(*xrow_update_op_store_f)(struct xrow_update_op *op, const char *in, char *out);
+
+/**
+ * A set of functions and properties to initialize, do and store
+ * an operation.
+ */
+struct xrow_update_op_meta {
+ /**
+ * Virtual function to read the arguments of the
+ * operation.
+ */
+ xrow_update_op_read_arg_f read_arg;
+ /** Virtual function to execute the operation. */
+ xrow_update_op_do_f do_op;
+ /**
+ * Virtual function to store a result of the operation.
+ */
+ xrow_update_op_store_f store;
+ /** Argument count. */
+ uint32_t arg_count;
+};
+
+/** A single UPDATE operation. */
+struct xrow_update_op {
+ /** Operation meta depending on the op type. */
+ const struct xrow_update_op_meta *meta;
+ /** Operation arguments. */
+ union xrow_update_arg arg;
+ /** First level field no. */
+ int32_t field_no;
+ /** Size of a new field after it is updated. */
+ uint32_t new_field_len;
+ /** Opcode symbol: = + - / ... */
+ char opcode;
+};
+
+/**
+ * Decode an update operation from MessagePack.
+ * @param[out] op Update operation.
+ * @param index_base Field numbers base: 0 or 1.
+ * @param dict Dictionary to lookup field number by a name.
+ * @param expr MessagePack.
+ *
+ * @retval 0 Success.
+ * @retval -1 Client error.
+ */
+int
+xrow_update_op_decode(struct xrow_update_op *op, int index_base,
+ struct tuple_dictionary *dict, const char **expr);
+
+/* }}} xrow_update_op */
+
+/* {{{ xrow_update_field */
+
+/** Types of field update. */
+enum xrow_update_type {
+ /**
+ * Field is not updated. Just save it as is. It is used,
+ * for example, when a rope is split in two parts: not
+ * changed left range of fields, and a right range with
+ * its first field changed. In this case the left range is
+ * NOP.
+ */
+ XUPDATE_NOP,
+ /**
+ * Field is a scalar value, updated via set, arith, bit,
+ * splice, or any other scalar-applicable operation.
+ */
+ XUPDATE_SCALAR,
+ /**
+ * Field is an updated array. Check the rope for updates
+ * of individual fields.
+ */
+ XUPDATE_ARRAY,
+};
+
+/**
+ * Generic structure describing update of a field. It can be
+ * tuple field, field of a tuple field, or any another tuple
+ * internal object: map, array, scalar, or unchanged field of any
+ * type without op. This is a node of an update field tree.
+ */
+struct xrow_update_field {
+ /**
+ * Type of this field's update. Union below depends on it.
+ */
+ enum xrow_update_type type;
+ /** Field data to update. */
+ const char *data;
+ uint32_t size;
+ union {
+ /**
+ * This update is terminal. It does a scalar
+ * operation and has no children fields.
+ */
+ struct {
+ struct xrow_update_op *op;
+ } scalar;
+ /**
+ * This update changes an array. Children fields
+ * are stored in rope nodes.
+ */
+ struct {
+ struct xrow_update_rope *rope;
+ } array;
+ };
+};
+
+/**
+ * Update_field has a generic API and a typed API. The generic API
+ * takes fields of any type. These are:
+ *
+ * xrow_update_field_sizeof
+ * xrow_update_field_store
+ * xrow_update_op_do_field_<operation>
+ *
+ * Typed API is used when type of an update_field is known in
+ * code, and by generic API internally. These are
+ *
+ * xrow_update_<type>_sizeof
+ * xrow_update_<type>_store
+ * xrow_update_op_do_<type>_<operation>
+ *
+ * Sizeof calculates size of the whole subtree of a given
+ * update_field. Store saves the whole subtree. Operation
+ * executors apply an operation to one of the nodes in the
+ * subtree. They may change the update tree structure.
+ */
+
+/**
+ * Size of the updated field, including all children recursively.
+ */
+uint32_t
+xrow_update_field_sizeof(struct xrow_update_field *field);
+
+/** Save the updated field, including all children recursively. */
+uint32_t
+xrow_update_field_store(struct xrow_update_field *field, char *out,
+ char *out_end);
+
+/**
+ * Generate declarations for a concrete field type: array, bar
+ * etc. Each complex type has basic operations of the same
+ * signature: insert, set, delete, arith, bit, splice.
+ */
+#define OP_DECL_GENERIC(type) \
+int \
+xrow_update_op_do_##type##_insert(struct xrow_update_op *op, \
+ struct xrow_update_field *field); \
+ \
+int \
+xrow_update_op_do_##type##_set(struct xrow_update_op *op, \
+ struct xrow_update_field *field); \
+ \
+int \
+xrow_update_op_do_##type##_delete(struct xrow_update_op *op, \
+ struct xrow_update_field *field); \
+ \
+int \
+xrow_update_op_do_##type##_arith(struct xrow_update_op *op, \
+ struct xrow_update_field *field); \
+ \
+int \
+xrow_update_op_do_##type##_bit(struct xrow_update_op *op, \
+ struct xrow_update_field *field); \
+ \
+int \
+xrow_update_op_do_##type##_splice(struct xrow_update_op *op, \
+ struct xrow_update_field *field); \
+ \
+uint32_t \
+xrow_update_##type##_sizeof(struct xrow_update_field *field); \
+ \
+uint32_t \
+xrow_update_##type##_store(struct xrow_update_field *field, char *out, \
+ char *out_end);
+
+/* }}} xrow_update_field */
+
+/* {{{ xrow_update_field.array */
+
+/**
+ * Initialize @a field as an array to update.
+ * @param[out] field Field to initialize.
+ * @param data MessagePack data of the array to update.
+ * @param data_end End of @a data.
+ * @param field_count Field count in @data.
+ *
+ * @retval 0 Success.
+ * @retval -1 Error.
+ */
+int
+xrow_update_array_create(struct xrow_update_field *field, const char *data,
+ const char *data_end, uint32_t field_count);
+
+OP_DECL_GENERIC(array)
+
+/* }}} xrow_update_field.array */
+
+#undef OP_DECL_GENERIC
+
+/* {{{ Common helpers. */
+
+/**
+ * These helper functions are used when a function, updating a
+ * field, doesn't know type of a child node to which wants to
+ * propagate the update.
+ * For example, xrow_update_op_do_array_set calls
+ * xrow_update_op_do_field_set on its childs. Each child can be
+ * another array, a bar, a route, a map - anything. The functions
+ * below help to make such places shorter and simpler.
+ */
+#define OP_DECL_GENERIC(op_type) \
+static inline int \
+xrow_update_op_do_field_##op_type(struct xrow_update_op *op, \
+ struct xrow_update_field *field) \
+{ \
+ switch (field->type) { \
+ case XUPDATE_ARRAY: \
+ return xrow_update_op_do_array_##op_type(op, field); \
+ default: \
+ unreachable(); \
+ } \
+ return 0; \
+}
+
+OP_DECL_GENERIC(insert)
+
+OP_DECL_GENERIC(set)
+
+OP_DECL_GENERIC(delete)
+
+OP_DECL_GENERIC(arith)
+
+OP_DECL_GENERIC(bit)
+
+OP_DECL_GENERIC(splice)
+
+#undef OP_DECL_GENERIC
+
+/* }}} Common helpers. */
+
+/* {{{ Scalar helpers. */
+
+int
+xrow_update_arith_make(struct xrow_update_op *op,
+ struct xrow_update_arg_arith arg,
+ struct xrow_update_arg_arith *ret);
+
+void
+xrow_update_op_store_arith(struct xrow_update_op *op, const char *in,
+ char *out);
+
+uint32_t
+xrow_update_arg_arith_sizeof(const struct xrow_update_arg_arith *arg);
+
+int
+xrow_update_op_do_arith(struct xrow_update_op *op, const char *old);
+
+int
+xrow_update_op_do_bit(struct xrow_update_op *op, const char *old);
+
+int
+xrow_update_op_do_splice(struct xrow_update_op *op, const char *old);
+
+/* }}} Scalar helpers. */
+
+/** {{{ Error helpers. */
+/**
+ * All the error helpers below set diag with appropriate error
+ * code, taking into account field_no < 0, complex paths. They all
+ * return -1 to shorten error returning in a caller function to
+ * single line.
+ */
+
+int
+xrow_update_err_no_such_field(const struct xrow_update_op *op);
+
+int
+xrow_update_err(const struct xrow_update_op *op, const char *reason);
+
+static inline int
+xrow_update_err_double(const struct xrow_update_op *op)
+{
+ return xrow_update_err(op, "double update of the same field");
+}
+
+/** }}} Error helpers. */
+
+#endif /* TARANTOOL_BOX_TUPLE_UPDATE_FIELD_H */
diff --git a/test/box/tuple.result b/test/box/tuple.result
index f7614b4c4..9140211b7 100644
--- a/test/box/tuple.result
+++ b/test/box/tuple.result
@@ -1371,13 +1371,13 @@ d:update{{'-', 1, ffi.new('float', 635)}}
d:update{{'+', 1, 0/0}}
---
- error: 'Argument type in operation ''+'' on field 1 does not match field type: expected
- a number convertible to decimal.'
+ a number convertible to decimal'
...
-- inf
d:update{{'-', 1, 1/0}}
---
- error: 'Argument type in operation ''-'' on field 1 does not match field type: expected
- a number convertible to decimal.'
+ a number convertible to decimal'
...
-- decimal overflow
d = box.tuple.new(dec.new('9e37'))
--
2.21.0 (Apple Git-122.2)
More information about the Tarantool-patches
mailing list