[Tarantool-patches] [PATCH v1 1/1] sql: introduce user-defined aggregate functions

Mergen Imeev imeevma at tarantool.org
Sun Jan 23 17:17:55 MSK 2022


Hi! Thank you for the review! The diff, the new patch, and a patch that adds a
new utility function are below.

On Tue, Jan 18, 2022 at 01:10:04AM +0100, Vladislav Shpilevoy wrote:
> Hi! Thanks for the patch!
> 
> What happened to the idea we had about SQL iterators? Then people would
> be able to iterate with yields and to make any kind of aggregation without
> having to create functions for that.
> 
I think that this idea is actually very similar to the definition of CURSOR from
the standard. Here's what the standard says about CURSORs:
"A cursor is a mechanism by which the rows of a table may be acted on (e.g.,
returned to a host programming language) one at a time."

I will try to suggest this idea.

However, the current issue has quite a strict deadline, which is actually quite
near. I believe that introducing CURSOR is not a problem that can be solved in a
few weeks.

Also, about my current implementation. I don't like it, because it makes too
many changes to BOX that are not related to BOX itself. I think there is a
much simpler implementation. I think you noticed that the current implementation
requires non-persistent functions and C functions to define two
implementations named "<name>" and "<name>_finalize". Do you think this rule can
be made more general? I suggest creating two tuples in _func instead of one when
we create an aggregate function using box.schema.func.create(). In this case,
there will be almost no changes in alter.cc and BOX as a whole, and the logic
will become much simpler. Also, I think we won't need to add new opcodes to the
VDBE, since we can just reuse OP_FunctionByName. The problem with this approach
is that the name "<name>_finalize" will be occupied. Also, this could
potentially lead to mismatched combinations of STEP and FINALIZE functions, for
example, STEP could be persistent but FINALIZE could be non-persistent. However,
to create such combinations, the user must insert tuples directly into _func. I
am not sure if this is a feature or a bug. What do you think?
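
To illustrate the naming concern, here is a minimal standalone sketch. It is not
code from the patch: it only derives the finalize name with the same
"%s_finalize" format that the patch uses and checks which name would become
unavailable to other functions. The helpers finalize_name() and
name_is_reserved() are hypothetical.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/*
 * Build "<name>_finalize" in buf. Hypothetical helper, not part of the patch.
 * Returns 0 on success, -1 if the result does not fit into buf.
 */
static int
finalize_name(const char *name, char *buf, size_t size)
{
	assert(name != NULL && buf != NULL);
	int rc = snprintf(buf, size, "%s_finalize", name);
	return (rc < 0 || (size_t)rc >= size) ? -1 : 0;
}

/*
 * Return 1 if 'candidate' collides with the finalize name that would be
 * created for the aggregate function 'agg_name', 0 otherwise.
 */
static int
name_is_reserved(const char *agg_name, const char *candidate)
{
	char buf[64];
	if (finalize_name(agg_name, buf, sizeof(buf)) != 0)
		return 0;
	return strcmp(buf, candidate) == 0;
}
```

With two tuples per aggregate, creating 'F0' would make 'F0_finalize'
unavailable for any other function, which is the limitation mentioned above.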

> See 8 comments below.
> 
> > diff --git a/src/box/alter.cc b/src/box/alter.cc
> > index 65c1cb952..f1dfcd807 100644
> > --- a/src/box/alter.cc
> > +++ b/src/box/alter.cc
> > @@ -3276,6 +3276,85 @@ func_def_get_ids_from_tuple(struct tuple *tuple, uint32_t *fid, uint32_t *uid)
> >  	return tuple_field_u32(tuple, BOX_FUNC_FIELD_UID, uid);
> >  }
> >  
> > +static int
> > +func_def_check_body(struct tuple *tuple)
> > +{
> > +	assert(tuple_field_count(tuple) > BOX_FUNC_FIELD_BODY);
> > +	const char *field = tuple_field(tuple, BOX_FUNC_FIELD_BODY);
> > +	assert(field != NULL);
> > +	enum mp_type type = mp_typeof(*field);
> > +	if (type == MP_STR)
> > +		return 0;
> > +	if (type != MP_MAP) {
> > +		diag_set(ClientError, ER_FIELD_TYPE, "map or array",
> 
> 1. 'Map or string'.
> 
Fixed.

> > +			 mp_type_strs[type]);
> > +		return -1;
> > +	}
> > +	const char *agg = tuple_field_cstr(tuple, BOX_FUNC_FIELD_AGGREGATE);
> > +	if (agg == NULL)
> > +		return -1;
> > +	if (STR2ENUM(func_aggregate, agg) != FUNC_AGGREGATE_GROUP) {
> > +		const char *name = tuple_field_cstr(tuple, BOX_FUNC_FIELD_NAME);
> > +		diag_set(ClientError, ER_CREATE_FUNCTION, name,
> > +			 "only aggregate functions can have map as body");
> > +		return -1;
> > +	}
> > +
> > +	bool has_step = false;
> > +	bool has_finalize = false;
> > +	if (mp_decode_map(&field) != 2)
> > +		goto error;
> > +	for (int i = 0; i < 2; ++i) {
> > +		const char *value;
> > +		uint32_t size;
> > +		if (mp_typeof(*field) != MP_STR)
> > +			goto error;
> > +		value = mp_decode_str(&field, &size);
> > +		if (size == strlen("step")) {
> > +			if (strncmp(value, "step", size) != 0)
> > +				goto error;
> 
> 2. I would rather propose to introduce a function to compare
> 0-terminated string with non-terminated one. It is needed quite
> often apparently. For instance,
> 
> 	strlcmp(const char *l, const char *r, size_t r_len);
> 
I added a new patch. I decided to insert it here instead of sending a new
version, since the changes it introduces are quite small. Here it is:

commit b30a92bdd47d49aed1b82a99dae0f9472e320ee8
Author: Mergen Imeev <imeevma at gmail.com>
Date:   Sun Jan 23 14:58:33 2022 +0300

    util: introduce strlcmp()
    
    Utility function strlcmp() was introduced to simplify comparisons
    between NULL-terminated strings and non-NULL-terminated strings.

diff --git a/src/lib/core/util.c b/src/lib/core/util.c
index f29886105..035e207b1 100644
--- a/src/lib/core/util.c
+++ b/src/lib/core/util.c
@@ -264,6 +264,15 @@ strlcpy(char *dst, const char *src, size_t size)
 }
 #endif
 
+int
+strlcmp(const char *l, const char *r, size_t r_len)
+{
+	size_t l_len = strlen(l);
+	if (l_len != r_len)
+		return l_len > r_len ? 1 : -1;
+	return strncmp(l, r, r_len);
+}
+
 int
 utf8_check_printable(const char *start, size_t length)
 {
diff --git a/src/trivia/util.h b/src/trivia/util.h
index 3d853f612..ee1b88609 100644
--- a/src/trivia/util.h
+++ b/src/trivia/util.h
@@ -490,6 +490,19 @@ size_t
 strlcpy(char *dst, const char *src, size_t size);
 #endif
 
+/**
+ * Compare two strings, the first of which is NULL-terminated and the second may
+ * not be NULL-terminated.
+ *
+ * @param l the first string, must be NULL-terminated.
+ * @param r the second string.
+ * @param r_len length of the second string.
+ *
+ * @return 0 if the strings are equal, a non-zero value otherwise.
+ */
+int
+strlcmp(const char *l, const char *r, size_t r_len);
+
 /**
  * Check that @a str is valid utf-8 sequence and can be printed
  * unescaped.
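
A minimal standalone sketch of the intended usage, assuming the function returns
0 on equality. Here strlcmp_sketch() is a local stand-in with a hypothetical
name, not the function from util.c, and classify_key() is a hypothetical helper
that mirrors how a map key given as pointer plus length can be matched:

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/*
 * Local stand-in for the proposed strlcmp(); the name is hypothetical.
 * Compares NUL-terminated 'l' with the first 'r_len' bytes of 'r'.
 * Returns 0 when they are equal, a non-zero value otherwise.
 */
static int
strlcmp_sketch(const char *l, const char *r, size_t r_len)
{
	assert(l != NULL);
	size_t l_len = strlen(l);
	/* Compare lengths directly: subtracting size_t values cannot go below 0. */
	if (l_len != r_len)
		return l_len > r_len ? 1 : -1;
	return strncmp(l, r, r_len);
}

/*
 * Classify a key that is not NUL-terminated, e.g. one decoded from MsgPack.
 * Returns 1 for "step", 2 for "finalize", 0 for anything else.
 */
static int
classify_key(const char *key, size_t key_len)
{
	if (strlcmp_sketch("step", key, key_len) == 0)
		return 1;
	if (strlcmp_sketch("finalize", key, key_len) == 0)
		return 2;
	return 0;
}
```

Since the length is checked first, a key such as "ste" or "steps" does not match
"step", even though strncmp() alone would accept a common prefix.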


> > +			has_step = true;
> > +		} else if (size == strlen("finalize")) {
> > +			if (strncmp(value, "finalize", size) != 0)
> > +				goto error;
> > +			has_finalize = true;
> > +		}
> > +		if (mp_typeof(*field) != MP_STR)
> > +			goto error;
> > +		mp_next(&field);
> > +	}
> > +	if (has_step && has_finalize)
> > +		return 0;
> > +error:
> > +	const char *name = tuple_field_cstr(tuple, BOX_FUNC_FIELD_NAME);
> > +	diag_set(ClientError, ER_CREATE_FUNCTION, name,
> > +		 "body of aggregate function should be map that contains "
> > +		 "exactly two string fields: 'step' and 'finalize'");
> > +	return -1;
> > +}
> > +
> > +static const char *
> > +func_def_get_agg_body(struct tuple *tuple, uint32_t *body_len, bool is_step)
> 
> 3. Please, lets split that function in 2 maybe. Its invocation with
> hardcoded 'true'/'false' in the end gives hard time understanding what
> these values mean.
> 
Fixed.

> > +{
> > +	const char *field = tuple_field(tuple, BOX_FUNC_FIELD_BODY);
> > +	assert(field != NULL);
> > +	if (mp_typeof(*field) == MP_STR) {
> > +		if (is_step)
> > +			return mp_decode_str(&field, body_len);
> > +		*body_len = 0;
> > +		return field;
> 
> 4. You return len as 0, but the returned value != NULL? Why?
> 
If I return NULL, that means an error has occurred. However, if I return
something other than NULL and the length is 0, that means a non-persistent
function has been introduced.
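
The convention can be sketched as follows. This is a standalone illustration,
not code from the patch; enum body_kind, classify_body() and body_kind_str() are
hypothetical names:

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Possible outcomes of fetching the finalize body (hypothetical enum). */
enum body_kind {
	BODY_ERROR,
	BODY_NON_PERSISTENT,
	BODY_PERSISTENT,
};

/*
 * Interpret the (pointer, length) pair returned by a body getter:
 * NULL means an error, non-NULL with zero length means there is no stored
 * body (non-persistent function), anything else is a persistent body.
 */
static enum body_kind
classify_body(const char *body, uint32_t body_len)
{
	if (body == NULL)
		return BODY_ERROR;
	if (body_len == 0)
		return BODY_NON_PERSISTENT;
	return BODY_PERSISTENT;
}

/* Human-readable name of the outcome, for diagnostics. */
static const char *
body_kind_str(enum body_kind kind)
{
	static const char *names[] = {"error", "non-persistent", "persistent"};
	assert((int)kind <= BODY_PERSISTENT);
	return names[kind];
}
```

The caller has to check the pointer before the length, because the length is
meaningful only when the pointer is not NULL.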

> > +	}
> > @@ -3568,6 +3697,16 @@ on_replace_dd_func(struct trigger * /* trigger */, void *event)
> >  		struct func *func = func_new(def);
> >  		if (func == NULL)
> >  			return -1;
> > +		if (def->aggregate == FUNC_AGGREGATE_GROUP) {
> > +			struct func_def *fin_def =
> > +				func_fin_def_new(new_tuple, def);
> > +			if (fin_def == NULL)
> > +				return -1;
> > +			struct func *func_fin = func_new(fin_def);
> > +			if (func_fin == NULL)
> > +				return -1;
> > +			func_fin_cache_insert(func_fin);
> 
> 5. What happens if WAL write fails and on_create_func_rollback() is called?
> 
I missed it earlier. Do you think it's better to introduce a new field in struct
func/struct func_def, or to malloc()/free() the data for trigger->data?

> > diff --git a/src/box/schema.h b/src/box/schema.h
> > index d3bbdd590..2f020ee96 100644
> > --- a/src/box/schema.h
> > +++ b/src/box/schema.h
> > @@ -102,6 +102,15 @@ func_by_id(uint32_t fid);
> >  struct func *
> >  func_by_name(const char *name, uint32_t name_len);
> >  
> > +void
> > +func_fin_cache_insert(struct func *func);
> > +
> > +void
> > +func_fin_cache_delete(uint32_t fid);
> > +
> > +struct func *
> > +func_fin_by_id(uint32_t fid);
> 
> 6. Lets leave some comments about what is 'func fin'.
> 
Added a comment to func_fin_cache_insert().

> > diff --git a/src/box/sql/select.c b/src/box/sql/select.c
> > index 2e33a31d2..c77cd1c9f 100644
> > --- a/src/box/sql/select.c
> > +++ b/src/box/sql/select.c
> > @@ -5644,10 +5650,17 @@ updateAccumulator(Parse * pParse, AggInfo * pAggInfo)
> >  			pParse->is_aborted = true;
> >  			return;
> >  		}
> > -		sqlVdbeAddOp3(v, OP_AggStep, nArg, regAgg, pF->iMem);
> > -		sqlVdbeAppendP4(v, ctx, P4_FUNCCTX);
> > -		sql_expr_type_cache_change(pParse, regAgg, nArg);
> > -		sqlReleaseTempRange(pParse, regAgg, nArg);
> > +		if (pF->func->def->language == FUNC_LANGUAGE_SQL_BUILTIN) {
> > +			sqlVdbeAddOp3(v, OP_AggStep, nArg, regAgg, pF->iMem);
> > +			sqlVdbeAppendP4(v, ctx, P4_FUNCCTX);
> > +		} else {
> > +			sqlVdbeAddOp3(v, OP_UserAggStep, nArg, regAgg,
> > +				      pF->iMem);
> > +			const char *name = pF->func->def->name;
> > +			int len = pF->func->def->name_len;
> > +			char *str = sqlDbStrNDup(pParse->db, name, len);
> > +			sqlVdbeAppendP4(v, str, P4_DYNAMIC);
> 
> 7. Was it possible to unify how builtin and user-defined functions
> are called? In the same opcode without any special 'if's.
> 
I think something similar might be possible if we used struct func_sql_builtin
somewhere before these lines are called. However, some preparations need to
be made, for example, we have to force the user to re-prepare the statement in
case something changes in _func.

> > diff --git a/test/sql-tap/gh-2579-custom-aggregate.test.lua b/test/sql-tap/gh-2579-custom-aggregate.test.lua
> > new file mode 100755
> > index 000000000..d5b845761
> > --- /dev/null
> > +++ b/test/sql-tap/gh-2579-custom-aggregate.test.lua
> 8. It might also be good to have a test with step/finalize functions
> throwing a Lua error from their body.
Added.


Diff:

diff --git a/src/box/alter.cc b/src/box/alter.cc
index f1dfcd807..5381612af 100644
--- a/src/box/alter.cc
+++ b/src/box/alter.cc
@@ -3286,7 +3286,7 @@ func_def_check_body(struct tuple *tuple)
 	if (type == MP_STR)
 		return 0;
 	if (type != MP_MAP) {
-		diag_set(ClientError, ER_FIELD_TYPE, "map or array",
+		diag_set(ClientError, ER_FIELD_TYPE, "map or string",
 			 mp_type_strs[type]);
 		return -1;
 	}
@@ -3310,15 +3310,12 @@ func_def_check_body(struct tuple *tuple)
 		if (mp_typeof(*field) != MP_STR)
 			goto error;
 		value = mp_decode_str(&field, &size);
-		if (size == strlen("step")) {
-			if (strncmp(value, "step", size) != 0)
-				goto error;
+		if (strlcmp("step", value, size) == 0)
 			has_step = true;
-		} else if (size == strlen("finalize")) {
-			if (strncmp(value, "finalize", size) != 0)
-				goto error;
+		else if (strlcmp("finalize", value, size) == 0)
 			has_finalize = true;
-		}
+		else
+			goto error;
 		if (mp_typeof(*field) != MP_STR)
 			goto error;
 		mp_next(&field);
@@ -3334,13 +3331,29 @@ error:
 }
 
 static const char *
-func_def_get_agg_body(struct tuple *tuple, uint32_t *body_len, bool is_step)
+func_def_get_agg_step_body(struct tuple *tuple, uint32_t *body_len)
+{
+	const char *field = tuple_field(tuple, BOX_FUNC_FIELD_BODY);
+	assert(field != NULL);
+	if (mp_typeof(*field) == MP_STR)
+		return mp_decode_str(&field, body_len);
+	assert(mp_typeof(*field) == MP_MAP);
+	mp_decode_map(&field);
+	uint32_t size;
+	mp_decode_str(&field, &size);
+	if (size == strlen("step"))
+		return mp_decode_str(&field, body_len);
+	mp_next(&field);
+	mp_next(&field);
+	return mp_decode_str(&field, body_len);
+}
+
+static const char *
+func_def_get_agg_finalize_body(struct tuple *tuple, uint32_t *body_len)
 {
 	const char *field = tuple_field(tuple, BOX_FUNC_FIELD_BODY);
 	assert(field != NULL);
 	if (mp_typeof(*field) == MP_STR) {
-		if (is_step)
-			return mp_decode_str(&field, body_len);
 		*body_len = 0;
 		return field;
 	}
@@ -3348,7 +3361,7 @@ func_def_get_agg_body(struct tuple *tuple, uint32_t *body_len, bool is_step)
 	mp_decode_map(&field);
 	uint32_t size;
 	mp_decode_str(&field, &size);
-	if (size == (is_step ? strlen("step") : strlen("finalize")))
+	if (size == strlen("finalize"))
 		return mp_decode_str(&field, body_len);
 	mp_next(&field);
 	mp_next(&field);
@@ -3376,7 +3389,7 @@ func_def_new_from_tuple(struct tuple *tuple)
 	if (field_count > BOX_FUNC_FIELD_BODY) {
 		if (func_def_check_body(tuple) != 0)
 			return NULL;
-		body = func_def_get_agg_body(tuple, &body_len, true);
+		body = func_def_get_agg_step_body(tuple, &body_len);
 		if (body == NULL)
 			return NULL;
 		comment = tuple_field_str(tuple, BOX_FUNC_FIELD_COMMENT,
@@ -3628,7 +3641,7 @@ func_fin_def_new(struct tuple *tuple, const struct func_def *def)
 	const char *name = tt_sprintf("%s_finalize", def->name);
 	size_t name_len = strlen(name);
 	uint32_t body_len;
-	const char *body = func_def_get_agg_body(tuple, &body_len, false);
+	const char *body = func_def_get_agg_finalize_body(tuple, &body_len);
 	if (body == NULL)
 		return NULL;
 	if ((body_len == 0 && def->body != NULL) ||
diff --git a/src/box/sql/vdbe.c b/src/box/sql/vdbe.c
index 873b9f9f4..e2253caa1 100644
--- a/src/box/sql/vdbe.c
+++ b/src/box/sql/vdbe.c
@@ -1316,7 +1316,7 @@ case OP_UserAggStep: {
 	struct region *region = &fiber()->gc;
 	size_t region_svp = region_used(region);
 	port_vdbemem_create(&args, (struct sql_value *)argv, pOp->p1 + 1);
-	if (func->vtab->call(func, &args, &ret) != 0) {
+	if (func_call(func, &args, &ret) != 0) {
 		region_truncate(region, region_svp);
 		goto abort_due_to_error;
 	}
@@ -1360,7 +1360,7 @@ case OP_UserAggFinal: {
 	struct region *region = &fiber()->gc;
 	size_t region_svp = region_used(region);
 	port_vdbemem_create(&args, (struct sql_value *)argv, 1);
-	if (func->vtab->call(func, &args, &ret) != 0) {
+	if (func_call(func, &args, &ret) != 0) {
 		region_truncate(region, region_svp);
 		goto abort_due_to_error;
 	}
diff --git a/test/sql-tap/gh-2579-custom-aggregate.test.lua b/test/sql-tap/gh-2579-custom-aggregate.test.lua
index d5b845761..b207257c6 100755
--- a/test/sql-tap/gh-2579-custom-aggregate.test.lua
+++ b/test/sql-tap/gh-2579-custom-aggregate.test.lua
@@ -3,7 +3,7 @@ local build_path = os.getenv("BUILDDIR")
 package.cpath = build_path..'/test/sql-tap/?.so;'..build_path..'/test/sql-tap/?.dylib;'..package.cpath
 
 local test = require("sqltester")
-test:plan(13)
+test:plan(15)
 
 -- Make sure that non-persistent aggregate functions are working as expected.
 local step = function(x, y)
@@ -229,4 +229,50 @@ test:do_test(
         "be map that contains exactly two string fields: 'step' and 'finalize'"
     })
 
+--
+-- Make sure that user-defined aggregate functions that are throwing an error
+-- are working correctly.
+--
+local body_s10 = {
+    step = [[function(x) error("some error") return 0 end]],
+    finalize = [[function(x) return 0 end]]
+}
+
+box.schema.func.create('S10', {aggregate = 'group', returns = 'number',
+                              body = body_s10, exports = {'SQL'}})
+
+test:do_catchsql_test(
+    "gh-2579-14",
+    [[
+        SELECT s10();
+    ]], {
+        1, '[string "return function(x) error("some error") return..."]:1: '..
+           'some error'
+    })
+
+local body_s11 = {
+    step = [[function(x) return 0 end]],
+    finalize = [[function(x) error("another error") return 0 end]]
+}
+
+box.schema.func.create('S11', {aggregate = 'group', returns = 'number',
+                              body = body_s11, exports = {'SQL'}})
+
+test:do_catchsql_test(
+    "gh-2579-15",
+    [[
+        SELECT s11();
+    ]], {
+        1, '[string "return function(x) error("another error") ret..."]:1: '..
+           'another error'
+    })
+
+box.space._func.index[2]:delete("S1")
+box.space._func.index[2]:delete("S2")
+box.space._func.index[2]:delete("gh-2579-custom-aggregate.S3")
+box.space._func.index[2]:delete("S4")
+box.space._func.index[2]:delete("S10")
+box.space._func.index[2]:delete("S11")
+box.space.T:drop()
+
 test:finish_test()
diff --git a/src/box/schema.h b/src/box/schema.h
index 2f020ee96..186fa9db7 100644
--- a/src/box/schema.h
+++ b/src/box/schema.h
@@ -102,6 +102,12 @@ func_by_id(uint32_t fid);
 struct func *
 func_by_name(const char *name, uint32_t name_len);
 
+/**
+ * Insert a new function object for finalize of aggregate function in the
+ * function cache.
+ *
+ * @param func Function object to insert.
+ */
 void
 func_fin_cache_insert(struct func *func);



New patch:

commit 1caf17400eae504a192b6e49cba0d254a4a510ea
Author: Mergen Imeev <imeevma at gmail.com>
Date:   Tue Jan 11 08:48:58 2022 +0300

    sql: introduce user-defined aggregate functions
    
    This patch introduces user-defined SQL aggregate functions and an
    interface for creating them.
    
    Closes #2579
    
    @TarantoolBot document
    Title: User-defined aggregate functions
    
    User-defined aggregate functions are now available. There are three
    types of user-defined aggregate functions: persistent functions,
    non-persistent functions, and C-functions. All user-defined aggregate
    functions consist of two functions: a "step" function, which is executed
    for each record in the group, and a "finalize" function, which is
    executed after all records in the group have been processed.
    
    The "Step" function has one more argument than specified in "param_list"
    in the definition. This argument is the current state of the aggregate
    function and must be the first argument.
    
    The "Finalize" function has exactly one argument, which is the current
    state of the aggregate function.
    
    To use a C user-defined aggregate function and a non-persistent
    user-defined aggregate function, you must define a "step" function with
    a name equal to the function name, and a "finalize" function with a name
    equal to the function name with "_finalize" appended. For non-persistent
    functions, both functions must be global in Lua, and for a C function,
    the functions with these names must be defined in the appropriate
    .so file.
    
    Example of non-persistent aggregate function:
    ```
    F0 = function(x, y)
        if x == nil then
            x = {sum = 0, count = 0}
        end
        x.sum = x.sum + y
        x.count = x.count + 1
        return x
    end
    
    F0_finalize = function(x)
        return x.sum / x.count
    end
    
    box.schema.func.create('F0', {aggregate = 'group', returns = 'number',
                                  param_list = {'integer'}, exports = {'SQL'}})
    box.execute([[SELECT f0(column_3) FROM (VALUES(1), (2), (3));]])
    ```
    
    To use a persistent user-defined aggregate function, you must define the
    "body" field in the function definition as a map with two fields, "step"
    and "finalize" according to the rules defined above.
    
    Example of persistent aggregate function:
    ```
    body = {
        step = [[function(x, y)
            if x == nil then
                x = {sum = 0, count = 0}
            end
            x.sum = x.sum + y
            x.count = x.count + 1
            return x
        end]],
        finalize = [[function(x)
            return x.sum / x.count
        end]]
    }
    
    box.schema.func.create('F1', {aggregate = 'group', returns = 'number',
                                  body = body, param_list = {'integer'},
                                  exports = {'SQL'}})
    box.execute([[SELECT f1(column_3) FROM (VALUES(1), (2), (3));]])
    ```

diff --git a/changelogs/unreleased/gh-2579-introduce-custom-aggregate-functions.md b/changelogs/unreleased/gh-2579-introduce-custom-aggregate-functions.md
new file mode 100644
index 000000000..68adad478
--- /dev/null
+++ b/changelogs/unreleased/gh-2579-introduce-custom-aggregate-functions.md
@@ -0,0 +1,4 @@
+## feature/sql
+
+* Custom SQL aggregate functions and an interface to create them are now
+  available (gh-2579).
diff --git a/src/box/alter.cc b/src/box/alter.cc
index 65c1cb952..5381612af 100644
--- a/src/box/alter.cc
+++ b/src/box/alter.cc
@@ -3276,6 +3276,98 @@ func_def_get_ids_from_tuple(struct tuple *tuple, uint32_t *fid, uint32_t *uid)
 	return tuple_field_u32(tuple, BOX_FUNC_FIELD_UID, uid);
 }
 
+static int
+func_def_check_body(struct tuple *tuple)
+{
+	assert(tuple_field_count(tuple) > BOX_FUNC_FIELD_BODY);
+	const char *field = tuple_field(tuple, BOX_FUNC_FIELD_BODY);
+	assert(field != NULL);
+	enum mp_type type = mp_typeof(*field);
+	if (type == MP_STR)
+		return 0;
+	if (type != MP_MAP) {
+		diag_set(ClientError, ER_FIELD_TYPE, "map or string",
+			 mp_type_strs[type]);
+		return -1;
+	}
+	const char *agg = tuple_field_cstr(tuple, BOX_FUNC_FIELD_AGGREGATE);
+	if (agg == NULL)
+		return -1;
+	if (STR2ENUM(func_aggregate, agg) != FUNC_AGGREGATE_GROUP) {
+		const char *name = tuple_field_cstr(tuple, BOX_FUNC_FIELD_NAME);
+		diag_set(ClientError, ER_CREATE_FUNCTION, name,
+			 "only aggregate functions can have map as body");
+		return -1;
+	}
+
+	bool has_step = false;
+	bool has_finalize = false;
+	if (mp_decode_map(&field) != 2)
+		goto error;
+	for (int i = 0; i < 2; ++i) {
+		const char *value;
+		uint32_t size;
+		if (mp_typeof(*field) != MP_STR)
+			goto error;
+		value = mp_decode_str(&field, &size);
+		if (strlcmp("step", value, size) == 0)
+			has_step = true;
+		else if (strlcmp("finalize", value, size) == 0)
+			has_finalize = true;
+		else
+			goto error;
+		if (mp_typeof(*field) != MP_STR)
+			goto error;
+		mp_next(&field);
+	}
+	if (has_step && has_finalize)
+		return 0;
+error:
+	const char *name = tuple_field_cstr(tuple, BOX_FUNC_FIELD_NAME);
+	diag_set(ClientError, ER_CREATE_FUNCTION, name,
+		 "body of aggregate function should be map that contains "
+		 "exactly two string fields: 'step' and 'finalize'");
+	return -1;
+}
+
+static const char *
+func_def_get_agg_step_body(struct tuple *tuple, uint32_t *body_len)
+{
+	const char *field = tuple_field(tuple, BOX_FUNC_FIELD_BODY);
+	assert(field != NULL);
+	if (mp_typeof(*field) == MP_STR)
+		return mp_decode_str(&field, body_len);
+	assert(mp_typeof(*field) == MP_MAP);
+	mp_decode_map(&field);
+	uint32_t size;
+	mp_decode_str(&field, &size);
+	if (size == strlen("step"))
+		return mp_decode_str(&field, body_len);
+	mp_next(&field);
+	mp_next(&field);
+	return mp_decode_str(&field, body_len);
+}
+
+static const char *
+func_def_get_agg_finalize_body(struct tuple *tuple, uint32_t *body_len)
+{
+	const char *field = tuple_field(tuple, BOX_FUNC_FIELD_BODY);
+	assert(field != NULL);
+	if (mp_typeof(*field) == MP_STR) {
+		*body_len = 0;
+		return field;
+	}
+	assert(mp_typeof(*field) == MP_MAP);
+	mp_decode_map(&field);
+	uint32_t size;
+	mp_decode_str(&field, &size);
+	if (size == strlen("finalize"))
+		return mp_decode_str(&field, body_len);
+	mp_next(&field);
+	mp_next(&field);
+	return mp_decode_str(&field, body_len);
+}
+
 /** Create a function definition from tuple. */
 static struct func_def *
 func_def_new_from_tuple(struct tuple *tuple)
@@ -3295,7 +3387,9 @@ func_def_new_from_tuple(struct tuple *tuple)
 	if (identifier_check(name, name_len) != 0)
 		return NULL;
 	if (field_count > BOX_FUNC_FIELD_BODY) {
-		body = tuple_field_str(tuple, BOX_FUNC_FIELD_BODY, &body_len);
+		if (func_def_check_body(tuple) != 0)
+			return NULL;
+		body = func_def_get_agg_step_body(tuple, &body_len);
 		if (body == NULL)
 			return NULL;
 		comment = tuple_field_str(tuple, BOX_FUNC_FIELD_COMMENT,
@@ -3498,6 +3592,11 @@ func_def_new_from_tuple(struct tuple *tuple)
 		def->exports.lua = true;
 		def->param_count = 0;
 	}
+	if (def->aggregate == FUNC_AGGREGATE_GROUP && def->exports.lua != 0) {
+		diag_set(ClientError, ER_CREATE_FUNCTION, def->name,
+			 "aggregate function can only be accessed in SQL");
+		return NULL;
+	}
 	if (func_def_check(def) != 0)
 		return NULL;
 	def_guard.is_active = false;
@@ -3536,6 +3635,49 @@ on_drop_func_rollback(struct trigger *trigger, void * /* event */)
 	return 0;
 }
 
+struct func_def *
+func_fin_def_new(struct tuple *tuple, const struct func_def *def)
+{
+	const char *name = tt_sprintf("%s_finalize", def->name);
+	size_t name_len = strlen(name);
+	uint32_t body_len;
+	const char *body = func_def_get_agg_finalize_body(tuple, &body_len);
+	if (body == NULL)
+		return NULL;
+	if ((body_len == 0 && def->body != NULL) ||
+	    (def->body == NULL && body_len != 0)) {
+		diag_set(ClientError, ER_CREATE_FUNCTION, def->name,
+			 "step or finalize of aggregate function is undefined");
+		return NULL;
+	}
+	uint32_t len = sizeof(struct func_def) + name_len + 1;
+	if (body_len > 0)
+		len += body_len + 1;
+	struct func_def *fin_def = (struct func_def *)malloc(len);
+	fin_def->fid = def->fid;
+	fin_def->uid = def->uid;
+	if (body_len > 0) {
+		fin_def->body = fin_def->name + name_len + 1;
+		memcpy(fin_def->body, body, body_len);
+		fin_def->body[body_len] = '\0';
+	} else {
+		fin_def->body = NULL;
+	}
+	fin_def->comment = NULL;
+	fin_def->setuid = def->setuid;
+	fin_def->is_deterministic = def->is_deterministic;
+	fin_def->is_sandboxed = def->is_sandboxed;
+	fin_def->param_count = 0;
+	fin_def->returns = def->returns;
+	fin_def->aggregate = FUNC_AGGREGATE_GROUP;
+	fin_def->language = def->language;
+	fin_def->name_len = name_len;
+	fin_def->exports = def->exports;
+	fin_def->opts = def->opts;
+	memcpy(fin_def->name, name, name_len + 1);
+	return fin_def;
+}
+
 /**
  * A trigger invoked on replace in a space containing
  * functions on which there were defined any grants.
@@ -3568,6 +3710,16 @@ on_replace_dd_func(struct trigger * /* trigger */, void *event)
 		struct func *func = func_new(def);
 		if (func == NULL)
 			return -1;
+		if (def->aggregate == FUNC_AGGREGATE_GROUP) {
+			struct func_def *fin_def =
+				func_fin_def_new(new_tuple, def);
+			if (fin_def == NULL)
+				return -1;
+			struct func *func_fin = func_new(fin_def);
+			if (func_fin == NULL)
+				return -1;
+			func_fin_cache_insert(func_fin);
+		}
 		def_guard.is_active = false;
 		func_cache_insert(func);
 		on_rollback->data = func;
@@ -3610,6 +3762,8 @@ on_replace_dd_func(struct trigger * /* trigger */, void *event)
 			txn_alter_trigger_new(on_drop_func_rollback, old_func);
 		if (on_commit == NULL || on_rollback == NULL)
 			return -1;
+		if (old_func->def->aggregate == FUNC_AGGREGATE_GROUP)
+			func_fin_cache_delete(old_func->def->fid);
 		func_cache_delete(old_func->def->fid);
 		txn_stmt_on_commit(stmt, on_commit);
 		txn_stmt_on_rollback(stmt, on_rollback);
@@ -3622,9 +3776,13 @@ on_replace_dd_func(struct trigger * /* trigger */, void *event)
 		 * definition to support upgrade script.
 		 */
 		struct func_def *old_def = NULL, *new_def = NULL;
-		auto guard = make_scoped_guard([&old_def, &new_def] {
+		struct func_def *new_fin_def = NULL;
+		struct func_def *old_fin_def = NULL;
+		auto guard = make_scoped_guard([&] {
 			free(old_def);
 			free(new_def);
+			free(new_fin_def);
+			free(old_fin_def);
 		});
 		old_def = func_def_new_from_tuple(old_tuple);
 		new_def = func_def_new_from_tuple(new_tuple);
@@ -3635,6 +3793,18 @@ on_replace_dd_func(struct trigger * /* trigger */, void *event)
 				  "alter");
 			return -1;
 		}
+		if (new_def->aggregate != FUNC_AGGREGATE_GROUP)
+			return 0;
+		new_fin_def = func_fin_def_new(new_tuple, new_def);
+		old_fin_def = func_fin_def_new(old_tuple, old_def);
+		if (new_fin_def == NULL || old_fin_def == NULL)
+			return -1;
+		if (func_def_cmp(new_fin_def, old_fin_def) != 0) {
+			diag_set(ClientError, ER_UNSUPPORTED, "function",
+				 "alter");
+			return -1;
+		}
+		return 0;
 	}
 	return 0;
 }
diff --git a/src/box/bootstrap.snap b/src/box/bootstrap.snap
index 018670d2a..610e513fd 100644
Binary files a/src/box/bootstrap.snap and b/src/box/bootstrap.snap differ
diff --git a/src/box/lua/schema.lua b/src/box/lua/schema.lua
index 99b65b0f5..8c328852d 100644
--- a/src/box/lua/schema.lua
+++ b/src/box/lua/schema.lua
@@ -2597,7 +2597,8 @@ box.schema.func.create = function(name, opts)
     opts = opts or {}
     check_param_table(opts, { setuid = 'boolean',
                               if_not_exists = 'boolean',
-                              language = 'string', body = 'string',
+                              language = 'string', body = 'any',
+                              aggregate = 'string',
                               is_deterministic = 'boolean',
                               is_sandboxed = 'boolean',
                               is_multikey = 'boolean',
diff --git a/src/box/lua/upgrade.lua b/src/box/lua/upgrade.lua
index f9e6543c2..0e2c4a091 100644
--- a/src/box/lua/upgrade.lua
+++ b/src/box/lua/upgrade.lua
@@ -1212,6 +1212,21 @@ local function upgrade_to_2_9_1()
     remove_sql_builtin_functions_from_func()
 end
 
+--------------------------------------------------------------------------------
+-- Tarantool 2.10.1
+--------------------------------------------------------------------------------
+local function change_func_body_field_type()
+    local _func = box.space._func
+    local format = _func:format()
+    -- Change type of 'body' field from 'string' to 'any'.
+    format[6].type = 'any'
+    _func:format(format)
+end
+
+local function upgrade_to_2_10_1()
+    change_func_body_field_type()
+end
+
 --------------------------------------------------------------------------------
 
 local handlers = {
@@ -1229,6 +1244,7 @@ local handlers = {
     {version = mkversion(2, 3, 1), func = upgrade_to_2_3_1, auto = true},
     {version = mkversion(2, 7, 1), func = upgrade_to_2_7_1, auto = true},
     {version = mkversion(2, 9, 1), func = upgrade_to_2_9_1, auto = true},
+    {version = mkversion(2, 10, 1), func = upgrade_to_2_10_1, auto = true},
 }
 
 -- Schema version of the snapshot.
diff --git a/src/box/schema.cc b/src/box/schema.cc
index c248d4ee4..eb1b4a7dd 100644
--- a/src/box/schema.cc
+++ b/src/box/schema.cc
@@ -60,6 +60,7 @@ static struct mh_i32ptr_t *spaces;
 static struct mh_strnptr_t *spaces_by_name;
 static struct mh_i32ptr_t *funcs;
 static struct mh_strnptr_t *funcs_by_name;
+static struct mh_i32ptr_t *funcs_fin;
 static struct mh_i32ptr_t *sequences;
 /** Public change counter. On its update clients need to fetch
  *  new space data from the instance. */
@@ -379,6 +380,7 @@ schema_init(void)
 	spaces_by_name = mh_strnptr_new();
 	funcs = mh_i32ptr_new();
 	funcs_by_name = mh_strnptr_new();
+	funcs_fin = mh_i32ptr_new();
 	sequences = mh_i32ptr_new();
 	/*
 	 * Create surrogate space objects for the mandatory system
@@ -540,6 +542,14 @@ schema_free(void)
 		func_delete(func);
 	}
 	mh_i32ptr_delete(funcs);
+	while (mh_size(funcs_fin) > 0) {
+		mh_int_t i = mh_first(funcs_fin);
+		struct func *func =
+			(struct func *)mh_i32ptr_node(funcs_fin, i)->val;
+		func_fin_cache_delete(func->def->fid);
+		func_delete(func);
+	}
+	mh_i32ptr_delete(funcs_fin);
 	while (mh_size(sequences) > 0) {
 		mh_int_t i = mh_first(sequences);
 
@@ -597,6 +607,33 @@ func_by_name(const char *name, uint32_t name_len)
 	return (struct func *) mh_strnptr_node(funcs_by_name, func)->val;
 }
 
+void
+func_fin_cache_insert(struct func *func)
+{
+	uint32_t fid = func->def->fid;
+	assert(func_fin_by_id(fid) == NULL);
+	const struct mh_i32ptr_node_t node = {fid, func};
+	mh_i32ptr_put(funcs_fin, &node, NULL, NULL);
+}
+
+void
+func_fin_cache_delete(uint32_t fid)
+{
+	mh_int_t k = mh_i32ptr_find(funcs_fin, fid, NULL);
+	if (k == mh_end(funcs_fin))
+		return;
+	mh_i32ptr_del(funcs_fin, k, NULL);
+}
+
+struct func *
+func_fin_by_id(uint32_t fid)
+{
+	mh_int_t func = mh_i32ptr_find(funcs_fin, fid, NULL);
+	if (func == mh_end(funcs_fin))
+		return NULL;
+	return (struct func *)mh_i32ptr_node(funcs_fin, func)->val;
+}
+
 int
 schema_find_grants(const char *type, uint32_t id, bool *out)
 {
diff --git a/src/box/schema.h b/src/box/schema.h
index d3bbdd590..186fa9db7 100644
--- a/src/box/schema.h
+++ b/src/box/schema.h
@@ -102,6 +102,21 @@ func_by_id(uint32_t fid);
 struct func *
 func_by_name(const char *name, uint32_t name_len);
 
+/**
+ * Insert the function object that implements the finalize step of an
+ * aggregate function into the finalize function cache.
+ *
+ * @param func Function object to insert.
+ */
+void
+func_fin_cache_insert(struct func *func);
+
+void
+func_fin_cache_delete(uint32_t fid);
+
+struct func *
+func_fin_by_id(uint32_t fid);
+
 /** Call a visitor function on every space in the space cache. */
 int
 space_foreach(int (*func)(struct space *sp, void *udata), void *udata);
diff --git a/src/box/sql/expr.c b/src/box/sql/expr.c
index eb169aeb8..46ffe377c 100644
--- a/src/box/sql/expr.c
+++ b/src/box/sql/expr.c
@@ -5469,7 +5469,18 @@ analyzeAggregate(Walker * pWalker, Expr * pExpr)
 						       (pExpr, EP_xIsSelect));
 						pItem = &pAggInfo->aFunc[i];
 						pItem->pExpr = pExpr;
-						pItem->iMem = ++pParse->nMem;
+						int n = pExpr->x.pList == NULL ?
+							0 : pExpr->x.pList->nExpr;
+						/*
+						 * Allocate 1 MEM for
+						 * accumulator and next n MEMs
+						 * for arguments. This makes it
+						 * easier to pass these n + 1
+						 * MEMs to the user-defined
+						 * aggregate function.
+						 */
+						pItem->iMem = pParse->nMem + 1;
+						pParse->nMem += n + 1;
 						assert(!ExprHasProperty
 						       (pExpr, EP_IntValue));
 						pItem->func =
@@ -5479,12 +5490,6 @@ analyzeAggregate(Walker * pWalker, Expr * pExpr)
 								true;
 							return WRC_Abort;
 						}
-						assert(pItem->func->def->
-						       language ==
-						       FUNC_LANGUAGE_SQL_BUILTIN &&
-						       pItem->func->def->
-						       aggregate ==
-						       FUNC_AGGREGATE_GROUP);
 						if (pExpr->flags & EP_Distinct) {
 							pItem->iDistinct =
 								pParse->nTab++;
diff --git a/src/box/sql/func.c b/src/box/sql/func.c
index b69bf7fd6..493b69a4c 100644
--- a/src/box/sql/func.c
+++ b/src/box/sql/func.c
@@ -2072,9 +2072,12 @@ uint32_t
 sql_func_flags(const char *name)
 {
 	struct sql_func_dictionary *dict = built_in_func_get(name);
-	if (dict == NULL)
+	if (dict != NULL)
+		return dict->flags;
+	struct func *func = func_by_name(name, strlen(name));
+	if (func == NULL || func->def->aggregate != FUNC_AGGREGATE_GROUP)
 		return 0;
-	return dict->flags;
+	return SQL_FUNC_AGG;
 }
 
 static struct func_vtab func_sql_builtin_vtab;
diff --git a/src/box/sql/select.c b/src/box/sql/select.c
index 2e33a31d2..c77cd1c9f 100644
--- a/src/box/sql/select.c
+++ b/src/box/sql/select.c
@@ -4648,8 +4648,6 @@ is_simple_count(struct Select *select, struct AggInfo *agg_info)
 		return NULL;
 	if (NEVER(agg_info->nFunc == 0))
 		return NULL;
-	assert(agg_info->aFunc->func->def->language ==
-	       FUNC_LANGUAGE_SQL_BUILTIN);
 	if (memcmp(agg_info->aFunc->func->def->name, "COUNT", 5) != 0 ||
 	    (agg_info->aFunc->pExpr->x.pList != NULL &&
 	     agg_info->aFunc->pExpr->x.pList->nExpr > 0))
@@ -5575,9 +5573,17 @@ finalizeAggFunctions(Parse * pParse, AggInfo * pAggInfo)
 	for (i = 0, pF = pAggInfo->aFunc; i < pAggInfo->nFunc; i++, pF++) {
 		ExprList *pList = pF->pExpr->x.pList;
 		assert(!ExprHasProperty(pF->pExpr, EP_xIsSelect));
-		sqlVdbeAddOp2(v, OP_AggFinal, pF->iMem,
-				  pList ? pList->nExpr : 0);
-		sqlVdbeAppendP4(v, pF->func, P4_FUNC);
+		if (pF->func->def->language == FUNC_LANGUAGE_SQL_BUILTIN) {
+			sqlVdbeAddOp2(v, OP_AggFinal, pF->iMem,
+				      pList != NULL ? pList->nExpr : 0);
+			sqlVdbeAppendP4(v, pF->func, P4_FUNC);
+		} else {
+			sqlVdbeAddOp1(v, OP_UserAggFinal, pF->iMem);
+			const char *name = pF->func->def->name;
+			int len = pF->func->def->name_len;
+			char *str = sqlDbStrNDup(pParse->db, name, len);
+			sqlVdbeAppendP4(v, str, P4_DYNAMIC);
+		}
 	}
 }
 
@@ -5604,7 +5610,7 @@ updateAccumulator(Parse * pParse, AggInfo * pAggInfo)
 		assert(!ExprHasProperty(pF->pExpr, EP_xIsSelect));
 		if (pList) {
 			nArg = pList->nExpr;
-			regAgg = sqlGetTempRange(pParse, nArg);
+			regAgg = pF->iMem + 1;
 			sqlExprCodeExprList(pParse, pList, regAgg, 0,
 						SQL_ECEL_DUP);
 		} else {
@@ -5644,10 +5650,17 @@ updateAccumulator(Parse * pParse, AggInfo * pAggInfo)
 			pParse->is_aborted = true;
 			return;
 		}
-		sqlVdbeAddOp3(v, OP_AggStep, nArg, regAgg, pF->iMem);
-		sqlVdbeAppendP4(v, ctx, P4_FUNCCTX);
-		sql_expr_type_cache_change(pParse, regAgg, nArg);
-		sqlReleaseTempRange(pParse, regAgg, nArg);
+		if (pF->func->def->language == FUNC_LANGUAGE_SQL_BUILTIN) {
+			sqlVdbeAddOp3(v, OP_AggStep, nArg, regAgg, pF->iMem);
+			sqlVdbeAppendP4(v, ctx, P4_FUNCCTX);
+		} else {
+			sqlVdbeAddOp3(v, OP_UserAggStep, nArg, regAgg,
+				      pF->iMem);
+			const char *name = pF->func->def->name;
+			int len = pF->func->def->name_len;
+			char *str = sqlDbStrNDup(pParse->db, name, len);
+			sqlVdbeAppendP4(v, str, P4_DYNAMIC);
+		}
 		if (addrNext) {
 			sqlVdbeResolveLabel(v, addrNext);
 			sqlExprCacheClear(pParse);
diff --git a/src/box/sql/vdbe.c b/src/box/sql/vdbe.c
index 24cb28260..e2253caa1 100644
--- a/src/box/sql/vdbe.c
+++ b/src/box/sql/vdbe.c
@@ -1289,6 +1289,93 @@ case OP_FunctionByName: {
 	break;
 }
 
+/*
+ * Opcode: UserAggStep P1 P2 P3 P4 *
+ * Synopsis: accum=r[P3] step(r[P2 at P1])
+ *
+ * Execute the step function for a user-defined aggregate function. The step
+ * function has P1 arguments. P4 is the name of the aggregate function, stored
+ * as a P4_DYNAMIC string. Register P3 is the accumulator.
+ *
+ * The P1 arguments are taken from register P2 and its successors. If P1 > 0,
+ * register P2 must immediately follow register P3.
+ */
+case OP_UserAggStep: {
+	assert(pOp->p4type == P4_DYNAMIC);
+	assert(pOp->p1 == 0 || pOp->p3 == pOp->p2 - 1);
+	struct func *func = func_by_name(pOp->p4.z, strlen(pOp->p4.z));
+	if (func == NULL) {
+		diag_set(ClientError, ER_NO_SUCH_FUNCTION, pOp->p4.z);
+		goto abort_due_to_error;
+	}
+
+	struct Mem *argv = &aMem[pOp->p3];
+	struct port args;
+	struct port ret;
+
+	struct region *region = &fiber()->gc;
+	size_t region_svp = region_used(region);
+	port_vdbemem_create(&args, (struct sql_value *)argv, pOp->p1 + 1);
+	if (func_call(func, &args, &ret) != 0) {
+		region_truncate(region, region_svp);
+		goto abort_due_to_error;
+	}
+
+	uint32_t size;
+	struct Mem *mem = (struct Mem *)port_get_vdbemem(&ret, &size);
+	assert(size == 1);
+	bool is_error = mem == NULL || mem_copy(&aMem[pOp->p3], mem) != 0;
+	port_destroy(&ret);
+	region_truncate(region, region_svp);
+	if (is_error)
+		goto abort_due_to_error;
+	break;
+}
+
+/* Opcode: UserAggFinal P1 * * P4 *
+ * Synopsis: r[P1] = finalize(r[P1])
+ *
+ * Execute the finalize function for a user-defined aggregate function. P4 is
+ * the name of the aggregate function, stored as a P4_DYNAMIC string. Register
+ * P1 is the accumulator. The result is written back to register P1.
+ */
+case OP_UserAggFinal: {
+	assert(pOp->p1 > 0 && pOp->p1 <= (p->nMem + 1 - p->nCursor));
+	assert(pOp->p4type == P4_DYNAMIC);
+	struct func *step_func = func_by_name(pOp->p4.z, strlen(pOp->p4.z));
+	if (step_func == NULL) {
+		diag_set(ClientError, ER_NO_SUCH_FUNCTION, pOp->p4.z);
+		goto abort_due_to_error;
+	}
+	struct func *func = func_fin_by_id(step_func->def->fid);
+	if (func == NULL) {
+		diag_set(ClientError, ER_NO_SUCH_FUNCTION, pOp->p4.z);
+		goto abort_due_to_error;
+	}
+
+	struct Mem *argv = &aMem[pOp->p1];
+	struct port args;
+	struct port ret;
+
+	struct region *region = &fiber()->gc;
+	size_t region_svp = region_used(region);
+	port_vdbemem_create(&args, (struct sql_value *)argv, 1);
+	if (func_call(func, &args, &ret) != 0) {
+		region_truncate(region, region_svp);
+		goto abort_due_to_error;
+	}
+
+	uint32_t size;
+	struct Mem *mem = (struct Mem *)port_get_vdbemem(&ret, &size);
+	assert(size == 1);
+	bool is_error = mem == NULL || mem_copy(&aMem[pOp->p1], mem) != 0;
+	port_destroy(&ret);
+	region_truncate(region, region_svp);
+	if (is_error)
+		goto abort_due_to_error;
+	break;
+}
+
 /* Opcode: BitAnd P1 P2 P3 * *
  * Synopsis: r[P3]=r[P1]&r[P2]
  *
diff --git a/test/box-py/bootstrap.result b/test/box-py/bootstrap.result
index cea440c64..78d444bb0 100644
--- a/test/box-py/bootstrap.result
+++ b/test/box-py/bootstrap.result
@@ -4,7 +4,7 @@ box.internal.bootstrap()
 box.space._schema:select{}
 ---
 - - ['max_id', 511]
-  - ['version', 2, 9, 1]
+  - ['version', 2, 10, 1]
 ...
 box.space._cluster:select{}
 ---
@@ -54,7 +54,7 @@ box.space._space:select{}
   - [296, 1, '_func', 'memtx', 0, {}, [{'name': 'id', 'type': 'unsigned'}, {'name': 'owner',
         'type': 'unsigned'}, {'name': 'name', 'type': 'string'}, {'name': 'setuid',
         'type': 'unsigned'}, {'name': 'language', 'type': 'string'}, {'name': 'body',
-        'type': 'string'}, {'name': 'routine_type', 'type': 'string'}, {'name': 'param_list',
+        'type': 'any'}, {'name': 'routine_type', 'type': 'string'}, {'name': 'param_list',
         'type': 'array'}, {'name': 'returns', 'type': 'string'}, {'name': 'aggregate',
         'type': 'string'}, {'name': 'sql_data_access', 'type': 'string'}, {'name': 'is_deterministic',
         'type': 'boolean'}, {'name': 'is_sandboxed', 'type': 'boolean'}, {'name': 'is_null_call',
diff --git a/test/sql-tap/CMakeLists.txt b/test/sql-tap/CMakeLists.txt
index c4ec1214a..136a517d4 100644
--- a/test/sql-tap/CMakeLists.txt
+++ b/test/sql-tap/CMakeLists.txt
@@ -3,6 +3,8 @@ build_module(gh-5938-wrong-string-length gh-5938-wrong-string-length.c)
 target_link_libraries(gh-5938-wrong-string-length msgpuck)
 build_module(gh-6024-funcs-return-bin gh-6024-funcs-return-bin.c)
 target_link_libraries(gh-6024-funcs-return-bin msgpuck)
+build_module(gh-2579-custom-aggregate gh-2579-custom-aggregate.c)
+target_link_libraries(gh-2579-custom-aggregate msgpuck)
 build_module(sql_uuid sql_uuid.c)
 target_link_libraries(sql_uuid msgpuck core)
 build_module(decimal decimal.c)
diff --git a/test/sql-tap/gh-2579-custom-aggregate.c b/test/sql-tap/gh-2579-custom-aggregate.c
new file mode 100644
index 000000000..5552effc5
--- /dev/null
+++ b/test/sql-tap/gh-2579-custom-aggregate.c
@@ -0,0 +1,48 @@
+#include "msgpuck.h"
+#include "module.h"
+#include "mp_decimal.h"
+#include "lua/tnt_msgpuck.h"
+#include "mp_uuid.h"
+
+enum {
+	BUF_SIZE = 512,
+};
+
+int
+S3(box_function_ctx_t *ctx, const char *args, const char *args_end)
+{
+	(void)args_end;
+	uint32_t arg_count = mp_decode_array(&args);
+	if (arg_count != 2) {
+		return box_error_set(__FILE__, __LINE__, ER_PROC_C,
+				     "invalid argument count");
+	}
+	int sum = 0;
+	if (mp_typeof(*args) != MP_UINT)
+		mp_decode_nil(&args);
+	else
+		sum = mp_decode_uint(&args);
+	sum += mp_decode_uint(&args);
+	char res[BUF_SIZE];
+	char *end = mp_encode_uint(res, sum);
+	box_return_mp(ctx, res, end);
+	return 0;
+}
+
+int
+S3_finalize(box_function_ctx_t *ctx, const char *args, const char *args_end)
+{
+	(void)args_end;
+	uint32_t arg_count = mp_decode_array(&args);
+	if (arg_count != 1) {
+		return box_error_set(__FILE__, __LINE__, ER_PROC_C,
+				     "invalid argument count");
+	}
+	int sum = 0;
+	if (mp_typeof(*args) == MP_UINT)
+		sum = mp_decode_uint(&args);
+	char res[BUF_SIZE];
+	char *end = mp_encode_double(res, sum / 10.0);
+	box_return_mp(ctx, res, end);
+	return 0;
+}
diff --git a/test/sql-tap/gh-2579-custom-aggregate.test.lua b/test/sql-tap/gh-2579-custom-aggregate.test.lua
new file mode 100755
index 000000000..b207257c6
--- /dev/null
+++ b/test/sql-tap/gh-2579-custom-aggregate.test.lua
@@ -0,0 +1,278 @@
+#!/usr/bin/env tarantool
+local build_path = os.getenv("BUILDDIR")
+package.cpath = build_path..'/test/sql-tap/?.so;'..build_path..'/test/sql-tap/?.dylib;'..package.cpath
+
+local test = require("sqltester")
+test:plan(15)
+
+-- Make sure that non-persistent aggregate functions are working as expected.
+local step = function(x, y)
+    if x == nil then
+        x = {sum = 0, count = 0}
+    end
+    x.sum = x.sum + y
+    x.count = x.count + 1
+    return x
+end
+
+local finalize = function(x)
+    return x.sum / x.count
+end
+
+rawset(_G, 'S1', step)
+rawset(_G, 'S1_finalize', finalize)
+box.schema.func.create('S1', {aggregate = 'group', returns = 'number',
+                              param_list = {'integer'}, exports = {'SQL'}})
+
+test:execsql([[
+    CREATE TABLE t(i INTEGER PRIMARY KEY AUTOINCREMENT, a INT);
+    INSERT INTO t(a) VALUES(1), (1), (2), (2), (3);
+]])
+
+test:do_execsql_test(
+    "gh-2579-1",
+    [[
+        SELECT avg(a), s1(a) FROM t;
+    ]], {
+        1, 1.8
+    })
+
+test:do_execsql_test(
+    "gh-2579-2",
+    [[
+        SELECT avg(distinct a), s1(distinct a) FROM t;
+    ]], {
+        2, 2
+    })
+
+-- Make sure that persistent aggregate functions are working as expected.
+local body_s2 = {
+    step = [[function(x, y)
+        if x == nil then
+            x = {sum = 0, count = 0}
+        end
+        x.sum = x.sum + y
+        x.count = x.count + 1
+        return x
+    end]],
+    finalize = [[function(x)
+        return x.sum / x.count
+    end]]
+}
+
+box.schema.func.create('S2', {aggregate = 'group', returns = 'number',
+                              body = body_s2, param_list = {'integer'},
+                              exports = {'SQL'}})
+
+test:do_execsql_test(
+    "gh-2579-3",
+    [[
+        SELECT avg(a), s2(a) FROM t;
+    ]], {
+        1, 1.8
+    })
+
+test:do_execsql_test(
+    "gh-2579-4",
+    [[
+        SELECT avg(distinct a), s2(distinct a) FROM t;
+    ]], {
+        2, 2
+    })
+
+-- Make sure that C aggregate functions are working as expected.
+box.schema.func.create("gh-2579-custom-aggregate.S3", {
+    language = "C", aggregate = 'group', param_list = {"integer"},
+    returns = "number", exports = {"SQL"},
+})
+
+test:do_execsql_test(
+    "gh-2579-5",
+    [[
+        SELECT "gh-2579-custom-aggregate.S3"(a) FROM t;
+    ]], {
+        0.9
+    })
+
+test:do_execsql_test(
+    "gh-2579-6",
+    [[
+        SELECT "gh-2579-custom-aggregate.S3"(distinct a) FROM t;
+    ]], {
+        0.6
+    })
+
+--
+-- Make sure that user-defined aggregate functions can have more than one
+-- argument.
+--
+local body_s4 = {
+    step = [[function(x, y, z)
+        if x == nil then
+            x = {sum = 0, count = 0}
+        end
+        x.sum = x.sum + y + z
+        x.count = x.count + 1
+        return x
+    end]],
+    finalize = [[function(x)
+        return x.sum / x.count
+    end]]
+}
+box.schema.func.create('S4', {aggregate = 'group', returns = 'number',
+                              param_list = {'integer', 'integer'},
+                              body = body_s4, exports = {'SQL'}})
+
+test:do_execsql_test(
+    "gh-2579-7",
+    [[
+        SELECT s4(a, i) FROM t;
+    ]], {
+        4.8
+    })
+
+--
+-- Make sure that user-defined aggregate functions with more than one argument
+-- cannot work with DISTINCT.
+--
+test:do_catchsql_test(
+    "gh-2579-8",
+    [[
+        SELECT s4(distinct a, i) FROM t;
+    ]], {
+        1, "DISTINCT aggregates must have exactly one argument"
+    })
+
+-- Make sure user-defined aggregate functions are not available in Lua.
+test:do_test(
+    "gh-2579-9",
+    function()
+        local def = {aggregate = 'group', returns = 'number',
+                     param_list = {'integer'}, exports = {'LUA', 'SQL'}}
+        local res = {pcall(box.schema.func.create, 'S5', def)}
+        return {tostring(res[2])}
+    end, {
+        "Failed to create function 'S5': aggregate function can only be "..
+        "accessed in SQL"
+    })
+
+--
+-- Make sure that STEP and FINALIZE of a user-defined aggregate function are
+-- either both persistent or both non-persistent.
+--
+test:do_test(
+    "gh-2579-10",
+    function()
+        local body = "function(x) return x / 1000 end"
+        local def = {aggregate = 'group', returns = 'number', body = body,
+                     param_list = {'integer'}, exports = {'SQL'}}
+        local res = {pcall(box.schema.func.create, 'S6', def)}
+        return {tostring(res[2])}
+    end, {
+        "Failed to create function 'S6': step or finalize of aggregate "..
+        "function is undefined"
+    })
+
+--
+-- Make sure that the body of a user-defined aggregate function has exactly
+-- two fields: "step" and "finalize".
+--
+test:do_test(
+    "gh-2579-11",
+    function()
+        local body = {step = "function(x) return x / 1000 end"}
+        local def = {aggregate = 'group', returns = 'number', body = body,
+                     param_list = {'integer'}, exports = {'SQL'}}
+        local res = {pcall(box.schema.func.create, 'S7', def)}
+        return {tostring(res[2])}
+    end, {
+        "Failed to create function 'S7': body of aggregate function should "..
+        "be map that contains exactly two string fields: 'step' and 'finalize'"
+    })
+
+test:do_test(
+    "gh-2579-12",
+    function()
+        local body = {finalize = "function(x) return x / 1000 end"}
+        local def = {aggregate = 'group', returns = 'number', body = body,
+                     param_list = {'integer'}, exports = {'SQL'}}
+        local res = {pcall(box.schema.func.create, 'S8', def)}
+        return {tostring(res[2])}
+    end, {
+        "Failed to create function 'S8': body of aggregate function should "..
+        "be map that contains exactly two string fields: 'step' and 'finalize'"
+    })
+
+test:do_test(
+    "gh-2579-13",
+    function()
+        local body = {
+            step = [[function(x, y, z)
+                if x == nil then
+                    x = {sum = 0, count = 0}
+                end
+                x.sum = x.sum + y + z
+                x.count = x.count + 1
+                return x
+            end]],
+            finalize = [[function(x)
+                return x.sum / x.count
+            end]],
+            something = 0
+        }
+        local def = {aggregate = 'group', returns = 'number', body = body,
+                     param_list = {'integer'}, exports = {'SQL'}}
+        local res = {pcall(box.schema.func.create, 'S9', def)}
+        return {tostring(res[2])}
+    end, {
+        "Failed to create function 'S9': body of aggregate function should "..
+        "be map that contains exactly two string fields: 'step' and 'finalize'"
+    })
+
+--
+-- Make sure that errors thrown by the STEP or FINALIZE of a user-defined
+-- aggregate function are reported correctly.
+--
+local body_s10 = {
+    step = [[function(x) error("some error") return 0 end]],
+    finalize = [[function(x) return 0 end]]
+}
+
+box.schema.func.create('S10', {aggregate = 'group', returns = 'number',
+                              body = body_s10, exports = {'SQL'}})
+
+test:do_catchsql_test(
+    "gh-2579-14",
+    [[
+        SELECT s10();
+    ]], {
+        1, '[string "return function(x) error("some error") return..."]:1: '..
+           'some error'
+    })
+
+local body_s11 = {
+    step = [[function(x) return 0 end]],
+    finalize = [[function(x) error("another error") return 0 end]]
+}
+
+box.schema.func.create('S11', {aggregate = 'group', returns = 'number',
+                              body = body_s11, exports = {'SQL'}})
+
+test:do_catchsql_test(
+    "gh-2579-15",
+    [[
+        SELECT s11();
+    ]], {
+        1, '[string "return function(x) error("another error") ret..."]:1: '..
+           'another error'
+    })
+
+box.space._func.index[2]:delete("S1")
+box.space._func.index[2]:delete("S2")
+box.space._func.index[2]:delete("gh-2579-custom-aggregate.S3")
+box.space._func.index[2]:delete("S4")
+box.space._func.index[2]:delete("S10")
+box.space._func.index[2]:delete("S11")
+box.space.T:drop()
+
+test:finish_test()
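
For reviewers who want the calling convention in one place, here is a minimal
standalone sketch of the register layout the patch relies on: the accumulator
lives in one register and the n arguments in the next n registers, so STEP
receives n + 1 contiguous values and FINALIZE receives only the accumulator.
Everything in it (struct reg, avg_step, avg_finalize, demo_average) is a
hypothetical stand-in invented for illustration. It does not use struct Mem,
ports or func_call(), and it is not part of the patch.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-in for a VDBE register; NOT Tarantool's struct Mem. */
struct reg {
	int is_null;	/* 1 while the accumulator has not been set yet. */
	long value;	/* Argument value (used by argument registers). */
	double sum;	/* Accumulator state: running sum. */
	long count;	/* Accumulator state: number of rows seen. */
};

/*
 * STEP: regs[0] is the accumulator, regs[1..n_regs - 1] are the arguments.
 * Returns the new accumulator, which the caller copies back into regs[0],
 * much like OP_UserAggStep copies the returned value into r[P3].
 */
static struct reg
avg_step(const struct reg *regs, size_t n_regs)
{
	assert(n_regs >= 1);
	struct reg acc = regs[0];
	if (acc.is_null) {
		/* First row of the group: initialize the state. */
		acc.is_null = 0;
		acc.sum = 0;
		acc.count = 0;
	}
	for (size_t i = 1; i < n_regs; i++)
		acc.sum += regs[i].value;
	acc.count++;
	return acc;
}

/* FINALIZE: receives only the accumulator and produces the result. */
static double
avg_finalize(const struct reg *acc)
{
	/* Assumption of this sketch: an empty group yields 0. */
	if (acc->is_null || acc->count == 0)
		return 0;
	return acc->sum / acc->count;
}

/* Drive STEP once per row, then FINALIZE, over the rows 1, 1, 2, 2, 3. */
static double
demo_average(void)
{
	const long rows[] = {1, 1, 2, 2, 3};
	/* mem[0] is the accumulator, mem[1] is the single argument. */
	struct reg mem[2] = {{.is_null = 1}, {.is_null = 0}};
	for (size_t i = 0; i < sizeof(rows) / sizeof(rows[0]); i++) {
		mem[1].value = rows[i];
		mem[0] = avg_step(mem, 2);
	}
	return avg_finalize(&mem[0]);
}
```

With the same rows as the test table above, demo_average() sums to 9 over 5
rows, which mirrors the s1(a) and s2(a) expectations in gh-2579-1 and
gh-2579-3. Keeping the accumulator and arguments adjacent is what lets the
opcode pass a single pointer and a count of P1 + 1 to the step function.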

