Tarantool development patches archive
* [Tarantool-patches] [PATCH v1 1/1] sql: introduce user-defined aggregate functions
@ 2022-01-14 13:38 Mergen Imeev via Tarantool-patches
  2022-01-18  0:10 ` Vladislav Shpilevoy via Tarantool-patches
  0 siblings, 1 reply; 4+ messages in thread
From: Mergen Imeev via Tarantool-patches @ 2022-01-14 13:38 UTC
  To: v.shpilevoy; +Cc: tarantool-patches

This patch introduces user-defined SQL aggregate functions and an
interface for creating them.

Closes #2579

@TarantoolBot document
Title: User-defined aggregate functions

User-defined aggregate functions are now available. There are three
types of user-defined aggregate functions: persistent functions,
non-persistent functions, and C functions. Every user-defined aggregate
function consists of two functions: a "step" function, which is executed
for each record in the group, and a "finalize" function, which is
executed after all records in the group have been processed.

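The "step" function takes one more argument than specified in
"param_list" in the definition. This extra argument is the current state
of the aggregate function and must come first. On the first call for a
group the state is nil, and the value returned by the "step" function
becomes the new state.

The "finalize" function takes exactly one argument, the current state of
the aggregate function, and its return value is the result of the
aggregate.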
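The calling convention described above can be modelled in plain C. This is a minimal sketch, not Tarantool's C module API: `struct avg_state`, `avg_step()`, `avg_finalize()` and `avg_over_group()` are hypothetical names, a NULL pointer stands in for the nil state, and the driver loop only approximates what the SQL engine does for one group. It computes the same average as the Lua examples below.

```c
#include <assert.h>
#include <stdlib.h>

/* Hypothetical per-group state, mirroring the Lua table {sum = 0, count = 0}. */
struct avg_state {
	double sum;
	long count;
};

/*
 * "step": the state comes first (NULL plays the role of nil on the first
 * call), followed by the declared argument. The returned pointer becomes
 * the new state.
 */
static struct avg_state *
avg_step(struct avg_state *state, long value)
{
	if (state == NULL) {
		state = calloc(1, sizeof(*state));
		if (state == NULL)
			abort();
	}
	state->sum += value;
	state->count++;
	return state;
}

/*
 * "finalize": takes only the state and returns the aggregate result.
 * For an empty group this sketch simply returns 0.0 (an assumption).
 */
static double
avg_finalize(struct avg_state *state)
{
	if (state == NULL)
		return 0.0;
	double result = state->sum / state->count;
	free(state);
	return result;
}

/* Hypothetical driver: one "step" call per row of a group, then "finalize". */
static double
avg_over_group(const long *values, size_t n)
{
	assert(values != NULL || n == 0);
	struct avg_state *state = NULL;
	for (size_t i = 0; i < n; i++)
		state = avg_step(state, values[i]);
	return avg_finalize(state);
}
```

For the rows 1, 2 and 3 used in the examples, `avg_over_group()` yields 2.0, the same value the SQL queries below compute.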

To use a C or non-persistent user-defined aggregate function, you must
define a "step" function whose name equals the function name and a
"finalize" function whose name is the function name with "_finalize"
appended. For non-persistent functions, both functions must be global
Lua functions; for C functions, both must be defined in the
corresponding .so file.
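Deriving the "finalize" name is plain string concatenation. The sketch below uses a hypothetical helper, `agg_finalize_name()`; the patch itself builds the name with `tt_sprintf("%s_finalize", def->name)` in `func_fin_def_new()`.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/*
 * Hypothetical helper: write "<name>_finalize" into buf.
 * Returns 0 on success, -1 if buf is too small to hold the whole name.
 */
static int
agg_finalize_name(const char *name, char *buf, size_t size)
{
	int len = snprintf(buf, size, "%s_finalize", name);
	if (len < 0 || (size_t)len >= size)
		return -1;
	return 0;
}
```

For the aggregate `F0` this produces `F0_finalize`, which matches the global Lua function defined in the example below.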

Example of a non-persistent aggregate function:
```
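-- Step function: x is the accumulated state (nil on the first call),
-- y is the argument value for the current row.
F0 = function(x, y)
    if x == nil then
        x = {sum = 0, count = 0}
    end
    x.sum = x.sum + y
    x.count = x.count + 1
    -- The returned value becomes the new state.
    return x
end

-- Finalize function: receives the final state and returns the result.
F0_finalize = function(x)
    return x.sum / x.count
end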

box.schema.func.create('F0', {aggregate = 'group', returns = 'number',
                              param_list = {'integer'}, exports = {'SQL'}})
box.execute([[SELECT f0(column_3) FROM (VALUES(1), (2), (3));]])
```

To use a persistent user-defined aggregate function, you must define the
"body" field in the function definition as a map with exactly two string
fields, "step" and "finalize", containing the source code of the
corresponding functions and following the rules above.

Example of a persistent aggregate function:
```
body = {
    step = [[function(x, y)
        if x == nil then
            x = {sum = 0, count = 0}
        end
        x.sum = x.sum + y
        x.count = x.count + 1
        return x
    end]],
    finalize = [[function(x)
        return x.sum / x.count
    end]]
}

box.schema.func.create('F1', {aggregate = 'group', returns = 'number',
                              body = body, param_list = {'integer'},
                              exports = {'SQL'}})
box.execute([[SELECT f1(column_3) FROM (VALUES(1), (2), (3));]])
```
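The shape check that the patch applies to a map body in `func_def_check_body()` can be restated over already-decoded data. This is a sketch under the assumption that the MsgPack map has been decoded into an array of a hypothetical `struct body_entry`, with a NULL value marking a non-string value; the real check walks the MsgPack buffer directly.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical decoded map entry; value == NULL means "not a string". */
struct body_entry {
	const char *key;
	const char *value;
};

/*
 * True only for a map with exactly two entries, "step" and "finalize",
 * each holding a string. Duplicate or unknown keys are rejected.
 */
static bool
agg_body_is_valid(const struct body_entry *entries, size_t count)
{
	bool has_step = false;
	bool has_finalize = false;
	if (count != 2)
		return false;
	for (size_t i = 0; i < count; i++) {
		if (entries[i].key == NULL || entries[i].value == NULL)
			return false;
		if (strcmp(entries[i].key, "step") == 0)
			has_step = true;
		else if (strcmp(entries[i].key, "finalize") == 0)
			has_finalize = true;
		else
			return false;
	}
	return has_step && has_finalize;
}
```

A body such as `{step = ..., step = ...}` passes the size check but is still rejected, because the "finalize" key is never seen.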
---
https://github.com/tarantool/tarantool/issues/2579
https://github.com/tarantool/tarantool/tree/imeevma/gh-2579-introduce-custom-aggregate-functions

 ...79-introduce-custom-aggregate-functions.md |   4 +
 src/box/alter.cc                              | 161 +++++++++++-
 src/box/bootstrap.snap                        | Bin 4891 -> 4915 bytes
 src/box/lua/schema.lua                        |   3 +-
 src/box/lua/upgrade.lua                       |  16 ++
 src/box/schema.cc                             |  37 +++
 src/box/schema.h                              |   9 +
 src/box/sql/expr.c                            |  19 +-
 src/box/sql/func.c                            |   7 +-
 src/box/sql/select.c                          |  33 ++-
 src/box/sql/vdbe.c                            |  87 +++++++
 test/box-py/bootstrap.result                  |   4 +-
 test/sql-tap/CMakeLists.txt                   |   2 +
 test/sql-tap/gh-2579-custom-aggregate.c       |  48 ++++
 .../sql-tap/gh-2579-custom-aggregate.test.lua | 232 ++++++++++++++++++
 15 files changed, 638 insertions(+), 24 deletions(-)
 create mode 100644 changelogs/unreleased/gh-2579-introduce-custom-aggregate-functions.md
 create mode 100644 test/sql-tap/gh-2579-custom-aggregate.c
 create mode 100755 test/sql-tap/gh-2579-custom-aggregate.test.lua

diff --git a/changelogs/unreleased/gh-2579-introduce-custom-aggregate-functions.md b/changelogs/unreleased/gh-2579-introduce-custom-aggregate-functions.md
new file mode 100644
index 000000000..68adad478
--- /dev/null
+++ b/changelogs/unreleased/gh-2579-introduce-custom-aggregate-functions.md
@@ -0,0 +1,4 @@
+## feature/sql
+
+* Custom SQL aggregate functions and an interface to create them are now
+  available (gh-2579).
diff --git a/src/box/alter.cc b/src/box/alter.cc
index 65c1cb952..f1dfcd807 100644
--- a/src/box/alter.cc
+++ b/src/box/alter.cc
@@ -3276,6 +3276,85 @@ func_def_get_ids_from_tuple(struct tuple *tuple, uint32_t *fid, uint32_t *uid)
 	return tuple_field_u32(tuple, BOX_FUNC_FIELD_UID, uid);
 }
 
+static int
+func_def_check_body(struct tuple *tuple)
+{
+	assert(tuple_field_count(tuple) > BOX_FUNC_FIELD_BODY);
+	const char *field = tuple_field(tuple, BOX_FUNC_FIELD_BODY);
+	assert(field != NULL);
+	enum mp_type type = mp_typeof(*field);
+	if (type == MP_STR)
+		return 0;
+	if (type != MP_MAP) {
+		diag_set(ClientError, ER_FIELD_TYPE, "map or array",
+			 mp_type_strs[type]);
+		return -1;
+	}
+	const char *agg = tuple_field_cstr(tuple, BOX_FUNC_FIELD_AGGREGATE);
+	if (agg == NULL)
+		return -1;
+	if (STR2ENUM(func_aggregate, agg) != FUNC_AGGREGATE_GROUP) {
+		const char *name = tuple_field_cstr(tuple, BOX_FUNC_FIELD_NAME);
+		diag_set(ClientError, ER_CREATE_FUNCTION, name,
+			 "only aggregate functions can have map as body");
+		return -1;
+	}
+
+	bool has_step = false;
+	bool has_finalize = false;
+	if (mp_decode_map(&field) != 2)
+		goto error;
+	for (int i = 0; i < 2; ++i) {
+		const char *value;
+		uint32_t size;
+		if (mp_typeof(*field) != MP_STR)
+			goto error;
+		value = mp_decode_str(&field, &size);
+		if (size == strlen("step")) {
+			if (strncmp(value, "step", size) != 0)
+				goto error;
+			has_step = true;
+		} else if (size == strlen("finalize")) {
+			if (strncmp(value, "finalize", size) != 0)
+				goto error;
+			has_finalize = true;
+		}
+		if (mp_typeof(*field) != MP_STR)
+			goto error;
+		mp_next(&field);
+	}
+	if (has_step && has_finalize)
+		return 0;
+error:
+	const char *name = tuple_field_cstr(tuple, BOX_FUNC_FIELD_NAME);
+	diag_set(ClientError, ER_CREATE_FUNCTION, name,
+		 "body of aggregate function should be map that contains "
+		 "exactly two string fields: 'step' and 'finalize'");
+	return -1;
+}
+
+static const char *
+func_def_get_agg_body(struct tuple *tuple, uint32_t *body_len, bool is_step)
+{
+	const char *field = tuple_field(tuple, BOX_FUNC_FIELD_BODY);
+	assert(field != NULL);
+	if (mp_typeof(*field) == MP_STR) {
+		if (is_step)
+			return mp_decode_str(&field, body_len);
+		*body_len = 0;
+		return field;
+	}
+	assert(mp_typeof(*field) == MP_MAP);
+	mp_decode_map(&field);
+	uint32_t size;
+	mp_decode_str(&field, &size);
+	if (size == (is_step ? strlen("step") : strlen("finalize")))
+		return mp_decode_str(&field, body_len);
+	mp_next(&field);
+	mp_next(&field);
+	return mp_decode_str(&field, body_len);
+}
+
 /** Create a function definition from tuple. */
 static struct func_def *
 func_def_new_from_tuple(struct tuple *tuple)
@@ -3295,7 +3374,9 @@ func_def_new_from_tuple(struct tuple *tuple)
 	if (identifier_check(name, name_len) != 0)
 		return NULL;
 	if (field_count > BOX_FUNC_FIELD_BODY) {
-		body = tuple_field_str(tuple, BOX_FUNC_FIELD_BODY, &body_len);
+		if (func_def_check_body(tuple) != 0)
+			return NULL;
+		body = func_def_get_agg_body(tuple, &body_len, true);
 		if (body == NULL)
 			return NULL;
 		comment = tuple_field_str(tuple, BOX_FUNC_FIELD_COMMENT,
@@ -3498,6 +3579,11 @@ func_def_new_from_tuple(struct tuple *tuple)
 		def->exports.lua = true;
 		def->param_count = 0;
 	}
+	if (def->aggregate == FUNC_AGGREGATE_GROUP && def->exports.lua != 0) {
+		diag_set(ClientError, ER_CREATE_FUNCTION, def->name,
+			 "aggregate function can only be accessed in SQL");
+		return NULL;
+	}
 	if (func_def_check(def) != 0)
 		return NULL;
 	def_guard.is_active = false;
@@ -3536,6 +3622,49 @@ on_drop_func_rollback(struct trigger *trigger, void * /* event */)
 	return 0;
 }
 
+struct func_def *
+func_fin_def_new(struct tuple *tuple, const struct func_def *def)
+{
+	const char *name = tt_sprintf("%s_finalize", def->name);
+	size_t name_len = strlen(name);
+	uint32_t body_len;
+	const char *body = func_def_get_agg_body(tuple, &body_len, false);
+	if (body == NULL)
+		return NULL;
+	if ((body_len == 0 && def->body != NULL) ||
+	    (def->body == NULL && body_len != 0)) {
+		diag_set(ClientError, ER_CREATE_FUNCTION, def->name,
+			 "step or finalize of aggregate function is undefined");
+		return NULL;
+	}
+	uint32_t len = sizeof(struct func_def) + name_len + 1;
+	if (body_len > 0)
+		len += body_len + 1;
+	struct func_def *fin_def = (struct func_def *)malloc(len);
+	fin_def->fid = def->fid;
+	fin_def->uid = def->uid;
+	if (body_len > 0) {
+		fin_def->body = fin_def->name + name_len + 1;
+		memcpy(fin_def->body, body, body_len);
+		fin_def->body[body_len] = '\0';
+	} else {
+		fin_def->body = NULL;
+	}
+	fin_def->comment = NULL;
+	fin_def->setuid = def->setuid;
+	fin_def->is_deterministic = def->is_deterministic;
+	fin_def->is_sandboxed = def->is_sandboxed;
+	fin_def->param_count = 0;
+	fin_def->returns = def->returns;
+	fin_def->aggregate = FUNC_AGGREGATE_GROUP;
+	fin_def->language = def->language;
+	fin_def->name_len = name_len;
+	fin_def->exports = def->exports;
+	fin_def->opts = def->opts;
+	memcpy(fin_def->name, name, name_len + 1);
+	return fin_def;
+}
+
 /**
  * A trigger invoked on replace in a space containing
  * functions on which there were defined any grants.
@@ -3568,6 +3697,16 @@ on_replace_dd_func(struct trigger * /* trigger */, void *event)
 		struct func *func = func_new(def);
 		if (func == NULL)
 			return -1;
+		if (def->aggregate == FUNC_AGGREGATE_GROUP) {
+			struct func_def *fin_def =
+				func_fin_def_new(new_tuple, def);
+			if (fin_def == NULL)
+				return -1;
+			struct func *func_fin = func_new(fin_def);
+			if (func_fin == NULL)
+				return -1;
+			func_fin_cache_insert(func_fin);
+		}
 		def_guard.is_active = false;
 		func_cache_insert(func);
 		on_rollback->data = func;
@@ -3610,6 +3749,8 @@ on_replace_dd_func(struct trigger * /* trigger */, void *event)
 			txn_alter_trigger_new(on_drop_func_rollback, old_func);
 		if (on_commit == NULL || on_rollback == NULL)
 			return -1;
+		if (old_func->def->aggregate == FUNC_AGGREGATE_GROUP)
+			func_fin_cache_delete(old_func->def->fid);
 		func_cache_delete(old_func->def->fid);
 		txn_stmt_on_commit(stmt, on_commit);
 		txn_stmt_on_rollback(stmt, on_rollback);
@@ -3622,9 +3763,13 @@ on_replace_dd_func(struct trigger * /* trigger */, void *event)
 		 * definition to support upgrade script.
 		 */
 		struct func_def *old_def = NULL, *new_def = NULL;
-		auto guard = make_scoped_guard([&old_def, &new_def] {
+		struct func_def *new_fin_def = NULL;
+		struct func_def *old_fin_def = NULL;
+		auto guard = make_scoped_guard([&] {
 			free(old_def);
 			free(new_def);
+			free(new_fin_def);
+			free(old_fin_def);
 		});
 		old_def = func_def_new_from_tuple(old_tuple);
 		new_def = func_def_new_from_tuple(new_tuple);
@@ -3635,6 +3780,18 @@ on_replace_dd_func(struct trigger * /* trigger */, void *event)
 				  "alter");
 			return -1;
 		}
+		if (new_def->aggregate != FUNC_AGGREGATE_GROUP)
+			return 0;
+		new_fin_def = func_fin_def_new(new_tuple, new_def);
+		old_fin_def = func_fin_def_new(old_tuple, old_def);
+		if (new_fin_def == NULL || old_fin_def == NULL)
+			return -1;
+		if (func_def_cmp(new_fin_def, old_fin_def) != 0) {
+			diag_set(ClientError, ER_UNSUPPORTED, "function",
+				 "alter");
+			return -1;
+		}
+		return 0;
 	}
 	return 0;
 }
diff --git a/src/box/bootstrap.snap b/src/box/bootstrap.snap
index 018670d2a6558e64ba691ce0002320b76ac1c97b..610e513fdefaa8a3bd60e95a45219c3bee426b71 100644
GIT binary patch
delta 4898
zcmV+-6W#2aCbK4x6@M`>E-)=(WprUOEi*PPXJchFFf}zWIW`JOZgX^DZewLSATcv#
zWHVtoH7#Z`VKOZ=H#TA|IW%T7Eo5XgHZ@~mHDx(BWC~V8Y;R+0Iv{&}3JTS_3%bn`
zvjEQ5vG$6k0000004TLD{Qy{1l>o}86eUIwh*;YI000000Do{W@Qe>$sJGs4T?!dY
zzDXmH-vPG~d#7ZSnIt=M<>tzn&iE!LK>3ftTy=&BI`y~-#(MoXklg2mI+0ESUIGmQ
znan4@m3rSw^SqU}l1Zm!Hq)|XO3Sn)b6JjMSz@v*S8*)M5SOJ*EY}-i?12qq@M7G7
z2elptAg#AS2Y*ht%sJh%rvV3Ld1iyMG^0UTt{I$3Nes-S<OLa|WJ;F7peASO0-78$
zWpqhdz&^?uvowxk$c&=}W5%@tF}fGV0-eVm1r_K#=6XtDy1-EIMJbPQX1U7?m^^m*
zlE(?W<S~ycmoDSDWrA_k!et!SEL;A^qUHajEdQ@puz%F<NRSl474i1(F`T=3{#m^J
zgP_NEtVjSs1Q<VH1>-6ZaE$Z-$46B-=9mf+#ts}|JcSCzs}u)pzE_)K^F5jrn-_C7
z-<#EHcVsl$od#x0NwcM-J5gk{E+`Gqx)_LJ@(@hHEm}Y<a+#dl@a827g1GAmlq)X4
zz!i<2Tz~P3MGM>%FuL6mD7xJcP{1FMfcFL<;C%rH+;EK<aKn|TfE!xfaCH)^aSajC
zxSBZO>3++?#bn46(8T)zn0PNlzz!N<Zvlk%4j>5ImI#1tKmlMIwkcVH!zBU2;gBf7
z;eL{Me*}s5Mt;06a-?@6MtUDoq<0}i&J%zk=YJVQ$a&rs2mrMoLB!2N?6YV-fQ;8_
z=E3oTYmXndes{L*e0-d_qr4n~TCg5IuvgTr-~2oNSmh@-?%!iUy(yE$OU`1Ehk2(w
zTNMNH@qxXf(!4P<CMk>;EIW%g-&e1y7?2O_6=j9MHqX9emB+^Vz+O>%>`%<#(>-@n
zv40=?dg=L5BU7`*8B0%7Bqh-U_EHlo9(!uNJMy6Q?7-tX<*rk&jyt$YhaFs<qYkce
zRSr6`8pj-2g+q?4X7!CYdg=xoJ#FKSp7xXtH_~*CHqulLHqteXHELXFs6pd8BMo0_
zMW$S4pwWLBXY@~o8U6P^Mj8K$LB{`KjDPX}3WgZ}e-Xz2s~2EctzCRkelENy{}x@8
zzx-No@qZRu{Evke|F2hMalaK<++W2N_uo&26}383R6(mF1r@57BjU635qA(&eVr(#
z@E!^&yniAJzd8X$jjM?#Xk1J<!TVPU?4E0hCc0OGiSG3*(S7$nC7QfXPm@pL*MH=j
zJQ6<dk4VCMBaraEh$Flw!U*q&D8jEo5K*&}X9$`dJVJz7^l*BZMG(y1P7ryr2Ley_
zKLn9|_C5fCeGfli&w~%xW4}WW*WcmcdOJH@e|^^--s{-molY0;yw9nlQ61=LG>17F
z)hL$^7sVlmi^}1;=tbk;(0NZz=YMOQ&O3+GJNednaxFL~KY7*`mKHC|=XcKcc34}-
z+xu#_cTQ*9R#&!tL(aBs`;FDN;fDWfc=JCEHuM>4=v@XHdXHfS?M4QzVTeKNHg3$b
z3I?h+=4qa6s#J9il{&>nRi)Pa|H9Pjf|RyeTX<n>7F^hRg%-BnS_KwTSbu@mLqP?f
zsdJ|8osdG!1QdMcTEYoFb0xuq+6cZ9Xi+5ixS8vz8NpYBit?Fhd}@5A2v9!LTti6X
z+a?GiY;&7vLeS<1AZQbZMw{BaePEhTIMZC6Zl;yFs!x|Gb-GMny-b(Mr#|=ZbCdVk
zXS9=<H<5XxWFcm<@hyz`-+#B0JT-R|)Y+p1s5<tk3hwL-f%2!9s$-vmIx9PXO}Nuh
z1$WlSfR4Yu;P+R<`4{()htHFTJdQqLp4K-6wNAH)pSVrj*MmO83G?@P=d<IFMbKwC
z9*5xtb(4?tFNr%z-Yma+`f=y^tikj4i)w**=b!Qp=O%L76?s!~Uw@62e(aUJt_XKJ
zf;wANK$QyeY%JC<s>`{mCoJv^?ko|J8x_>DLr(q`!Jch0v@h12+jMxtWq3o~@ArG%
z)7|;Zbxh*FGY`ezU%A;hZNeVRvP_z^Q9+&+V)F0V`M4vm9(8}GJOo9J^SHG$x$&+I
zxU)e7IN_}No9C(EmVaF`0D8}y|0ut@A=f+f_B-BATxV6adHz>x9)emHNGK21wySlz
zL7%s9s;{cM?VF3bVKYbW+`r$cnM(6|`X@eh_rzuBL7x4wzuWuOVU4C=x4q7Zc$3-h
z6bqxbe<fxRhZpr&_`Lf{%^E9a5KVa}v7la9^U@5jwq@auHGdXC5%-QqEQ0#BvkKC&
z*8bYWf&>5xK*^G{#3cbjqU0wDHhKU42$I?yBR{k`MULd#&gdK>MrvI{iqN`*5b4oo
zHJXj=0$@muP7on9Aogc}B`(e_r;ynO()kgj#Q6YH;(GijaV5^jM-66w_D8e!=X1Ng
z?9cvcdwf+fAb<OFa$56v*`HG{GB2MOEXw|ToV=p!3(Ed{ERdns2u{%053cy&U|()X
zPctVYHI@#2A#ay-7?AzxnvzfpvsAGo``fc;j~=^&Uf6f6=AB}lJzy_O6+6-$qp8<f
zCuW#6iu;!JVZxgCt1xR@ita3oDkihPolo)Vtfh*{N`EWlSfV$tlfZBZpNKhwoo->i
z3<cwa)l%hQc1sThy=9mM)v-^nzQ-x4voETcj8qasCO0J|ZcJoUWF%g$i+;!6Um_wS
zEb_2O3KdCZlyWv{q#~<QlTxKNBa$+MGLu1}(PXq7-lRz?qm+}%g9i`Z{Dqy4SZ|Wb
zDCJ&!+<!Sr$3Be`<%!#LuvW?IidYdwBf`d<{o%LiYef&HL=ek%u*AWa4Qv+;x}33B
z%v{L;2?L}HkjhN$U`I=@BQicyYmUau#LO(1RUiu^qd=x)1jgu#l#!RBE=3ba6>*7e
znTly4$}&ZSWhiOr%8&%5zh#pMm{?%~0h0%Ws(%O}2_3;;K~(IN6doJoK+u#WQkK-L
zr2)-eQ&M?w+LU*?gTlD~tJjbHRmF&8`{wf8`t2M&@y~*S;V`Qmd!8qE{vUgA^XxtL
zAPyEy13auXTlPoMw!1OtvzExitM7AFA=tAT%9iG#))moo^Q`dSQ@6Eozi5`Ia~Zn%
zJ67NR{pKNPj$qG<3|#a|*YU<A=rf!xsyKG(iEax28}77!Fw1^fU=2y;{y{9;#VEa+
z`aabmh-I%7skx&_3XoW5HFq?tUwy-z^#g{ns1t;TnnE)(lg9}yf2~g!&$l0$RS5h+
zvlS;|r~LtuJx{m4A(``(HtE2$^ph*!pYxm&<?veQI}Sls)J5QB*AVk#l~~Ofka$yt
zaXG@0yAG_0g+xgkSOlccewoHc-6CV#kwE%R%#8#p|L&@#>wsgnoI6gaK8^T<Mk$NK
z#k}BONeyX+#v-1He?Y0n5u+VeXOz857{-V%wg~rNiASV-R|ztkZmG8HHrG!qND`Qw
zTjhqzukCLSH2@qEjy{a|KO7DWD-E79`z4wlj#(A<1FTHbEsKb#m#3iOIaTN~Wlua;
z%&%x*3(}U2O=cYgGZ=OUUh21Hjqi5z$gKs$_b_zf!Er`gf3#D*9&$Cu$*vw^H`irV
z4{=YM$BXwWl{>`S{*9k{@SO@0w@E>$Rp<Y=dD;3=iylP1@*HuMYH!_yA&*2mtH?T5
zxj4*jj?ccpA-t^_e0~w}A#W%^pFhZ;CdOx159_m9`~wtC9-1mQS9I^S&jQ9P?EegD
z5&3#A8>5GKf1o1!Q;jbvZpN7*ke{I|orneMPM^`#G6KK;{UK)7tyOFC>()1iNS|I#
zrSVy@cCKLDs909omA}_BdDG>GEf8?l5~^cqX~kZWZ>iS?JtA;^7S1emHl-HP0w|QL
z8YCaZ*Xs&NnrQsaW000Jr!Kq;=Da!bsJgR}54f1Oe~mI}I-O&9mthywH&^Oi^AFQT
zrSuXn-w8LdyREMLpumn|nh78V3oD=9btMeQ5W+~N2N75AAA-h!%%U;WWlMq~Ja$2_
z^{h0PHcb$}fi@|3KL>)r3KEUzuN{+Ui+9?jTN&LcZ$1z?%M3q^GxrTk>G9YRUvN5t
z*E|4ae{N}mT@;|68daE9KXBpZceWmfP(Z1=(JFE^H=h0(aNmWg7<4VnUztE;7Fzin
z_2TI%XO`s{h$}Nu032}0%~9M=X*CP-;dX9&3wan9C)|#wv+VGYLpLePwZvDj&AB7r
z-^|u&YoO4%D9R*#oNUHnw~IZuiX;Nv6z))ve_gH2qn1!w*8(Xw;MiIdA429K=Q%Xn
z$bXtcVlf6qDC2cE2_~&}in!ZAP6ja8Jo|kyiQpvDh#<CZ=iD=MhZzw&Q*@Reb4n+_
zWvwJcs6xnYFoR!Ub=XZnvELdKgSSdce_t%fQ9spF;)X5-$?g8!p}JJm^7NV6R1~X3
zf5ex0Q<jS6&>?`-q8nV#;_u8oO8|>Qjt9z9j$ShJ9pbcdW?^;y{vmq_?LT|)5Id+~
zegd3Qo&s1sPM12EM{J<1C-{vWMdrzH!QU2O)OOR09zb4=fh#Zc8bg}#ZF0Y7JIF97
zN#o{pGrp}RUf6TG8?;o?fZxs)o?!qie>BUbAHUbQeA1Vq?yK88+q*?+xwhd9nXQ8{
zf;!ICfOe>$+&M)9qrKEI#OY(0hB?`b9t!<L>DuoE<yt#|L|1g!0p&*PfKV*@tv019
zBoXMQW}8TAfJhZDB4`0j%Hb>mX818>SnOGVAT`erbXdAU3{A2al$hB^#E@gle`Kf>
z8xS=729m!KW@4p47sQ2mzepSAQAJsM*2R#_5XMCuLtFr`;o$Z_76B6eL>Vme76BAJ
zBO}R_nfU2ivts?7f#gEk0$n-HNouhq6zP;8xUndL8l7grNMu73qC_THY{^@4g4T0{
z%#+SgLkJPU2M@#e$oYeJ8p3M(fBy6S6r$rlzw2hs+sVsC^Aj8i?F~@^lPp$?_(*ZW
z+FnH@7;b>$sY%GZQB(tuXgnu>mx$&U^#|fg7O>T5SVKC))4w6U@>Uy-LQ^x@!cTzm
z2T>B2jG#x9zlpNQ&<mB&MI*kt_SRAD-#&@9a<~n!9=$B`BqCNm?`i?)f1n=IcrFn7
zk?^!$Rznbl=|`vFhP>Tbd@^dn%v4AI8<BBHXEr0@8o>xg@eR4jvu7~Wg3Tz8{x>4=
zl8$Xg!WzODmhug8i?`cg)Fhix7X5BS!Vz6;hQu_25smU2a#wHX!Keu{Qyu+{5SdWw
ze=7-SK}6C5kV9Uzov~@=f9X5obrcb~9s<*Sf^g$f7-~4m1d+kbBl2|g2#pRx*GU1>
z8`|JgB7t=YIIiIsFBKDBKtQ&V3?fb$BY0@;ib;YN2kLJPh+JP&fYmbxV`I<2X-l8f
zpb)J?jwE@~0csFQBJjutQ|IIqz99;2)<mQ_fMMm{{ZAe#E~Xd^e}Kg&Our5U_<9c?
zz^7cb8pd&ns*=7Rp&C1pY!mVZQqqScBhilsu3T7GB9_Z3G!#tuUF@#vW31QMzWI3h
zUPo0J#7ubPd-TI@t{HFfe3%G*b7*XD1y=)`=)jZE-6=A}+r1=OPpNSLR5ZjRA!S4=
z?}oxE082)GER2Lee~-1zx(0sH7?(ns5tQ5;L{ov6jQOxI5&|7}Tk{%>MI#;wAtO+8
zZ4ga?kjm#F<qKI?Wa|9yjz38GK<jG(*O>^a;sVtYjcS$Sgvfu5cCs1OAP;qD%(5N(
zWM#B`<9p8+Q9RE!XVCnn7!B(HFv5JzMrkU5LyI05GBMaGf4!=l|8SW}Ea)GWbvMx*
z<A2{DL%6{(mPS>5@`5~26mg}$ERZpVi5H2P%7P+SnMRDI_#XkTJ#g;vE_}N@Bj>Gf
zlE+5$il}T_;Sq0i4DE5iGJ=+8Ba=r*@Qu4TizJUX?6-7&yA2El3kjqSuYnvE-_}~*
z4YQYh@3Bo`e@)|HeS9*?RLu@$<*4&$vvvn~iXgbG;94&REYY0@K?_a>{Z-v;H;srM
zb)%9$1wXrtoWyHY0a6QR)N#H{JVZQ51cj2j1wqfk(XIeZ4+bF$?~2@Wkcy5)KVXy^
zXpbn|Y?%KHB>X^Tor1stc8oXpGuLF0j-VR9_AVF+fAMKJ_)o=>2Rp12mP_&n+VX8s
z6fGR$1L<T?#ul7~a*|V&5xB&<8XZd4NyYR*LT9&B$gWK=4ffKpKN0GY{<bNYJZrfe
z+>q<55rL3i*+i?aGUOp5AsJ22=!)@H@281qmxp3^S0u~S#sb9>x=u_|6c?zM>N-(L
zvALjLK&tA5OUmYgGD|Yg*5OOt7lWiMhzY|&6w6#wT$SsyBxi2n$SgAJi%<dFh9RmK
Ubb$k&3#3*SwXFTn1=SF(?d={gR{#J2

delta 4874
zcmV+l6ZPz~CYvUZ6@NJ{FfB1THZ5l|F*!0fWHe$k3Q2BrbYX5|WjY`-GBq$^H!@}|
zHZU?}Ei^D=VJ$f@VPY*gWHMo9VPrUAH)CcBRzqxWV{1AfdwmKD)w&D1%@Uyi&Uqw`
z!=(TK0000ewJ-euSjCh8`lje4Mi7YEIRF3v0000m7<#~FH-BI(`S!d06G*-N0Khip
z?|&&%Qj;mk$d;s(N#r!-*sXk<J8^35oMwI0g#2r@zkhdk?`YEwO#)m33IZj+je6fk
zvb>GAu}Pm}CeLwfI>&Pq^Eig%IAU=eQ!pII509e_9Mk(@?12qqdok|7gBp(mOXF?O
z!P6aaPj}>Lz<<FwqS;^^&uB1?X$ELc5rZ?QctHlIc#38)sL?5N0gVnBGr6TKwuf}a
zlnFyIWWvybF=1MP7|jb~fzBb1f(mpFaXh9lUSKHr!j!`~v+U&sOb)wz$>9WEa+t%E
zOP4U*GC>$>;Sz>vmM#Bd(enRNmjBZ$SZaABSc>2Zcz^r%SkB!%{|w&#t@SvM6$v1S
z0OJR&U|a<Pj+GwZIH?N996O+daRW&hQ*i*}bV>s@&#O_fc^++w&4W0b=gn%hJV-QJ
z9tLJ*mS$y^@<>6dc|~D>=Cyznl7IHJTeMgVc9ERh@aCgwE%t&E<$?+<aKR%e7rda6
z0=EQ?ZhtofjBd9B67U8f;C%rHcu#-<_e*63+%GL6;C@&4OP|PUT|`B+E+<cRy5F*N
zH6ii?IPqS9Cf*MXu!RKJJAk3R0T9AACIMhu5CGVQZA+L$MJWl2ib}F1Dhf!6_ehd>
ze+0;TBS?BLa-?@6MtUDoq%4IHDa#;3%JQZ_gn!h21OYeyu-Bsb021D-nYZD!X^$Vb
zes{L%eteX<o4fqktyqtr7>v!W-~2oNS>qQs?%!ug#W9n`3$9|2e|ev~Y9$N$`H8{U
zsEKoC%u*OHSacO{zOi6dvY?+Bj7=31+dTV@(Vd(66N9noQ+r^xP`A`k$$%;h#s_H4
z3V&{F@x&G^Q!FLXBL-U&dC+)v;BlXD_lZ}>9UP^@4vx-I2S+(72OSxWV~&i%AxB0t
z`bHf6bOVllw(&+k`^knI=eb53=cxu8XPU+uwJtQ&pmm**#@Bfzo?T|3(SI3d^iPHv
z{r5jc8UKqx#{XfA@qY@282^6}#{a1oV1HbtU3@WqF1#537F~>g{917Fe->N(kA)Wh
zr&nZgzZF>AU&R&o-%o`VwL4Q(LAxUb6{x2J;<ED)cWY69CyFV&he8VPpNPVzPC!xX
zYT^l67ZXnG{$;Y=cP-IG_ewC)J)b4I@4lymlmFu5<df(*c_)v=m-j~`;k^+^cz<8S
z5#AGFg!e-f;Zq@qsNvZ&1PzZKA-)WHIsMC^wb|ndVo&x!=*j+vAhOTi2OzNT;Roz_
z@Bw@5cj)2zJUm>FXNT*r_sYZj9X!0-@#3BLI(M|GgB@+=P)D2E?9$<yIOK3mI$hVi
z<{TV4=h5jrozr>ea(E@@N=v>ar+?%n*W$|B+G&0Hjq^NR*3Rwr#?tML%h~o7mTljF
zvu)dsbM<Yw;r|-q{7-`oeTEu(mw|@fW0*n1kwI%1V$k{x9I~{6q3R7;lBF9f6m3(5
zQn^9VsW$(=K$W^MrLWQ!Uf7xi7q(uZg{`+%frS)Spz%;p;fvHbQu9top?_uq3SVSe
z!U<nwN`eWs5xz>Gu_57$lQ`a*5x#0tY`#b}Uu(Wd5v0CIG7Uk^*Cs&_VUwFo6M`l`
z06~*cX*8+H+Xo~G6-AQi(~Z0$Qxxhpolm#vqqpfc`P1kAeRlFb`)qbGb0;xxQ_Nxx
zf8Q?d(A-U%t4Rn{b?i^n>VN9&fbpNVs$+lJTtza#&9~1GwYqv#K*wKS`~B5&{>A;<
z;j-l4j+;<;r}h1;*5?K>6t{u<dh4^CFMpqRE<64hv_8}E`1`KaPJYh6BJQGiwfyeq
z$9?Csw&m^D+_HG*fATKp267vgcu#R(oppZfm%y(0_8Hn-H6p+YwST)Rll5!nGR_JL
zgF9PYEh>V8+O10T<X_P8suQApvF6;SzZ)LE`{jPW-{+w2&Lys65&xZcCjK4^&c>$`
z_ExJlaq<SWyE@e5-?H;iCte-q{yzC<L5*|BwX?Xf?v2$|p#h$7M*Yq2P|H=B2w>h4
z=fBCZ?#H#xy#0>%6MxTLS#6&G)tY~Hs|F42!P<7UKDYIm3!nO;x!b-tHaAS-#GU*1
zx;0d4U{JrrW$u=E{Jh;&pxV2=-yGKF`E%QA9Ei7={XVm>dHYykw)nfI&%)*1SZmr;
zFkAEFUCfe#Va-SKyV|COKh_wuAnqN97_|DfGivSBT6=F~LVp1S0kDM0IYuc7O0on<
z2_|{}9!Zi~og+ZBI!2J>+s<a4B1dXoM2yh9h7{q^W;L2k^b$g(W+%uH8j#wny)kOe
z9jS=fX6^h4GDi6TGDh|IF-DD1KR#-)+N(WUt-YVy4OV-#SL^d-B@3#(r>8UzSnWNf
zc=W09?g$koReyUw6$lY*gePh02UmWgVqb1bP_w5aRCW^mAzzvFSWxZdoR-WQt5ve3
z+UwJ&51+b)VAywz=6z<BJz}t0B}>vBo2Sp!C*+s4i91*QX~LR!tS@WZY3gdMN`_Q>
zJD1?i)mkM(8m)-oXx_YD0>kBlA_fh9x`p{jCYUFTj(;c*vpf1I<{dvRsgC`L^*uht
zT!mH1kVGaiWN=hd;KoHqMn~Y~+UR%e{Vggw!XW>QrckkDh6z`f!Wxw-RAyCbv!WVO
zlq94mGa3>thd64M%rN1m_UO@rH-BNDVOE?aGfX%b4|kT*u|K6qf#NnDYFz@mLS}^1
zfUyu)|9|&w`c}{bD$r)t4$5%wWrNy9gDz<5l`~f|P{Kgz0wpt4x>!;fSQ42Zsy0bu
zW@2U*%ql<%gG2!m(-D}XFH=WejJg<2FlEF=wuMTjr6>zk5f-8)qANrajQ);JB4}cz
z2?R|Z7^*U)G?WyJ2{}-yru0;y285_EnZl%2D}OC$2BVtVgHNZtPn{IT{bRv^YR^hm
zgtRYi&#m9i%@ThtsTlq;+Ogkxap(WBx0`40v9~zXGz{#r+Hln`MceMi*5@sde^}q+
ztUk-DItrKO-_`}qb91fl-!jLwalfV(sPp*2`8(hK{pOz~L(8k+0@uXKb-b}?eU`&j
zHASbEfs&)b|9<=I-)hw_3bYZ$+`rALU5-vHs_#$z*{piS2+iG8N<hRqqq&<}{p$N=
zt^eUB8|T892Zpbx6O@OVLNhXx*a<Cv-Vctf!hI0T|N4SGPqRLuv*+pdHza!oGnhpB
zG0evrd{9bdULXG*H;gOl7S!3D(BwKw3^OB?+f*T3j$+xcPOOcEgsdBQ1eDBvnbJqy
z;$z#9K>ALsjRY$H-m0bRfMT|sJ4UEJt@uPwDT~9!ywG1^4Jn3(BAki9k;f5#BOg|0
zpuI~7_J}XGIQL+QU!-hT4HB7dsg~?E$ImTD5ty7?<%Y`B_J0pF02~sJK8*N(4hIe^
z44yLjC7vFRSrzsJtUS}Li-4$?r=apVRqirnCZ1HRuXtn&+LndQW*r1G2>Sys@!PVG
z@4kH|tqH{U8M=_*II}I<QMnv{b~ndmR}Zr`(`8o=yQanCMSE41JH*@m?Voz_P6esk
zq9D{t=lQpJ*}74a9z<Pvj<`y-w{F3ZN1~loa2->*9AY)YXJ6qk-l`cczmNEkHx#7L
zA7s!IaI-6ibXjfw0g5IK4V9TIy7$_r0mBvCe}=TUe7%>c(L-cVk@=~AM!OU@<H!)m
z&(M|5!~(6S&uD5G;a~qg7BlP4sx|p_>zhNQPtQ_myjQFpD;Vc0mX&tp-jSKCsq(`H
z2smpA^RcwFA}Hy%+-rmT5jZakM;AJiQj3@Y6q>6VFdv1$>mnpoH1;lGkd`B-E|Lr8
zygBlyy1$W+xtM2;GG97>o#T0zkrz}qR_bH(4+BTFv=Xo237@gMt=#;e@Q%Wp37`iH
zi=W-~Bn-$H!pNrw$ye_mX2yWbvN6<MOM)Occ0sW9tTZoedJunzHYImIhgrf3DjLyW
zD<;tv-?T}$I(k!HgCKJD8Ge4z{5LJ7`(sBu!s!TX_CS=?rHw{^QGj}CRAJfz!A0cX
z*>)UF0i_K{+sf74c=~6+Js759&^3a8C2J9O4%@Nmx{l_^vM8IvJhTme<l47xSXxbi
zdKfwn-Zq=ZjS8pZ)loNW$e~Lz?b_21u-Ta->20QJv_YWgToh#3KJGQ^u<Lr;Q>BPN
zX9Q0mnC@2EkxLMNm1>!M8jxdyiO=~kh-Vy{9pp33fn_m<Hk8r2YX_57D@ELG5GMl|
zY@EHem_&1uuZSSFZs*)HbAuTXIa6?kA9D^TpJn+-h|pHZY|w&Vc<Qj5fMUNjCdO|S
zi2lA<kfVO8r^F3i3XI$Rc|&!nsO9N1v)L+ENs%w}rYseI&7ng8sYN&bo+aFwlal}z
zpA-+2ryM<H<vYY_<<!FI{QX1zlG^?E;30N!q5K3nrF;rt^$1vWFpt<kIZyl>JBmz`
z<KkZ%fLGgL7absZGzL~)=xq#H#`nqj{>>o4prnnP^UL@&ns8y?*=|5xNdkU9_dLT4
zSZIbzKYp)&arvxYih8JSqip9kC8x@kGh~|%RttKZs)6mCf^y^(4UP6vFA=YgVH$*F
zFL@~R8>Me=5|qpB1Tr1b!3PI7x`RSt*-y1CO(Bm!H#OTr(gQ>iyok^R@Dqn~2t31=
z3BxAGilVN2iXdUA1~C-PLXgBp9}t@yYbHaN$VAhB_#c4y#h8ed0$m&z;{7UXm}eDb
z?bsIcWJDMjF${45!iMA98#x3>_!VKW%v&W`^{k8}IVGZ}Yt4`KW(JZAw*?yEG$*Oe
zl5n6?hM>ow3TAYfM6t+*MhKFbli1R?<V2lk2t6;AA%-SI2u(gr<0Iw|{xk&D_Wa|W
zImE|*e}31^oHrsb7ZXo#Bs3>QNzAE8E#o7_gtWaCkzkntjHf0b^G0zEJfrcP{6ZpH
zUc_IBE=kB1BO?Xnrce8dIO41}7!}P%6pKHR<u9g5R5M16EPp1-;-MERM;D3s>bhHZ
zr9b;5+s0vfz;g7m#z{o1ecsgq&_OAt@n0Z+^dQYsy)1^zG^&Y)iYxM{v-)6Elo=@<
zy*D89A+6brMk|aLoaP(yv189{C<PmlHok8{;wM_R8i5qFJuLkT;x=!!!Kg_xqAdR0
zh=e2BSPY426em0N7v!$a%9GJaV$v%9OwgH7>VK<<X+=cR1t5o>YCC7s#L{-e=_nC@
zGA%@|`2?Zor830Ogb1O@n})<`=mEM8Vyl$~hBGwb(=vn0QgB?u89tpeyo7`-l8gdQ
z8By@7|B6Y1tqzr2IsjZ>a|p|4PR0hF!D$Ph)SwWlL58H`PyuFenMAP2CR6uh6uKb}
zZP!2~I+S6B-u+J{Qd~wcn2_ZsjBb5@1n_zfKfq2oYBY@D5>zGqK0-BgV%cWojakx%
zBy&+84^+9dRz@tBLue>W_*d+%=wqnU*uLp_`CjL$FpPn4?DW>d?ynn9@mv@P`BJE0
zk1DPPnBc^N(A6oj#M`_iT286;f>kucCxS|eB+lmADi{k!{w#=uK##G>x(<4O(HNJ4
znGlrRn@rci7mWFkATk1vXREUs!jjQG2_q+vV{Ot-gRsh{nDPl3S7bK6XO1tVe4+KV
zhO0~jwdRs)iAJ@`aRTJKMl;!tW{`$DG-gSTeX=rIzVV*p4<O!Wo3m(6(~OLD5C~yD
zW|MUtK%qs?3?CTilwMKJfB4LQBp37#%Y>Wg9OJ$3k10CAG?wA2Jb6bRNEETqR~En+
zGsO$!Ok_a`t4t%tQv8bm*B&_cco)8Xp3(ExIL%{Yctun;t#Hc+I)-{UXbC}!vx&*0
zBlyNC&P9^fTlO0|x7`LonTG{Z=l1{(i*H*k^oG&Pzx2o^peAv!);<}3D^;`RSvl$)
z+N|9Hp8`rJBe>qX0Yr4=Nf1R6!F#HU>`g19uTE4lPXW&^At&}4T7cEU8Fk2)gNK9%
ziA3SD?m^JEaI_bo;lUsT;Z2!)4r1A%=ocBa2HFvZpAF-mfdn7OtkV!Uz>XmXf99GD
z(g9TC*UkeYAwCHO@2OjVvuML)!g5XiNL$_xi?W46{E$ipX>6IZL`-6eQUZ6ZtJMMM
zI)RubNZ{;}2HBMfq`_Y~#ybK%(%&`(lcyt>lMA?h6(TZLtD9BTRR%0fBqZ1L8eTIV
zb#5An<asFM?~1cbZB(FGrmM^(rJ|C0sjf3ED4R>l<+_@vq;4uXsk0>WY#qJSeKknR
wfS53>M6uj8!Bx3FOG@Tu8`(wnzX&*h$1oxFf-Z2tbAi;#qL#HEy44V^?FR`@!vFvP

diff --git a/src/box/lua/schema.lua b/src/box/lua/schema.lua
index 99b65b0f5..8c328852d 100644
--- a/src/box/lua/schema.lua
+++ b/src/box/lua/schema.lua
@@ -2597,7 +2597,8 @@ box.schema.func.create = function(name, opts)
     opts = opts or {}
     check_param_table(opts, { setuid = 'boolean',
                               if_not_exists = 'boolean',
-                              language = 'string', body = 'string',
+                              language = 'string', body = 'any',
+                              aggregate = 'string',
                               is_deterministic = 'boolean',
                               is_sandboxed = 'boolean',
                               is_multikey = 'boolean',
diff --git a/src/box/lua/upgrade.lua b/src/box/lua/upgrade.lua
index f9e6543c2..0e2c4a091 100644
--- a/src/box/lua/upgrade.lua
+++ b/src/box/lua/upgrade.lua
@@ -1212,6 +1212,21 @@ local function upgrade_to_2_9_1()
     remove_sql_builtin_functions_from_func()
 end
 
+--------------------------------------------------------------------------------
+-- Tarantool 2.10.1
+--------------------------------------------------------------------------------
+local function change_func_body_field_type()
+    local _func = box.space._func
+    local format = _func:format()
+    -- Change type of 'body' field from 'string' to 'any'.
+    format[6].type = 'any'
+    _func:format(format)
+end
+
+local function upgrade_to_2_10_1()
+    change_func_body_field_type()
+end
+
 --------------------------------------------------------------------------------
 
 local handlers = {
@@ -1229,6 +1244,7 @@ local handlers = {
     {version = mkversion(2, 3, 1), func = upgrade_to_2_3_1, auto = true},
     {version = mkversion(2, 7, 1), func = upgrade_to_2_7_1, auto = true},
     {version = mkversion(2, 9, 1), func = upgrade_to_2_9_1, auto = true},
+    {version = mkversion(2, 10, 1), func = upgrade_to_2_10_1, auto = true},
 }
 
 -- Schema version of the snapshot.
diff --git a/src/box/schema.cc b/src/box/schema.cc
index c248d4ee4..eb1b4a7dd 100644
--- a/src/box/schema.cc
+++ b/src/box/schema.cc
@@ -60,6 +60,7 @@ static struct mh_i32ptr_t *spaces;
 static struct mh_strnptr_t *spaces_by_name;
 static struct mh_i32ptr_t *funcs;
 static struct mh_strnptr_t *funcs_by_name;
+static struct mh_i32ptr_t *funcs_fin;
 static struct mh_i32ptr_t *sequences;
 /** Public change counter. On its update clients need to fetch
  *  new space data from the instance. */
@@ -379,6 +380,7 @@ schema_init(void)
 	spaces_by_name = mh_strnptr_new();
 	funcs = mh_i32ptr_new();
 	funcs_by_name = mh_strnptr_new();
+	funcs_fin = mh_i32ptr_new();
 	sequences = mh_i32ptr_new();
 	/*
 	 * Create surrogate space objects for the mandatory system
@@ -540,6 +542,14 @@ schema_free(void)
 		func_delete(func);
 	}
 	mh_i32ptr_delete(funcs);
+	while (mh_size(funcs_fin) > 0) {
+		mh_int_t i = mh_first(funcs_fin);
+		struct func *func =
+			(struct func *)mh_i32ptr_node(funcs_fin, i)->val;
+		func_fin_cache_delete(func->def->fid);
+		func_delete(func);
+	}
+	mh_i32ptr_delete(funcs_fin);
 	while (mh_size(sequences) > 0) {
 		mh_int_t i = mh_first(sequences);
 
@@ -597,6 +607,33 @@ func_by_name(const char *name, uint32_t name_len)
 	return (struct func *) mh_strnptr_node(funcs_by_name, func)->val;
 }
 
+void
+func_fin_cache_insert(struct func *func)
+{
+	uint32_t fid = func->def->fid;
+	assert(func_fin_by_id(fid) == NULL);
+	const struct mh_i32ptr_node_t node = {fid, func};
+	mh_i32ptr_put(funcs_fin, &node, NULL, NULL);
+}
+
+void
+func_fin_cache_delete(uint32_t fid)
+{
+	mh_int_t k = mh_i32ptr_find(funcs_fin, fid, NULL);
+	if (k == mh_end(funcs_fin))
+		return;
+	mh_i32ptr_del(funcs_fin, k, NULL);
+}
+
+struct func *
+func_fin_by_id(uint32_t fid)
+{
+	mh_int_t func = mh_i32ptr_find(funcs_fin, fid, NULL);
+	if (func == mh_end(funcs_fin))
+		return NULL;
+	return (struct func *)mh_i32ptr_node(funcs_fin, func)->val;
+}
+
 int
 schema_find_grants(const char *type, uint32_t id, bool *out)
 {
diff --git a/src/box/schema.h b/src/box/schema.h
index d3bbdd590..2f020ee96 100644
--- a/src/box/schema.h
+++ b/src/box/schema.h
@@ -102,6 +102,15 @@ func_by_id(uint32_t fid);
 struct func *
 func_by_name(const char *name, uint32_t name_len);
 
+void
+func_fin_cache_insert(struct func *func);
+
+void
+func_fin_cache_delete(uint32_t fid);
+
+struct func *
+func_fin_by_id(uint32_t fid);
+
 /** Call a visitor function on every space in the space cache. */
 int
 space_foreach(int (*func)(struct space *sp, void *udata), void *udata);
diff --git a/src/box/sql/expr.c b/src/box/sql/expr.c
index eb169aeb8..46ffe377c 100644
--- a/src/box/sql/expr.c
+++ b/src/box/sql/expr.c
@@ -5469,7 +5469,18 @@ analyzeAggregate(Walker * pWalker, Expr * pExpr)
 						       (pExpr, EP_xIsSelect));
 						pItem = &pAggInfo->aFunc[i];
 						pItem->pExpr = pExpr;
-						pItem->iMem = ++pParse->nMem;
+						int n = pExpr->x.pList == NULL ?
+							0 : pExpr->x.pList->nExpr;
+						/*
+						 * Allocate 1 MEM for
+						 * accumulator and next n MEMs
+						 * for arguments. This makes it
+						 * easier to pass these n + 1
+						 * MEMs to the user-defined
+						 * aggregate function.
+						 */
+						pItem->iMem = pParse->nMem + 1;
+						pParse->nMem += n + 1;
 						assert(!ExprHasProperty
 						       (pExpr, EP_IntValue));
 						pItem->func =
@@ -5479,12 +5490,6 @@ analyzeAggregate(Walker * pWalker, Expr * pExpr)
 								true;
 							return WRC_Abort;
 						}
-						assert(pItem->func->def->
-						       language ==
-						       FUNC_LANGUAGE_SQL_BUILTIN &&
-						       pItem->func->def->
-						       aggregate ==
-						       FUNC_AGGREGATE_GROUP);
 						if (pExpr->flags & EP_Distinct) {
 							pItem->iDistinct =
 								pParse->nTab++;
diff --git a/src/box/sql/func.c b/src/box/sql/func.c
index b69bf7fd6..493b69a4c 100644
--- a/src/box/sql/func.c
+++ b/src/box/sql/func.c
@@ -2072,9 +2072,12 @@ uint32_t
 sql_func_flags(const char *name)
 {
 	struct sql_func_dictionary *dict = built_in_func_get(name);
-	if (dict == NULL)
+	if (dict != NULL)
+		return dict->flags;
+	struct func *func = func_by_name(name, strlen(name));
+	if (func == NULL || func->def->aggregate != FUNC_AGGREGATE_GROUP)
 		return 0;
-	return dict->flags;
+	return SQL_FUNC_AGG;
 }
 
 static struct func_vtab func_sql_builtin_vtab;
diff --git a/src/box/sql/select.c b/src/box/sql/select.c
index 2e33a31d2..c77cd1c9f 100644
--- a/src/box/sql/select.c
+++ b/src/box/sql/select.c
@@ -4648,8 +4648,6 @@ is_simple_count(struct Select *select, struct AggInfo *agg_info)
 		return NULL;
 	if (NEVER(agg_info->nFunc == 0))
 		return NULL;
-	assert(agg_info->aFunc->func->def->language ==
-	       FUNC_LANGUAGE_SQL_BUILTIN);
 	if (memcmp(agg_info->aFunc->func->def->name, "COUNT", 5) != 0 ||
 	    (agg_info->aFunc->pExpr->x.pList != NULL &&
 	     agg_info->aFunc->pExpr->x.pList->nExpr > 0))
@@ -5575,9 +5573,17 @@ finalizeAggFunctions(Parse * pParse, AggInfo * pAggInfo)
 	for (i = 0, pF = pAggInfo->aFunc; i < pAggInfo->nFunc; i++, pF++) {
 		ExprList *pList = pF->pExpr->x.pList;
 		assert(!ExprHasProperty(pF->pExpr, EP_xIsSelect));
-		sqlVdbeAddOp2(v, OP_AggFinal, pF->iMem,
-				  pList ? pList->nExpr : 0);
-		sqlVdbeAppendP4(v, pF->func, P4_FUNC);
+		if (pF->func->def->language == FUNC_LANGUAGE_SQL_BUILTIN) {
+			sqlVdbeAddOp2(v, OP_AggFinal, pF->iMem,
+				      pList != NULL ? pList->nExpr : 0);
+			sqlVdbeAppendP4(v, pF->func, P4_FUNC);
+		} else {
+			sqlVdbeAddOp1(v, OP_UserAggFinal, pF->iMem);
+			const char *name = pF->func->def->name;
+			int len = pF->func->def->name_len;
+			char *str = sqlDbStrNDup(pParse->db, name, len);
+			sqlVdbeAppendP4(v, str, P4_DYNAMIC);
+		}
 	}
 }
 
@@ -5604,7 +5610,7 @@ updateAccumulator(Parse * pParse, AggInfo * pAggInfo)
 		assert(!ExprHasProperty(pF->pExpr, EP_xIsSelect));
 		if (pList) {
 			nArg = pList->nExpr;
-			regAgg = sqlGetTempRange(pParse, nArg);
+			regAgg = pF->iMem + 1;
 			sqlExprCodeExprList(pParse, pList, regAgg, 0,
 						SQL_ECEL_DUP);
 		} else {
@@ -5644,10 +5650,17 @@ updateAccumulator(Parse * pParse, AggInfo * pAggInfo)
 			pParse->is_aborted = true;
 			return;
 		}
-		sqlVdbeAddOp3(v, OP_AggStep, nArg, regAgg, pF->iMem);
-		sqlVdbeAppendP4(v, ctx, P4_FUNCCTX);
-		sql_expr_type_cache_change(pParse, regAgg, nArg);
-		sqlReleaseTempRange(pParse, regAgg, nArg);
+		if (pF->func->def->language == FUNC_LANGUAGE_SQL_BUILTIN) {
+			sqlVdbeAddOp3(v, OP_AggStep, nArg, regAgg, pF->iMem);
+			sqlVdbeAppendP4(v, ctx, P4_FUNCCTX);
+		} else {
+			sqlVdbeAddOp3(v, OP_UserAggStep, nArg, regAgg,
+				      pF->iMem);
+			const char *name = pF->func->def->name;
+			int len = pF->func->def->name_len;
+			char *str = sqlDbStrNDup(pParse->db, name, len);
+			sqlVdbeAppendP4(v, str, P4_DYNAMIC);
+		}
 		if (addrNext) {
 			sqlVdbeResolveLabel(v, addrNext);
 			sqlExprCacheClear(pParse);
diff --git a/src/box/sql/vdbe.c b/src/box/sql/vdbe.c
index 24cb28260..873b9f9f4 100644
--- a/src/box/sql/vdbe.c
+++ b/src/box/sql/vdbe.c
@@ -1289,6 +1289,93 @@ case OP_FunctionByName: {
 	break;
 }
 
+/*
+ * Opcode: UserAggStep P1 P2 P3 P4 *
+ * Synopsis: accum=r[P3] step(r[P2@P1])
+ *
+ * Execute the step function for a user-defined aggregate function. The step
+ * function has P1 arguments. P4 is the name of the aggregate function, as a
+ * dynamically allocated string. Register P3 is the accumulator.
+ *
+ * The P1 arguments are taken from register P2 and its successors. The P2
+ * register must be immediately after the P3 register.
+ */
+case OP_UserAggStep: {
+	assert(pOp->p4type == P4_DYNAMIC);
+	assert(pOp->p1 == 0 || pOp->p3 == pOp->p2 - 1);
+	struct func *func = func_by_name(pOp->p4.z, strlen(pOp->p4.z));
+	if (func == NULL) {
+		diag_set(ClientError, ER_NO_SUCH_FUNCTION, pOp->p4.z);
+		goto abort_due_to_error;
+	}
+
+	struct Mem *argv = &aMem[pOp->p3];
+	struct port args;
+	struct port ret;
+
+	struct region *region = &fiber()->gc;
+	size_t region_svp = region_used(region);
+	port_vdbemem_create(&args, (struct sql_value *)argv, pOp->p1 + 1);
+	if (func->vtab->call(func, &args, &ret) != 0) {
+		region_truncate(region, region_svp);
+		goto abort_due_to_error;
+	}
+
+	uint32_t size;
+	struct Mem *mem = (struct Mem *)port_get_vdbemem(&ret, &size);
+	assert(size == 1);
+	bool is_error = mem == NULL || mem_copy(&aMem[pOp->p3], mem) != 0;
+	port_destroy(&ret);
+	region_truncate(region, region_svp);
+	if (is_error)
+		goto abort_due_to_error;
+	break;
+}
+
+/* Opcode: UserAggFinal P1 * * P4 *
+ * Synopsis: r[P1] = finalize(r[P1])
+ *
+ * Execute the finalize function for a user-defined aggregate function. P4 is
+ * the name of the aggregate function, as a dynamically allocated string.
+ * Register P1 is the accumulator. The result is written to register P1.
+ */
+case OP_UserAggFinal: {
+	assert(pOp->p1 > 0 && pOp->p1 <= (p->nMem + 1 - p->nCursor));
+	assert(pOp->p4type == P4_DYNAMIC);
+	struct func *step_func = func_by_name(pOp->p4.z, strlen(pOp->p4.z));
+	if (step_func == NULL) {
+		diag_set(ClientError, ER_NO_SUCH_FUNCTION, pOp->p4.z);
+		goto abort_due_to_error;
+	}
+	struct func *func = func_fin_by_id(step_func->def->fid);
+	if (func == NULL) {
+		diag_set(ClientError, ER_NO_SUCH_FUNCTION, pOp->p4.z);
+		goto abort_due_to_error;
+	}
+
+	struct Mem *argv = &aMem[pOp->p1];
+	struct port args;
+	struct port ret;
+
+	struct region *region = &fiber()->gc;
+	size_t region_svp = region_used(region);
+	port_vdbemem_create(&args, (struct sql_value *)argv, 1);
+	if (func->vtab->call(func, &args, &ret) != 0) {
+		region_truncate(region, region_svp);
+		goto abort_due_to_error;
+	}
+
+	uint32_t size;
+	struct Mem *mem = (struct Mem *)port_get_vdbemem(&ret, &size);
+	assert(size == 1);
+	bool is_error = mem == NULL || mem_copy(&aMem[pOp->p1], mem) != 0;
+	port_destroy(&ret);
+	region_truncate(region, region_svp);
+	if (is_error)
+		goto abort_due_to_error;
+	break;
+}
+
 /* Opcode: BitAnd P1 P2 P3 * *
  * Synopsis: r[P3]=r[P1]&r[P2]
  *
diff --git a/test/box-py/bootstrap.result b/test/box-py/bootstrap.result
index cea440c64..78d444bb0 100644
--- a/test/box-py/bootstrap.result
+++ b/test/box-py/bootstrap.result
@@ -4,7 +4,7 @@ box.internal.bootstrap()
 box.space._schema:select{}
 ---
 - - ['max_id', 511]
-  - ['version', 2, 9, 1]
+  - ['version', 2, 10, 1]
 ...
 box.space._cluster:select{}
 ---
@@ -54,7 +54,7 @@ box.space._space:select{}
   - [296, 1, '_func', 'memtx', 0, {}, [{'name': 'id', 'type': 'unsigned'}, {'name': 'owner',
         'type': 'unsigned'}, {'name': 'name', 'type': 'string'}, {'name': 'setuid',
         'type': 'unsigned'}, {'name': 'language', 'type': 'string'}, {'name': 'body',
-        'type': 'string'}, {'name': 'routine_type', 'type': 'string'}, {'name': 'param_list',
+        'type': 'any'}, {'name': 'routine_type', 'type': 'string'}, {'name': 'param_list',
         'type': 'array'}, {'name': 'returns', 'type': 'string'}, {'name': 'aggregate',
         'type': 'string'}, {'name': 'sql_data_access', 'type': 'string'}, {'name': 'is_deterministic',
         'type': 'boolean'}, {'name': 'is_sandboxed', 'type': 'boolean'}, {'name': 'is_null_call',
diff --git a/test/sql-tap/CMakeLists.txt b/test/sql-tap/CMakeLists.txt
index c4ec1214a..136a517d4 100644
--- a/test/sql-tap/CMakeLists.txt
+++ b/test/sql-tap/CMakeLists.txt
@@ -3,6 +3,8 @@ build_module(gh-5938-wrong-string-length gh-5938-wrong-string-length.c)
 target_link_libraries(gh-5938-wrong-string-length msgpuck)
 build_module(gh-6024-funcs-return-bin gh-6024-funcs-return-bin.c)
 target_link_libraries(gh-6024-funcs-return-bin msgpuck)
+build_module(gh-2579-custom-aggregate gh-2579-custom-aggregate.c)
+target_link_libraries(gh-2579-custom-aggregate msgpuck)
 build_module(sql_uuid sql_uuid.c)
 target_link_libraries(sql_uuid msgpuck core)
 build_module(decimal decimal.c)
diff --git a/test/sql-tap/gh-2579-custom-aggregate.c b/test/sql-tap/gh-2579-custom-aggregate.c
new file mode 100644
index 000000000..5552effc5
--- /dev/null
+++ b/test/sql-tap/gh-2579-custom-aggregate.c
@@ -0,0 +1,48 @@
+#include "msgpuck.h"
+#include "module.h"
+#include "mp_decimal.h"
+#include "lua/tnt_msgpuck.h"
+#include "mp_uuid.h"
+
+enum {
+	BUF_SIZE = 512,
+};
+
+int
+S3(box_function_ctx_t *ctx, const char *args, const char *args_end)
+{
+	(void)args_end;
+	uint32_t arg_count = mp_decode_array(&args);
+	if (arg_count != 2) {
+		return box_error_set(__FILE__, __LINE__, ER_PROC_C,
+				     "invalid argument count");
+	}
+	int sum = 0;
+	if (mp_typeof(*args) != MP_UINT)
+		mp_decode_nil(&args);
+	else
+		sum = mp_decode_uint(&args);
+	sum += mp_decode_uint(&args);
+	char res[BUF_SIZE];
+	char *end = mp_encode_uint(res, sum);
+	box_return_mp(ctx, res, end);
+	return 0;
+}
+
+int
+S3_finalize(box_function_ctx_t *ctx, const char *args, const char *args_end)
+{
+	(void)args_end;
+	uint32_t arg_count = mp_decode_array(&args);
+	if (arg_count != 1) {
+		return box_error_set(__FILE__, __LINE__, ER_PROC_C,
+				     "invalid argument count");
+	}
+	int sum = 0;
+	if (mp_typeof(*args) == MP_UINT)
+		sum = mp_decode_uint(&args);
+	char res[BUF_SIZE];
+	char *end = mp_encode_double(res, sum / 10.0);
+	box_return_mp(ctx, res, end);
+	return 0;
+}
diff --git a/test/sql-tap/gh-2579-custom-aggregate.test.lua b/test/sql-tap/gh-2579-custom-aggregate.test.lua
new file mode 100755
index 000000000..d5b845761
--- /dev/null
+++ b/test/sql-tap/gh-2579-custom-aggregate.test.lua
@@ -0,0 +1,232 @@
+#!/usr/bin/env tarantool
+local build_path = os.getenv("BUILDDIR")
+package.cpath = build_path..'/test/sql-tap/?.so;'..build_path..'/test/sql-tap/?.dylib;'..package.cpath
+
+local test = require("sqltester")
+test:plan(13)
+
+-- Make sure that non-persistent aggregate functions are working as expected.
+local step = function(x, y)
+    if x == nil then
+        x = {sum = 0, count = 0}
+    end
+    x.sum = x.sum + y
+    x.count = x.count + 1
+    return x
+end
+
+local finalize = function(x)
+    return x.sum / x.count
+end
+
+rawset(_G, 'S1', step)
+rawset(_G, 'S1_finalize', finalize)
+box.schema.func.create('S1', {aggregate = 'group', returns = 'number',
+                              param_list = {'integer'}, exports = {'SQL'}})
+
+test:execsql([[
+    CREATE TABLE t(i INTEGER PRIMARY KEY AUTOINCREMENT, a INT);
+    INSERT INTO t(a) VALUES(1), (1), (2), (2), (3);
+]])
+
+test:do_execsql_test(
+    "gh-2579-1",
+    [[
+        SELECT avg(a), s1(a) FROM t;
+    ]], {
+        1, 1.8
+    })
+
+test:do_execsql_test(
+    "gh-2579-2",
+    [[
+        SELECT avg(distinct a), s1(distinct a) FROM t;
+    ]], {
+        2, 2
+    })
+
+-- Make sure that persistent aggregate functions are working as expected.
+local body_s2 = {
+    step = [[function(x, y)
+        if x == nil then
+            x = {sum = 0, count = 0}
+        end
+        x.sum = x.sum + y
+        x.count = x.count + 1
+        return x
+    end]],
+    finalize = [[function(x)
+        return x.sum / x.count
+    end]]
+}
+
+box.schema.func.create('S2', {aggregate = 'group', returns = 'number',
+                              body = body_s2, param_list = {'integer'},
+                              exports = {'SQL'}})
+
+test:do_execsql_test(
+    "gh-2579-3",
+    [[
+        SELECT avg(a), s2(a) FROM t;
+    ]], {
+        1, 1.8
+    })
+
+test:do_execsql_test(
+    "gh-2579-4",
+    [[
+        SELECT avg(distinct a), s2(distinct a) FROM t;
+    ]], {
+        2, 2
+    })
+
+-- Make sure that C aggregate functions are working as expected.
+box.schema.func.create("gh-2579-custom-aggregate.S3", {
+    language = "C", aggregate = 'group', param_list = {"integer"},
+    returns = "number", exports = {"SQL"},
+})
+
+test:do_execsql_test(
+    "gh-2579-5",
+    [[
+        SELECT "gh-2579-custom-aggregate.S3"(a) FROM t;
+    ]], {
+        0.9
+    })
+
+test:do_execsql_test(
+    "gh-2579-6",
+    [[
+        SELECT "gh-2579-custom-aggregate.S3"(distinct a) FROM t;
+    ]], {
+        0.6
+    })
+
+--
+-- Make sure that user-defined aggregate functions can have more than one
+-- argument.
+--
+local body_s4 = {
+    step = [[function(x, y, z)
+        if x == nil then
+            x = {sum = 0, count = 0}
+        end
+        x.sum = x.sum + y + z
+        x.count = x.count + 1
+        return x
+    end]],
+    finalize = [[function(x)
+        return x.sum / x.count
+    end]]
+}
+box.schema.func.create('S4', {aggregate = 'group', returns = 'number',
+                              param_list = {'integer', 'integer'},
+                              body = body_s4, exports = {'SQL'}})
+
+test:do_execsql_test(
+    "gh-2579-7",
+    [[
+        SELECT s4(a, i) FROM t;
+    ]], {
+        4.8
+    })
+
+--
+-- Make sure that user-defined aggregate functions with more than one argument
+-- cannot work with DISTINCT.
+--
+test:do_catchsql_test(
+    "gh-2579-8",
+    [[
+        SELECT s4(distinct a, i) FROM t;
+    ]], {
+        1, "DISTINCT aggregates must have exactly one argument"
+    })
+
+-- Make sure user-defined aggregate functions are not available in Lua.
+test:do_test(
+    "gh-2579-9",
+    function()
+        local def = {aggregate = 'group', returns = 'number',
+                     param_list = {'integer'}, exports = {'LUA', 'SQL'}}
+        local res = {pcall(box.schema.func.create, 'S5', def)}
+        return {tostring(res[2])}
+    end, {
+        "Failed to create function 'S5': aggregate function can only be "..
+        "accessed in SQL"
+    })
+
+--
+-- Make sure that STEP and FINALIZE of a user-defined aggregate function are
+-- either both persistent or both non-persistent.
+--
+test:do_test(
+    "gh-2579-10",
+    function()
+        local body = "function(x) return x / 1000 end"
+        local def = {aggregate = 'group', returns = 'number', body = body,
+                     param_list = {'integer'}, exports = {'SQL'}}
+        local res = {pcall(box.schema.func.create, 'S6', def)}
+        return {tostring(res[2])}
+    end, {
+        "Failed to create function 'S6': step or finalize of aggregate "..
+        "function is undefined"
+    })
+
+--
+-- Make sure that the body of a persistent user-defined aggregate function has
+-- exactly two fields: "step" and "finalize".
+--
+test:do_test(
+    "gh-2579-11",
+    function()
+        local body = {step = "function(x) return x / 1000 end"}
+        local def = {aggregate = 'group', returns = 'number', body = body,
+                     param_list = {'integer'}, exports = {'SQL'}}
+        local res = {pcall(box.schema.func.create, 'S7', def)}
+        return {tostring(res[2])}
+    end, {
+        "Failed to create function 'S7': body of aggregate function should "..
+        "be map that contains exactly two string fields: 'step' and 'finalize'"
+    })
+
+test:do_test(
+    "gh-2579-12",
+    function()
+        local body = {finalize = "function(x) return x / 1000 end"}
+        local def = {aggregate = 'group', returns = 'number', body = body,
+                     param_list = {'integer'}, exports = {'SQL'}}
+        local res = {pcall(box.schema.func.create, 'S8', def)}
+        return {tostring(res[2])}
+    end, {
+        "Failed to create function 'S8': body of aggregate function should "..
+        "be map that contains exactly two string fields: 'step' and 'finalize'"
+    })
+
+test:do_test(
+    "gh-2579-13",
+    function()
+        local body = {
+            step = [[function(x, y, z)
+                if x == nil then
+                    x = {sum = 0, count = 0}
+                end
+                x.sum = x.sum + y + z
+                x.count = x.count + 1
+                return x
+            end]],
+            finalize = [[function(x)
+                return x.sum / x.count
+            end]],
+            something = 0
+        }
+        local def = {aggregate = 'group', returns = 'number', body = body,
+                     param_list = {'integer'}, exports = {'SQL'}}
+        local res = {pcall(box.schema.func.create, 'S9', def)}
+        return {tostring(res[2])}
+    end, {
+        "Failed to create function 'S9': body of aggregate function should "..
+        "be map that contains exactly two string fields: 'step' and 'finalize'"
+    })
+
+test:finish_test()
-- 
2.25.1
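For readers following the VDBE changes: OP_UserAggStep passes the accumulator register together with the P1 argument registers that follow it, so STEP always receives the current state as its first argument, and OP_UserAggFinal then calls FINALIZE once with that state. The sketch below models this control flow in plain C for the averaging function used in the tests (S1/S2). It is only an illustration under that assumption: `struct avg_state`, `avg_step()`, `avg_finalize()` and `avg_run()` are hypothetical names, while the real opcodes operate on `struct Mem` registers and call the functions through `func->vtab->call()`.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/*
 * Hypothetical accumulator for an AVG-like aggregate. is_set == false plays
 * the role of the NULL state that STEP receives for the first row of a group
 * ("if x == nil" in the Lua examples).
 */
struct avg_state {
	bool is_set;
	double sum;
	long count;
};

/* STEP: called once per row with the current state and the row's value. */
static void
avg_step(struct avg_state *state, long value)
{
	if (!state->is_set) {
		state->is_set = true;
		state->sum = 0;
		state->count = 0;
	}
	state->sum += value;
	state->count++;
}

/* FINALIZE: called once per group, turns the state into the result. */
static double
avg_finalize(const struct avg_state *state)
{
	/* As in the Lua examples, an empty group is not handled here. */
	assert(state->is_set && state->count > 0);
	return state->sum / state->count;
}

/* Run STEP for every row, then FINALIZE once, as the two opcodes do. */
static double
avg_run(const long *rows, size_t row_count)
{
	struct avg_state state = {false, 0, 0};
	for (size_t i = 0; i < row_count; i++)
		avg_step(&state, rows[i]);
	return avg_finalize(&state);
}
```

With the rows of table t from the test (1, 1, 2, 2, 3), avg_run() returns 9 / 5 = 1.8, the value expected from s1(a) in gh-2579-1; with the DISTINCT values (1, 2, 3) it returns 2.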



* Re: [Tarantool-patches] [PATCH v1 1/1] sql: introduce user-defined aggregate functions
  2022-01-14 13:38 [Tarantool-patches] [PATCH v1 1/1] sql: introduce user-defined aggregate functions Mergen Imeev via Tarantool-patches
@ 2022-01-18  0:10 ` Vladislav Shpilevoy via Tarantool-patches
  2022-01-23 14:17   ` Mergen Imeev via Tarantool-patches
  0 siblings, 1 reply; 4+ messages in thread
From: Vladislav Shpilevoy via Tarantool-patches @ 2022-01-18  0:10 UTC
  To: imeevma; +Cc: tarantool-patches

Hi! Thanks for the patch!

What happened to the idea we had about SQL iterators? Then people would
be able to iterate with yields and to make any kind of aggregation without
having to create functions for that.

See 8 comments below.

> diff --git a/src/box/alter.cc b/src/box/alter.cc
> index 65c1cb952..f1dfcd807 100644
> --- a/src/box/alter.cc
> +++ b/src/box/alter.cc
> @@ -3276,6 +3276,85 @@ func_def_get_ids_from_tuple(struct tuple *tuple, uint32_t *fid, uint32_t *uid)
>  	return tuple_field_u32(tuple, BOX_FUNC_FIELD_UID, uid);
>  }
>  
> +static int
> +func_def_check_body(struct tuple *tuple)
> +{
> +	assert(tuple_field_count(tuple) > BOX_FUNC_FIELD_BODY);
> +	const char *field = tuple_field(tuple, BOX_FUNC_FIELD_BODY);
> +	assert(field != NULL);
> +	enum mp_type type = mp_typeof(*field);
> +	if (type == MP_STR)
> +		return 0;
> +	if (type != MP_MAP) {
> +		diag_set(ClientError, ER_FIELD_TYPE, "map or array",

1. 'Map or string'.

> +			 mp_type_strs[type]);
> +		return -1;
> +	}
> +	const char *agg = tuple_field_cstr(tuple, BOX_FUNC_FIELD_AGGREGATE);
> +	if (agg == NULL)
> +		return -1;
> +	if (STR2ENUM(func_aggregate, agg) != FUNC_AGGREGATE_GROUP) {
> +		const char *name = tuple_field_cstr(tuple, BOX_FUNC_FIELD_NAME);
> +		diag_set(ClientError, ER_CREATE_FUNCTION, name,
> +			 "only aggregate functions can have map as body");
> +		return -1;
> +	}
> +
> +	bool has_step = false;
> +	bool has_finalize = false;
> +	if (mp_decode_map(&field) != 2)
> +		goto error;
> +	for (int i = 0; i < 2; ++i) {
> +		const char *value;
> +		uint32_t size;
> +		if (mp_typeof(*field) != MP_STR)
> +			goto error;
> +		value = mp_decode_str(&field, &size);
> +		if (size == strlen("step")) {
> +			if (strncmp(value, "step", size) != 0)
> +				goto error;

2. I would rather propose to introduce a function to compare
0-terminated string with non-terminated one. It is needed quite
often apparently. For instance,

	strlcmp(const char *l, const char *r, size_t r_len);

> +			has_step = true;
> +		} else if (size == strlen("finalize")) {
> +			if (strncmp(value, "finalize", size) != 0)
> +				goto error;
> +			has_finalize = true;
> +		}
> +		if (mp_typeof(*field) != MP_STR)
> +			goto error;
> +		mp_next(&field);
> +	}
> +	if (has_step && has_finalize)
> +		return 0;
> +error:
> +	const char *name = tuple_field_cstr(tuple, BOX_FUNC_FIELD_NAME);
> +	diag_set(ClientError, ER_CREATE_FUNCTION, name,
> +		 "body of aggregate function should be map that contains "
> +		 "exactly two string fields: 'step' and 'finalize'");
> +	return -1;
> +}
> +
> +static const char *
> +func_def_get_agg_body(struct tuple *tuple, uint32_t *body_len, bool is_step)

3. Please, let's split that function in 2 maybe. Its invocation with
hardcoded 'true'/'false' at the end makes it hard to understand what
these values mean.

> +{
> +	const char *field = tuple_field(tuple, BOX_FUNC_FIELD_BODY);
> +	assert(field != NULL);
> +	if (mp_typeof(*field) == MP_STR) {
> +		if (is_step)
> +			return mp_decode_str(&field, body_len);
> +		*body_len = 0;
> +		return field;

4. You return len as 0, but the returned value != NULL? Why?

> +	}
> @@ -3568,6 +3697,16 @@ on_replace_dd_func(struct trigger * /* trigger */, void *event)
>  		struct func *func = func_new(def);
>  		if (func == NULL)
>  			return -1;
> +		if (def->aggregate == FUNC_AGGREGATE_GROUP) {
> +			struct func_def *fin_def =
> +				func_fin_def_new(new_tuple, def);
> +			if (fin_def == NULL)
> +				return -1;
> +			struct func *func_fin = func_new(fin_def);
> +			if (func_fin == NULL)
> +				return -1;
> +			func_fin_cache_insert(func_fin);

5. What happens if WAL write fails and on_create_func_rollback() is called?

> diff --git a/src/box/schema.h b/src/box/schema.h
> index d3bbdd590..2f020ee96 100644
> --- a/src/box/schema.h
> +++ b/src/box/schema.h
> @@ -102,6 +102,15 @@ func_by_id(uint32_t fid);
>  struct func *
>  func_by_name(const char *name, uint32_t name_len);
>  
> +void
> +func_fin_cache_insert(struct func *func);
> +
> +void
> +func_fin_cache_delete(uint32_t fid);
> +
> +struct func *
> +func_fin_by_id(uint32_t fid);

6. Let's leave some comments about what 'func fin' is.

> diff --git a/src/box/sql/select.c b/src/box/sql/select.c
> index 2e33a31d2..c77cd1c9f 100644
> --- a/src/box/sql/select.c
> +++ b/src/box/sql/select.c
> @@ -5644,10 +5650,17 @@ updateAccumulator(Parse * pParse, AggInfo * pAggInfo)
>  			pParse->is_aborted = true;
>  			return;
>  		}
> -		sqlVdbeAddOp3(v, OP_AggStep, nArg, regAgg, pF->iMem);
> -		sqlVdbeAppendP4(v, ctx, P4_FUNCCTX);
> -		sql_expr_type_cache_change(pParse, regAgg, nArg);
> -		sqlReleaseTempRange(pParse, regAgg, nArg);
> +		if (pF->func->def->language == FUNC_LANGUAGE_SQL_BUILTIN) {
> +			sqlVdbeAddOp3(v, OP_AggStep, nArg, regAgg, pF->iMem);
> +			sqlVdbeAppendP4(v, ctx, P4_FUNCCTX);
> +		} else {
> +			sqlVdbeAddOp3(v, OP_UserAggStep, nArg, regAgg,
> +				      pF->iMem);
> +			const char *name = pF->func->def->name;
> +			int len = pF->func->def->name_len;
> +			char *str = sqlDbStrNDup(pParse->db, name, len);
> +			sqlVdbeAppendP4(v, str, P4_DYNAMIC);

7. Was it possible to unify how builtin and user-defined functions
are called? In the same opcode without any special 'if's.

> diff --git a/test/sql-tap/gh-2579-custom-aggregate.test.lua b/test/sql-tap/gh-2579-custom-aggregate.test.lua
> new file mode 100755
> index 000000000..d5b845761
> --- /dev/null
> +++ b/test/sql-tap/gh-2579-custom-aggregate.test.lua
8. It might also be good to have a test with step/finalize functions
throwing a Lua error from their body.


* Re: [Tarantool-patches] [PATCH v1 1/1] sql: introduce user-defined aggregate functions
  2022-01-18  0:10 ` Vladislav Shpilevoy via Tarantool-patches
@ 2022-01-23 14:17   ` Mergen Imeev via Tarantool-patches
  2022-01-24 22:08     ` Vladislav Shpilevoy via Tarantool-patches
  0 siblings, 1 reply; 4+ messages in thread
From: Mergen Imeev via Tarantool-patches @ 2022-01-23 14:17 UTC
  To: Vladislav Shpilevoy; +Cc: tarantool-patches

Hi! Thank you for the review! The diff, the new patch, and a patch that adds a
new utility function are below.

On Tue, Jan 18, 2022 at 01:10:04AM +0100, Vladislav Shpilevoy wrote:
> Hi! Thanks for the patch!
> 
> What happened to the idea we had about SQL iterators? Then people would
> be able to iterate with yields and to make any kind of aggregation without
> having to create functions for that.
> 
I think that this idea is actually very similar to the definition of CURSOR from
the SQL standard. Here's what the standard says about CURSORs:
"A cursor is a mechanism by which the rows of a table may be acted on (e.g.,
returned to a host programming language) one at a time."

I will try to suggest this idea.

However, the current issue has quite a strict deadline, which is actually quite
near. I believe that introducing CURSORs is not a problem that can be solved in
a few weeks.

Also, about my current implementation: I don't like it, because too many of the
changes it makes to BOX are not related to BOX itself. I think there is a much
simpler implementation. As you may have noticed, the current implementation
requires non-persistent functions and C functions to define two
implementations named "<name>" and "<name>_finalize". Do you think this rule can
be made more general? I suggest creating two tuples in _func instead of one when
we create an aggregate function using box.schema.func.create(). In this case,
there will be almost no changes in alter.cc and BOX as a whole, and the logic
will become much simpler. Also, I think we won't need new VDBE opcodes, since
we can just reuse OP_FunctionByName. The problem with this approach is that the
name "<name>_finalize" will be occupied. Also, this could potentially lead to
mixed combinations of STEP and FINALIZE functions; for example, STEP could be
persistent while FINALIZE is non-persistent. However, to create such
combinations, the user must insert tuples directly into _func. I am not sure
whether this is a feature or a bug. What do you think?
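To make the naming rule concrete: under it, the FINALIZE part of an aggregate called "<name>" is registered (or looked up) under "<name>_finalize". A minimal sketch of deriving that name; `agg_finalize_name()` is a hypothetical helper for illustration, not an existing Tarantool function.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/*
 * Hypothetical helper: write "<name>_finalize" into buf.
 * Returns 0 on success and -1 if buf is too small, so that a truncated
 * name is never used to look up a function.
 */
static int
agg_finalize_name(const char *name, char *buf, size_t size)
{
	assert(name != NULL && buf != NULL);
	int len = snprintf(buf, size, "%s_finalize", name);
	if (len < 0 || (size_t)len >= size)
		return -1;
	return 0;
}
```

For "S1" this yields "S1_finalize"; for the C function "gh-2579-custom-aggregate.S3" it yields "gh-2579-custom-aggregate.S3_finalize", that is, the S3_finalize symbol of the same module. It also shows the drawback mentioned above: a separate user function that is literally named "S1_finalize" would collide with the FINALIZE part of S1.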

> See 8 comments below.
> 
> > diff --git a/src/box/alter.cc b/src/box/alter.cc
> > index 65c1cb952..f1dfcd807 100644
> > --- a/src/box/alter.cc
> > +++ b/src/box/alter.cc
> > @@ -3276,6 +3276,85 @@ func_def_get_ids_from_tuple(struct tuple *tuple, uint32_t *fid, uint32_t *uid)
> >  	return tuple_field_u32(tuple, BOX_FUNC_FIELD_UID, uid);
> >  }
> >  
> > +static int
> > +func_def_check_body(struct tuple *tuple)
> > +{
> > +	assert(tuple_field_count(tuple) > BOX_FUNC_FIELD_BODY);
> > +	const char *field = tuple_field(tuple, BOX_FUNC_FIELD_BODY);
> > +	assert(field != NULL);
> > +	enum mp_type type = mp_typeof(*field);
> > +	if (type == MP_STR)
> > +		return 0;
> > +	if (type != MP_MAP) {
> > +		diag_set(ClientError, ER_FIELD_TYPE, "map or array",
> 
> 1. 'Map or string'.
> 
Fixed.

> > +			 mp_type_strs[type]);
> > +		return -1;
> > +	}
> > +	const char *agg = tuple_field_cstr(tuple, BOX_FUNC_FIELD_AGGREGATE);
> > +	if (agg == NULL)
> > +		return -1;
> > +	if (STR2ENUM(func_aggregate, agg) != FUNC_AGGREGATE_GROUP) {
> > +		const char *name = tuple_field_cstr(tuple, BOX_FUNC_FIELD_NAME);
> > +		diag_set(ClientError, ER_CREATE_FUNCTION, name,
> > +			 "only aggregate functions can have map as body");
> > +		return -1;
> > +	}
> > +
> > +	bool has_step = false;
> > +	bool has_finalize = false;
> > +	if (mp_decode_map(&field) != 2)
> > +		goto error;
> > +	for (int i = 0; i < 2; ++i) {
> > +		const char *value;
> > +		uint32_t size;
> > +		if (mp_typeof(*field) != MP_STR)
> > +			goto error;
> > +		value = mp_decode_str(&field, &size);
> > +		if (size == strlen("step")) {
> > +			if (strncmp(value, "step", size) != 0)
> > +				goto error;
> 
> 2. I would rather propose to introduce a function to compare
> 0-terminated string with non-terminated one. It is needed quite
> often apparently. For instance,
> 
> 	strlcmp(const char *l, const char *r, size_t r_len);
> 
I added a new patch. I decided to insert it here instead of sending a new
version, since the changes it introduces are quite small. Here it is:

commit b30a92bdd47d49aed1b82a99dae0f9472e320ee8
Author: Mergen Imeev <imeevma@gmail.com>
Date:   Sun Jan 23 14:58:33 2022 +0300

    util: introduce strlcmp()
    
    Utility function strlcmp() was introduced to simplify comparisons
    between null-terminated strings and strings that are not null-terminated.

diff --git a/src/lib/core/util.c b/src/lib/core/util.c
index f29886105..035e207b1 100644
--- a/src/lib/core/util.c
+++ b/src/lib/core/util.c
@@ -264,6 +264,15 @@ strlcpy(char *dst, const char *src, size_t size)
 }
 #endif
 
+int
+strlcmp(const char *l, const char *r, size_t r_len)
+{
+	size_t l_len = strlen(l);
+	if (l_len != r_len)
+		return l_len > r_len ? 1 : -1;
+	return strncmp(l, r, r_len);
+}
+
 int
 utf8_check_printable(const char *start, size_t length)
 {
diff --git a/src/trivia/util.h b/src/trivia/util.h
index 3d853f612..ee1b88609 100644
--- a/src/trivia/util.h
+++ b/src/trivia/util.h
@@ -490,6 +490,19 @@ size_t
 strlcpy(char *dst, const char *src, size_t size);
 #endif
 
+/**
+ * Compare two strings, the first of which is null-terminated and the second of
+ * which may not be null-terminated.
+ *
+ * @param l the first string, must be null-terminated.
+ * @param r the second string.
+ * @param r_len length of the second string.
+ *
+ * @return 0 if the strings are equal, non-zero otherwise.
+ */
+int
+strlcmp(const char *l, const char *r, size_t r_len);
+
 /**
  * Check that @a str is valid utf-8 sequence and can be printed
  * unescaped.

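Because size_t is unsigned, the length check in strlcmp() has to compare l_len and r_len directly: an expression such as `l_len - r_len > 0` is true whenever the lengths differ, so it can never produce a negative result. A minimal standalone sketch of the helper with a direct comparison; it mirrors the declaration added to src/trivia/util.h but is an illustration, not the code from the patch.

```c
#include <assert.h>
#include <string.h>

/*
 * Compare the null-terminated string l with the string r of length r_len,
 * which does not have to be null-terminated. Strings of different lengths
 * are ordered by length; strings of equal length are compared with strncmp().
 */
int
strlcmp(const char *l, const char *r, size_t r_len)
{
	assert(l != NULL && r != NULL);
	size_t l_len = strlen(l);
	if (l_len != r_len)
		return l_len > r_len ? 1 : -1;
	return strncmp(l, r, r_len);
}
```

This is the shape of check func_def_check_body() needs: a key decoded from a msgpack map with mp_decode_str() points into the tuple and is not null-terminated, so strlcmp("step", value, size) == 0 matches exactly the key "step" and nothing longer or shorter.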

> > +			has_step = true;
> > +		} else if (size == strlen("finalize")) {
> > +			if (strncmp(value, "finalize", size) != 0)
> > +				goto error;
> > +			has_finalize = true;
> > +		}
> > +		if (mp_typeof(*field) != MP_STR)
> > +			goto error;
> > +		mp_next(&field);
> > +	}
> > +	if (has_step && has_finalize)
> > +		return 0;
> > +error:
> > +	const char *name = tuple_field_cstr(tuple, BOX_FUNC_FIELD_NAME);
> > +	diag_set(ClientError, ER_CREATE_FUNCTION, name,
> > +		 "body of aggregate function should be map that contains "
> > +		 "exactly two string fields: 'step' and 'finalize'");
> > +	return -1;
> > +}
> > +
> > +static const char *
> > +func_def_get_agg_body(struct tuple *tuple, uint32_t *body_len, bool is_step)
> 
> 3. Please, let's split that function in 2 maybe. Its invocation with
> hardcoded 'true'/'false' at the end makes it hard to understand what
> these values mean.
> 
Fixed.

> > +{
> > +	const char *field = tuple_field(tuple, BOX_FUNC_FIELD_BODY);
> > +	assert(field != NULL);
> > +	if (mp_typeof(*field) == MP_STR) {
> > +		if (is_step)
> > +			return mp_decode_str(&field, body_len);
> > +		*body_len = 0;
> > +		return field;
> 
> 4. You return len as 0, but the returned value != NULL? Why?
> 
If I return NULL, that means an error has occurred. However, if I return
something other than NULL and the length is 0, that means the FINALIZE function
is non-persistent.
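The return convention above can be spelled out as a small classifier. This is a hypothetical sketch (the enum and function names are made up for illustration) that only restates the rule: NULL signals an error, a non-NULL pointer with length 0 means FINALIZE is non-persistent and is found by name, and a non-zero length means the pointer refers to the Lua source of a persistent FINALIZE.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical names, used only to illustrate the convention. */
enum agg_body_kind {
	/* NULL was returned: an error has occurred and the diag is set. */
	AGG_BODY_ERROR,
	/* Non-NULL with length 0: FINALIZE is non-persistent and is
	 * found by the name "<name>_finalize". */
	AGG_BODY_NON_PERSISTENT,
	/* Non-NULL with non-zero length: the body of a persistent FINALIZE. */
	AGG_BODY_PERSISTENT,
};

static enum agg_body_kind
agg_body_classify(const char *body, uint32_t body_len)
{
	if (body == NULL)
		return AGG_BODY_ERROR;
	if (body_len == 0)
		return AGG_BODY_NON_PERSISTENT;
	return AGG_BODY_PERSISTENT;
}
```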

> > +	}
> > @@ -3568,6 +3697,16 @@ on_replace_dd_func(struct trigger * /* trigger */, void *event)
> >  		struct func *func = func_new(def);
> >  		if (func == NULL)
> >  			return -1;
> > +		if (def->aggregate == FUNC_AGGREGATE_GROUP) {
> > +			struct func_def *fin_def =
> > +				func_fin_def_new(new_tuple, def);
> > +			if (fin_def == NULL)
> > +				return -1;
> > +			struct func *func_fin = func_new(fin_def);
> > +			if (func_fin == NULL)
> > +				return -1;
> > +			func_fin_cache_insert(func_fin);
> 
> 5. What happens if WAL write fails and on_create_func_rollback() is called?
> 
I missed it earlier. Do you think it's better to introduce a new field in struct
func/struct func_def, or to malloc()/free() the data for trigger->data?

> > diff --git a/src/box/schema.h b/src/box/schema.h
> > index d3bbdd590..2f020ee96 100644
> > --- a/src/box/schema.h
> > +++ b/src/box/schema.h
> > @@ -102,6 +102,15 @@ func_by_id(uint32_t fid);
> >  struct func *
> >  func_by_name(const char *name, uint32_t name_len);
> >  
> > +void
> > +func_fin_cache_insert(struct func *func);
> > +
> > +void
> > +func_fin_cache_delete(uint32_t fid);
> > +
> > +struct func *
> > +func_fin_by_id(uint32_t fid);
> 
> 6. Let's leave some comments about what 'func fin' is.
> 
Added a comment to func_fin_cache_insert().

> > diff --git a/src/box/sql/select.c b/src/box/sql/select.c
> > index 2e33a31d2..c77cd1c9f 100644
> > --- a/src/box/sql/select.c
> > +++ b/src/box/sql/select.c
> > @@ -5644,10 +5650,17 @@ updateAccumulator(Parse * pParse, AggInfo * pAggInfo)
> >  			pParse->is_aborted = true;
> >  			return;
> >  		}
> > -		sqlVdbeAddOp3(v, OP_AggStep, nArg, regAgg, pF->iMem);
> > -		sqlVdbeAppendP4(v, ctx, P4_FUNCCTX);
> > -		sql_expr_type_cache_change(pParse, regAgg, nArg);
> > -		sqlReleaseTempRange(pParse, regAgg, nArg);
> > +		if (pF->func->def->language == FUNC_LANGUAGE_SQL_BUILTIN) {
> > +			sqlVdbeAddOp3(v, OP_AggStep, nArg, regAgg, pF->iMem);
> > +			sqlVdbeAppendP4(v, ctx, P4_FUNCCTX);
> > +		} else {
> > +			sqlVdbeAddOp3(v, OP_UserAggStep, nArg, regAgg,
> > +				      pF->iMem);
> > +			const char *name = pF->func->def->name;
> > +			int len = pF->func->def->name_len;
> > +			char *str = sqlDbStrNDup(pParse->db, name, len);
> > +			sqlVdbeAppendP4(v, str, P4_DYNAMIC);
> 
> 7. Was it possible to unify how builtin and user-defined functions
> are called? In the same opcode without any special 'if's.
> 
I think something similar might be possible if we used struct func_sql_builtin
somewhere before these lines are called. However, some preparation would be
needed; for example, we would have to force the user to re-prepare the
statement in case something changes in _func.

> > diff --git a/test/sql-tap/gh-2579-custom-aggregate.test.lua b/test/sql-tap/gh-2579-custom-aggregate.test.lua
> > new file mode 100755
> > index 000000000..d5b845761
> > --- /dev/null
> > +++ b/test/sql-tap/gh-2579-custom-aggregate.test.lua
> 8. It might also be good to have a test with step/finalize functions
> throwing a Lua error from their body.
Added.


Diff:

diff --git a/src/box/alter.cc b/src/box/alter.cc
index f1dfcd807..5381612af 100644
--- a/src/box/alter.cc
+++ b/src/box/alter.cc
@@ -3286,7 +3286,7 @@ func_def_check_body(struct tuple *tuple)
 	if (type == MP_STR)
 		return 0;
 	if (type != MP_MAP) {
-		diag_set(ClientError, ER_FIELD_TYPE, "map or array",
+		diag_set(ClientError, ER_FIELD_TYPE, "map or string",
 			 mp_type_strs[type]);
 		return -1;
 	}
@@ -3310,15 +3310,12 @@ func_def_check_body(struct tuple *tuple)
 		if (mp_typeof(*field) != MP_STR)
 			goto error;
 		value = mp_decode_str(&field, &size);
-		if (size == strlen("step")) {
-			if (strncmp(value, "step", size) != 0)
-				goto error;
+		if (strlcmp("step", value, size) == 0)
 			has_step = true;
-		} else if (size == strlen("finalize")) {
-			if (strncmp(value, "finalize", size) != 0)
-				goto error;
+		else if (strlcmp("finalize", value, size) == 0)
 			has_finalize = true;
-		}
+		else
+			goto error;
 		if (mp_typeof(*field) != MP_STR)
 			goto error;
 		mp_next(&field);
@@ -3334,13 +3331,29 @@ error:
 }
 
 static const char *
-func_def_get_agg_body(struct tuple *tuple, uint32_t *body_len, bool is_step)
+func_def_get_agg_step_body(struct tuple *tuple, uint32_t *body_len)
+{
+	const char *field = tuple_field(tuple, BOX_FUNC_FIELD_BODY);
+	assert(field != NULL);
+	if (mp_typeof(*field) == MP_STR)
+		return mp_decode_str(&field, body_len);
+	assert(mp_typeof(*field) == MP_MAP);
+	mp_decode_map(&field);
+	uint32_t size;
+	mp_decode_str(&field, &size);
+	if (size == strlen("step"))
+		return mp_decode_str(&field, body_len);
+	mp_next(&field);
+	mp_next(&field);
+	return mp_decode_str(&field, body_len);
+}
+
+static const char *
+func_def_get_agg_finalize_body(struct tuple *tuple, uint32_t *body_len)
 {
 	const char *field = tuple_field(tuple, BOX_FUNC_FIELD_BODY);
 	assert(field != NULL);
 	if (mp_typeof(*field) == MP_STR) {
-		if (is_step)
-			return mp_decode_str(&field, body_len);
 		*body_len = 0;
 		return field;
 	}
@@ -3348,7 +3361,7 @@ func_def_get_agg_body(struct tuple *tuple, uint32_t *body_len, bool is_step)
 	mp_decode_map(&field);
 	uint32_t size;
 	mp_decode_str(&field, &size);
-	if (size == (is_step ? strlen("step") : strlen("finalize")))
+	if (size == strlen("finalize"))
 		return mp_decode_str(&field, body_len);
 	mp_next(&field);
 	mp_next(&field);
@@ -3376,7 +3389,7 @@ func_def_new_from_tuple(struct tuple *tuple)
 	if (field_count > BOX_FUNC_FIELD_BODY) {
 		if (func_def_check_body(tuple) != 0)
 			return NULL;
-		body = func_def_get_agg_body(tuple, &body_len, true);
+		body = func_def_get_agg_step_body(tuple, &body_len);
 		if (body == NULL)
 			return NULL;
 		comment = tuple_field_str(tuple, BOX_FUNC_FIELD_COMMENT,
@@ -3628,7 +3641,7 @@ func_fin_def_new(struct tuple *tuple, const struct func_def *def)
 	const char *name = tt_sprintf("%s_finalize", def->name);
 	size_t name_len = strlen(name);
 	uint32_t body_len;
-	const char *body = func_def_get_agg_body(tuple, &body_len, false);
+	const char *body = func_def_get_agg_finalize_body(tuple, &body_len);
 	if (body == NULL)
 		return NULL;
 	if ((body_len == 0 && def->body != NULL) ||
diff --git a/src/box/sql/vdbe.c b/src/box/sql/vdbe.c
index 873b9f9f4..e2253caa1 100644
--- a/src/box/sql/vdbe.c
+++ b/src/box/sql/vdbe.c
@@ -1316,7 +1316,7 @@ case OP_UserAggStep: {
 	struct region *region = &fiber()->gc;
 	size_t region_svp = region_used(region);
 	port_vdbemem_create(&args, (struct sql_value *)argv, pOp->p1 + 1);
-	if (func->vtab->call(func, &args, &ret) != 0) {
+	if (func_call(func, &args, &ret) != 0) {
 		region_truncate(region, region_svp);
 		goto abort_due_to_error;
 	}
@@ -1360,7 +1360,7 @@ case OP_UserAggFinal: {
 	struct region *region = &fiber()->gc;
 	size_t region_svp = region_used(region);
 	port_vdbemem_create(&args, (struct sql_value *)argv, 1);
-	if (func->vtab->call(func, &args, &ret) != 0) {
+	if (func_call(func, &args, &ret) != 0) {
 		region_truncate(region, region_svp);
 		goto abort_due_to_error;
 	}
diff --git a/test/sql-tap/gh-2579-custom-aggregate.test.lua b/test/sql-tap/gh-2579-custom-aggregate.test.lua
index d5b845761..b207257c6 100755
--- a/test/sql-tap/gh-2579-custom-aggregate.test.lua
+++ b/test/sql-tap/gh-2579-custom-aggregate.test.lua
@@ -3,7 +3,7 @@ local build_path = os.getenv("BUILDDIR")
 package.cpath = build_path..'/test/sql-tap/?.so;'..build_path..'/test/sql-tap/?.dylib;'..package.cpath
 
 local test = require("sqltester")
-test:plan(13)
+test:plan(15)
 
 -- Make sure that non-persistent aggregate functions are working as expected.
 local step = function(x, y)
@@ -229,4 +229,50 @@ test:do_test(
         "be map that contains exactly two string fields: 'step' and 'finalize'"
     })
 
+--
+-- Make sure that errors thrown from the step and finalize functions of
+-- user-defined aggregate functions are reported correctly.
+--
+local body_s10 = {
+    step = [[function(x) error("some error") return 0 end]],
+    finalize = [[function(x) return 0 end]]
+}
+
+box.schema.func.create('S10', {aggregate = 'group', returns = 'number',
+                              body = body_s10, exports = {'SQL'}})
+
+test:do_catchsql_test(
+    "gh-2579-14",
+    [[
+        SELECT s10();
+    ]], {
+        1, '[string "return function(x) error("some error") return..."]:1: '..
+           'some error'
+    })
+
+local body_s11 = {
+    step = [[function(x) return 0 end]],
+    finalize = [[function(x) error("another error") return 0 end]]
+}
+
+box.schema.func.create('S11', {aggregate = 'group', returns = 'number',
+                              body = body_s11, exports = {'SQL'}})
+
+test:do_catchsql_test(
+    "gh-2579-15",
+    [[
+        SELECT s11();
+    ]], {
+        1, '[string "return function(x) error("another error") ret..."]:1: '..
+           'another error'
+    })
+
+box.space._func.index[2]:delete("S1")
+box.space._func.index[2]:delete("S2")
+box.space._func.index[2]:delete("gh-2579-custom-aggregate.S3")
+box.space._func.index[2]:delete("S4")
+box.space._func.index[2]:delete("S10")
+box.space._func.index[2]:delete("S11")
+box.space.T:drop()
+
 test:finish_test()
diff --git a/src/box/schema.h b/src/box/schema.h
index 2f020ee96..186fa9db7 100644
--- a/src/box/schema.h
+++ b/src/box/schema.h
@@ -102,6 +102,12 @@ func_by_id(uint32_t fid);
 struct func *
 func_by_name(const char *name, uint32_t name_len);
 
+/**
+ * Insert the finalize function object of an aggregate function into the
+ * finalize function cache.
+ *
+ * @param func Function object to insert.
+ */
 void
 func_fin_cache_insert(struct func *func);



New patch:

commit 1caf17400eae504a192b6e49cba0d254a4a510ea
Author: Mergen Imeev <imeevma@gmail.com>
Date:   Tue Jan 11 08:48:58 2022 +0300

    sql: introduce user-defined aggregate functions
    
    This patch introduces user-defined SQL aggregate functions and an
    interface for creating them.
    
    Closes #2579
    
    @TarantoolBot document
    Title: User-defined aggregate functions
    
    User-defined aggregate functions are now available. There are three
    types of user-defined aggregate functions: persistent functions,
    non-persistent functions, and C functions. All user-defined aggregate
    functions consist of two functions: a "step" function, which is executed
    for each record in the group, and a "finalize" function, which is
    executed after all records in the group have been processed.
    
    The "step" function takes one more argument than specified in
    "param_list" in the definition. This extra argument is the current state
    of the aggregate function and must be the first argument. On the first
    call for a group, the state is nil.
    
    The "finalize" function takes exactly one argument: the current state of
    the aggregate function.
    
    To use a C or a non-persistent user-defined aggregate function, you must
    define a "step" function whose name is equal to the function name, and a
    "finalize" function whose name is the function name with "_finalize"
    appended. For non-persistent functions, both functions must be global in
    Lua; for a C function, functions with these names must be defined in the
    corresponding .so file.
    
    Example of a non-persistent aggregate function:
    ```
    F0 = function(x, y)
        if x == nil then
            x = {sum = 0, count = 0}
        end
        x.sum = x.sum + y
        x.count = x.count + 1
        return x
    end
    
    F0_finalize = function(x)
        return x.sum / x.count
    end
    
    box.schema.func.create('F0', {aggregate = 'group', returns = 'number',
                                  param_list = {'integer'}, exports = {'SQL'}})
    box.execute([[SELECT f0(column_3) FROM (VALUES(1), (2), (3));]])
    ```
    
    To use a persistent user-defined aggregate function, you must define the
    "body" field in the function definition as a map with two string fields,
    "step" and "finalize", that contain the source of the corresponding
    functions and follow the rules defined above.
    
    Example of a persistent aggregate function:
    ```
    body = {
        step = [[function(x, y)
            if x == nil then
                x = {sum = 0, count = 0}
            end
            x.sum = x.sum + y
            x.count = x.count + 1
            return x
        end]],
        finalize = [[function(x)
            return x.sum / x.count
        end]]
    }
    
    box.schema.func.create('F1', {aggregate = 'group', returns = 'number',
                                  body = body, param_list = {'integer'},
                                  exports = {'SQL'}})
    box.execute([[SELECT f1(column_3) FROM (VALUES(1), (2), (3));]])
    ```
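A minimal sketch of the step/finalize contract described above, written in plain C rather than Lua. All names (`avg_state`, `avg_step`, `avg_finalize`, `avg_aggregate`) are hypothetical and are not part of Tarantool's module API; the sketch only mirrors the `F0`/`F0_finalize` example, with a flag standing in for the nil state that the first step call receives.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical accumulator; is_nil models the nil state before step #1. */
struct avg_state {
	bool is_nil;
	long long sum;
	long long count;
};

/* "step": called once per row, with the state as the first argument. */
static struct avg_state
avg_step(struct avg_state state, long long value)
{
	if (state.is_nil) {
		state.is_nil = false;
		state.sum = 0;
		state.count = 0;
	}
	state.sum += value;
	state.count += 1;
	return state;
}

/* "finalize": called once per group with the final state only. */
static double
avg_finalize(struct avg_state state)
{
	/* Like the Lua example, an empty group is not handled. */
	assert(!state.is_nil && state.count > 0);
	return (double)state.sum / (double)state.count;
}

/* Drive one group: step for every row, then finalize once. */
static double
avg_aggregate(const long long *values, size_t n)
{
	struct avg_state state = {.is_nil = true};
	for (size_t i = 0; i < n; i++)
		state = avg_step(state, values[i]);
	return avg_finalize(state);
}
```

For the rows 1, 1, 2, 2, 3 used in gh-2579-1, `avg_aggregate()` returns 9 / 5 = 1.8, which matches the expected result of `s1(a)`.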

diff --git a/changelogs/unreleased/gh-2579-introduce-custom-aggregate-functions.md b/changelogs/unreleased/gh-2579-introduce-custom-aggregate-functions.md
new file mode 100644
index 000000000..68adad478
--- /dev/null
+++ b/changelogs/unreleased/gh-2579-introduce-custom-aggregate-functions.md
@@ -0,0 +1,4 @@
+## feature/sql
+
+* Custom SQL aggregate functions and an interface to create them are now
+  available (gh-2579).
diff --git a/src/box/alter.cc b/src/box/alter.cc
index 65c1cb952..5381612af 100644
--- a/src/box/alter.cc
+++ b/src/box/alter.cc
@@ -3276,6 +3276,98 @@ func_def_get_ids_from_tuple(struct tuple *tuple, uint32_t *fid, uint32_t *uid)
 	return tuple_field_u32(tuple, BOX_FUNC_FIELD_UID, uid);
 }
 
+static int
+func_def_check_body(struct tuple *tuple)
+{
+	assert(tuple_field_count(tuple) > BOX_FUNC_FIELD_BODY);
+	const char *field = tuple_field(tuple, BOX_FUNC_FIELD_BODY);
+	assert(field != NULL);
+	enum mp_type type = mp_typeof(*field);
+	if (type == MP_STR)
+		return 0;
+	if (type != MP_MAP) {
+		diag_set(ClientError, ER_FIELD_TYPE, "map or string",
+			 mp_type_strs[type]);
+		return -1;
+	}
+	const char *agg = tuple_field_cstr(tuple, BOX_FUNC_FIELD_AGGREGATE);
+	if (agg == NULL)
+		return -1;
+	if (STR2ENUM(func_aggregate, agg) != FUNC_AGGREGATE_GROUP) {
+		const char *name = tuple_field_cstr(tuple, BOX_FUNC_FIELD_NAME);
+		diag_set(ClientError, ER_CREATE_FUNCTION, name,
+			 "only aggregate functions can have map as body");
+		return -1;
+	}
+
+	bool has_step = false;
+	bool has_finalize = false;
+	if (mp_decode_map(&field) != 2)
+		goto error;
+	for (int i = 0; i < 2; ++i) {
+		const char *value;
+		uint32_t size;
+		if (mp_typeof(*field) != MP_STR)
+			goto error;
+		value = mp_decode_str(&field, &size);
+		if (strlcmp("step", value, size) == 0)
+			has_step = true;
+		else if (strlcmp("finalize", value, size) == 0)
+			has_finalize = true;
+		else
+			goto error;
+		if (mp_typeof(*field) != MP_STR)
+			goto error;
+		mp_next(&field);
+	}
+	if (has_step && has_finalize)
+		return 0;
+error:
+	const char *name = tuple_field_cstr(tuple, BOX_FUNC_FIELD_NAME);
+	diag_set(ClientError, ER_CREATE_FUNCTION, name,
+		 "body of aggregate function should be map that contains "
+		 "exactly two string fields: 'step' and 'finalize'");
+	return -1;
+}
+
+static const char *
+func_def_get_agg_step_body(struct tuple *tuple, uint32_t *body_len)
+{
+	const char *field = tuple_field(tuple, BOX_FUNC_FIELD_BODY);
+	assert(field != NULL);
+	if (mp_typeof(*field) == MP_STR)
+		return mp_decode_str(&field, body_len);
+	assert(mp_typeof(*field) == MP_MAP);
+	mp_decode_map(&field);
+	uint32_t size;
+	mp_decode_str(&field, &size);
+	if (size == strlen("step"))
+		return mp_decode_str(&field, body_len);
+	mp_next(&field);
+	mp_next(&field);
+	return mp_decode_str(&field, body_len);
+}
+
+static const char *
+func_def_get_agg_finalize_body(struct tuple *tuple, uint32_t *body_len)
+{
+	const char *field = tuple_field(tuple, BOX_FUNC_FIELD_BODY);
+	assert(field != NULL);
+	if (mp_typeof(*field) == MP_STR) {
+		*body_len = 0;
+		return field;
+	}
+	assert(mp_typeof(*field) == MP_MAP);
+	mp_decode_map(&field);
+	uint32_t size;
+	mp_decode_str(&field, &size);
+	if (size == strlen("finalize"))
+		return mp_decode_str(&field, body_len);
+	mp_next(&field);
+	mp_next(&field);
+	return mp_decode_str(&field, body_len);
+}
+
 /** Create a function definition from tuple. */
 static struct func_def *
 func_def_new_from_tuple(struct tuple *tuple)
@@ -3295,7 +3387,9 @@ func_def_new_from_tuple(struct tuple *tuple)
 	if (identifier_check(name, name_len) != 0)
 		return NULL;
 	if (field_count > BOX_FUNC_FIELD_BODY) {
-		body = tuple_field_str(tuple, BOX_FUNC_FIELD_BODY, &body_len);
+		if (func_def_check_body(tuple) != 0)
+			return NULL;
+		body = func_def_get_agg_step_body(tuple, &body_len);
 		if (body == NULL)
 			return NULL;
 		comment = tuple_field_str(tuple, BOX_FUNC_FIELD_COMMENT,
@@ -3498,6 +3592,11 @@ func_def_new_from_tuple(struct tuple *tuple)
 		def->exports.lua = true;
 		def->param_count = 0;
 	}
+	if (def->aggregate == FUNC_AGGREGATE_GROUP && def->exports.lua != 0) {
+		diag_set(ClientError, ER_CREATE_FUNCTION, def->name,
+			 "aggregate function can only be accessed in SQL");
+		return NULL;
+	}
 	if (func_def_check(def) != 0)
 		return NULL;
 	def_guard.is_active = false;
@@ -3536,6 +3635,49 @@ on_drop_func_rollback(struct trigger *trigger, void * /* event */)
 	return 0;
 }
 
+struct func_def *
+func_fin_def_new(struct tuple *tuple, const struct func_def *def)
+{
+	const char *name = tt_sprintf("%s_finalize", def->name);
+	size_t name_len = strlen(name);
+	uint32_t body_len;
+	const char *body = func_def_get_agg_finalize_body(tuple, &body_len);
+	if (body == NULL)
+		return NULL;
+	if ((body_len == 0 && def->body != NULL) ||
+	    (def->body == NULL && body_len != 0)) {
+		diag_set(ClientError, ER_CREATE_FUNCTION, def->name,
+			 "step or finalize of aggregate function is undefined");
+		return NULL;
+	}
+	uint32_t len = sizeof(struct func_def) + name_len + 1;
+	if (body_len > 0)
+		len += body_len + 1;
+	struct func_def *fin_def = (struct func_def *)malloc(len);
+	fin_def->fid = def->fid;
+	fin_def->uid = def->uid;
+	if (body_len > 0) {
+		fin_def->body = fin_def->name + name_len + 1;
+		memcpy(fin_def->body, body, body_len);
+		fin_def->body[body_len] = '\0';
+	} else {
+		fin_def->body = NULL;
+	}
+	fin_def->comment = NULL;
+	fin_def->setuid = def->setuid;
+	fin_def->is_deterministic = def->is_deterministic;
+	fin_def->is_sandboxed = def->is_sandboxed;
+	fin_def->param_count = 0;
+	fin_def->returns = def->returns;
+	fin_def->aggregate = FUNC_AGGREGATE_GROUP;
+	fin_def->language = def->language;
+	fin_def->name_len = name_len;
+	fin_def->exports = def->exports;
+	fin_def->opts = def->opts;
+	memcpy(fin_def->name, name, name_len + 1);
+	return fin_def;
+}
+
 /**
  * A trigger invoked on replace in a space containing
  * functions on which there were defined any grants.
@@ -3568,6 +3710,16 @@ on_replace_dd_func(struct trigger * /* trigger */, void *event)
 		struct func *func = func_new(def);
 		if (func == NULL)
 			return -1;
+		if (def->aggregate == FUNC_AGGREGATE_GROUP) {
+			struct func_def *fin_def =
+				func_fin_def_new(new_tuple, def);
+			if (fin_def == NULL)
+				return -1;
+			struct func *func_fin = func_new(fin_def);
+			if (func_fin == NULL)
+				return -1;
+			func_fin_cache_insert(func_fin);
+		}
 		def_guard.is_active = false;
 		func_cache_insert(func);
 		on_rollback->data = func;
@@ -3610,6 +3762,8 @@ on_replace_dd_func(struct trigger * /* trigger */, void *event)
 			txn_alter_trigger_new(on_drop_func_rollback, old_func);
 		if (on_commit == NULL || on_rollback == NULL)
 			return -1;
+		if (old_func->def->aggregate == FUNC_AGGREGATE_GROUP)
+			func_fin_cache_delete(old_func->def->fid);
 		func_cache_delete(old_func->def->fid);
 		txn_stmt_on_commit(stmt, on_commit);
 		txn_stmt_on_rollback(stmt, on_rollback);
@@ -3622,9 +3776,13 @@ on_replace_dd_func(struct trigger * /* trigger */, void *event)
 		 * definition to support upgrade script.
 		 */
 		struct func_def *old_def = NULL, *new_def = NULL;
-		auto guard = make_scoped_guard([&old_def, &new_def] {
+		struct func_def *new_fin_def = NULL;
+		struct func_def *old_fin_def = NULL;
+		auto guard = make_scoped_guard([=] {
 			free(old_def);
 			free(new_def);
+			free(new_fin_def);
+			free(old_fin_def);
 		});
 		old_def = func_def_new_from_tuple(old_tuple);
 		new_def = func_def_new_from_tuple(new_tuple);
@@ -3635,6 +3793,18 @@ on_replace_dd_func(struct trigger * /* trigger */, void *event)
 				  "alter");
 			return -1;
 		}
+		if (new_def->aggregate != FUNC_AGGREGATE_GROUP)
+			return 0;
+		new_fin_def = func_fin_def_new(new_tuple, new_def);
+		old_fin_def = func_fin_def_new(old_tuple, old_def);
+		if (new_fin_def == NULL || old_fin_def == NULL)
+			return -1;
+		if (func_def_cmp(new_fin_def, old_fin_def) != 0) {
+			diag_set(ClientError, ER_UNSUPPORTED, "function",
+				 "alter");
+			return -1;
+		}
+		return 0;
 	}
 	return 0;
 }
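The new `func_def_check_body()` accepts a map body only if it has exactly two string entries keyed `step` and `finalize`. The sketch below restates that rule over already-decoded keys so it can be checked without msgpuck. `body_key`, `lit_cmp()` and `check_agg_body_keys()` are hypothetical, and `lit_cmp()` is only a stand-in for the length-bounded comparison the patch gets from `strlcmp()`.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <string.h>

/* A decoded msgpack map key: bytes plus length, not NUL-terminated. */
struct body_key {
	const char *str;
	uint32_t len;
};

/*
 * Hypothetical stand-in for the length-bounded comparison: returns 0 if
 * the NUL-terminated literal equals the first len bytes of s exactly.
 */
static int
lit_cmp(const char *lit, const char *s, uint32_t len)
{
	assert(lit != NULL && s != NULL);
	if (strlen(lit) != len)
		return 1;
	return memcmp(lit, s, len) != 0;
}

/*
 * Returns 0 if the keys are exactly {"step", "finalize"} in any order,
 * -1 otherwise (wrong entry count, unknown key, or duplicate key).
 */
static int
check_agg_body_keys(const struct body_key *keys, uint32_t count)
{
	bool has_step = false;
	bool has_finalize = false;
	if (count != 2)
		return -1;
	for (uint32_t i = 0; i < count; i++) {
		if (lit_cmp("step", keys[i].str, keys[i].len) == 0)
			has_step = true;
		else if (lit_cmp("finalize", keys[i].str, keys[i].len) == 0)
			has_finalize = true;
		else
			return -1;
	}
	/* A duplicate such as {"step", "step"} leaves one flag unset. */
	return has_step && has_finalize ? 0 : -1;
}
```

Because unknown and duplicate keys are rejected at this point, `func_def_get_agg_step_body()` and `func_def_get_agg_finalize_body()` can later tell the two entries apart by key length alone.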
diff --git a/src/box/bootstrap.snap b/src/box/bootstrap.snap
index 018670d2a..610e513fd 100644
Binary files a/src/box/bootstrap.snap and b/src/box/bootstrap.snap differ
diff --git a/src/box/lua/schema.lua b/src/box/lua/schema.lua
index 99b65b0f5..8c328852d 100644
--- a/src/box/lua/schema.lua
+++ b/src/box/lua/schema.lua
@@ -2597,7 +2597,8 @@ box.schema.func.create = function(name, opts)
     opts = opts or {}
     check_param_table(opts, { setuid = 'boolean',
                               if_not_exists = 'boolean',
-                              language = 'string', body = 'string',
+                              language = 'string', body = 'any',
+                              aggregate = 'string',
                               is_deterministic = 'boolean',
                               is_sandboxed = 'boolean',
                               is_multikey = 'boolean',
diff --git a/src/box/lua/upgrade.lua b/src/box/lua/upgrade.lua
index f9e6543c2..0e2c4a091 100644
--- a/src/box/lua/upgrade.lua
+++ b/src/box/lua/upgrade.lua
@@ -1212,6 +1212,21 @@ local function upgrade_to_2_9_1()
     remove_sql_builtin_functions_from_func()
 end
 
+--------------------------------------------------------------------------------
+-- Tarantool 2.10.1
+--------------------------------------------------------------------------------
+local function change_func_body_field_type()
+    local _func = box.space._func
+    local format = _func:format()
+    -- Change type of 'body' field from 'string' to 'any'.
+    format[6].type = 'any'
+    _func:format(format)
+end
+
+local function upgrade_to_2_10_1()
+    change_func_body_field_type()
+end
+
 --------------------------------------------------------------------------------
 
 local handlers = {
@@ -1229,6 +1244,7 @@ local handlers = {
     {version = mkversion(2, 3, 1), func = upgrade_to_2_3_1, auto = true},
     {version = mkversion(2, 7, 1), func = upgrade_to_2_7_1, auto = true},
     {version = mkversion(2, 9, 1), func = upgrade_to_2_9_1, auto = true},
+    {version = mkversion(2, 10, 1), func = upgrade_to_2_10_1, auto = true},
 }
 
 -- Schema version of the snapshot.
diff --git a/src/box/schema.cc b/src/box/schema.cc
index c248d4ee4..eb1b4a7dd 100644
--- a/src/box/schema.cc
+++ b/src/box/schema.cc
@@ -60,6 +60,7 @@ static struct mh_i32ptr_t *spaces;
 static struct mh_strnptr_t *spaces_by_name;
 static struct mh_i32ptr_t *funcs;
 static struct mh_strnptr_t *funcs_by_name;
+static struct mh_i32ptr_t *funcs_fin;
 static struct mh_i32ptr_t *sequences;
 /** Public change counter. On its update clients need to fetch
  *  new space data from the instance. */
@@ -379,6 +380,7 @@ schema_init(void)
 	spaces_by_name = mh_strnptr_new();
 	funcs = mh_i32ptr_new();
 	funcs_by_name = mh_strnptr_new();
+	funcs_fin = mh_i32ptr_new();
 	sequences = mh_i32ptr_new();
 	/*
 	 * Create surrogate space objects for the mandatory system
@@ -540,6 +542,14 @@ schema_free(void)
 		func_delete(func);
 	}
 	mh_i32ptr_delete(funcs);
+	while (mh_size(funcs_fin) > 0) {
+		mh_int_t i = mh_first(funcs_fin);
+		struct func *func =
+			(struct func *)mh_i32ptr_node(funcs_fin, i)->val;
+		func_fin_cache_delete(func->def->fid);
+		func_delete(func);
+	}
+	mh_i32ptr_delete(funcs_fin);
 	while (mh_size(sequences) > 0) {
 		mh_int_t i = mh_first(sequences);
 
@@ -597,6 +607,33 @@ func_by_name(const char *name, uint32_t name_len)
 	return (struct func *) mh_strnptr_node(funcs_by_name, func)->val;
 }
 
+void
+func_fin_cache_insert(struct func *func)
+{
+	uint32_t fid = func->def->fid;
+	assert(func_fin_by_id(fid) == NULL);
+	const struct mh_i32ptr_node_t node = {fid, func};
+	mh_i32ptr_put(funcs_fin, &node, NULL, NULL);
+}
+
+void
+func_fin_cache_delete(uint32_t fid)
+{
+	mh_int_t k = mh_i32ptr_find(funcs_fin, fid, NULL);
+	if (k == mh_end(funcs_fin))
+		return;
+	mh_i32ptr_del(funcs_fin, k, NULL);
+}
+
+struct func *
+func_fin_by_id(uint32_t fid)
+{
+	mh_int_t func = mh_i32ptr_find(funcs_fin, fid, NULL);
+	if (func == mh_end(funcs_fin))
+		return NULL;
+	return (struct func *)mh_i32ptr_node(funcs_fin, func)->val;
+}
+
 int
 schema_find_grants(const char *type, uint32_t id, bool *out)
 {
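A simplified model of the separate `funcs_fin` cache, using an array instead of `mh_i32ptr_t`; `fin_cache`, `fin_insert()`, `fin_delete()` and `fin_cache_free()` are hypothetical. It shows the invariants the schema code relies on: insertion asserts that the id is absent, deleting a missing id is a no-op, and the teardown loop terminates only because each iteration removes the entry from the same cache it is draining. In `schema_free()` that means calling `func_fin_cache_delete()` rather than `func_cache_delete()`, since by that point `funcs` is already empty and `mh_size(funcs_fin)` would never decrease.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <stdlib.h>

/* Hypothetical finalize function object; only the id matters here. */
struct fin_func {
	uint32_t fid;
};

enum { FIN_CACHE_MAX = 16 };

struct fin_cache {
	struct fin_func *items[FIN_CACHE_MAX];
	size_t size;
};

static struct fin_func *
fin_by_id(const struct fin_cache *c, uint32_t fid)
{
	for (size_t i = 0; i < c->size; i++) {
		if (c->items[i]->fid == fid)
			return c->items[i];
	}
	return NULL;
}

/* Same precondition as func_fin_cache_insert(): the id must be absent. */
static void
fin_insert(struct fin_cache *c, struct fin_func *f)
{
	assert(fin_by_id(c, f->fid) == NULL);
	assert(c->size < FIN_CACHE_MAX);
	c->items[c->size++] = f;
}

/* Deleting a missing id is a no-op, like func_fin_cache_delete(). */
static void
fin_delete(struct fin_cache *c, uint32_t fid)
{
	for (size_t i = 0; i < c->size; i++) {
		if (c->items[i]->fid == fid) {
			c->items[i] = c->items[--c->size];
			return;
		}
	}
}

/* Drain the cache: remove each entry from this cache, then free it. */
static void
fin_cache_free(struct fin_cache *c)
{
	while (c->size > 0) {
		struct fin_func *f = c->items[0];
		fin_delete(c, f->fid);
		free(f);
	}
}
```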
diff --git a/src/box/schema.h b/src/box/schema.h
index d3bbdd590..186fa9db7 100644
--- a/src/box/schema.h
+++ b/src/box/schema.h
@@ -102,6 +102,21 @@ func_by_id(uint32_t fid);
 struct func *
 func_by_name(const char *name, uint32_t name_len);
 
+/**
+ * Insert the finalize function object of an aggregate function into the
+ * finalize function cache.
+ *
+ * @param func Function object to insert.
+ */
+void
+func_fin_cache_insert(struct func *func);
+
+void
+func_fin_cache_delete(uint32_t fid);
+
+struct func *
+func_fin_by_id(uint32_t fid);
+
 /** Call a visitor function on every space in the space cache. */
 int
 space_foreach(int (*func)(struct space *sp, void *udata), void *udata);
diff --git a/src/box/sql/expr.c b/src/box/sql/expr.c
index eb169aeb8..46ffe377c 100644
--- a/src/box/sql/expr.c
+++ b/src/box/sql/expr.c
@@ -5469,7 +5469,18 @@ analyzeAggregate(Walker * pWalker, Expr * pExpr)
 						       (pExpr, EP_xIsSelect));
 						pItem = &pAggInfo->aFunc[i];
 						pItem->pExpr = pExpr;
-						pItem->iMem = ++pParse->nMem;
+						int n = pExpr->x.pList == NULL ?
+							0 : pExpr->x.pList->nExpr;
+						/*
+						 * Allocate 1 MEM for
+						 * accumulator and next n MEMs
+						 * for arguments. This makes it
+						 * easier to pass these n + 1
+						 * MEMs to the user-defined
+						 * aggregate function.
+						 */
+						pItem->iMem = pParse->nMem + 1;
+						pParse->nMem += n + 1;
 						assert(!ExprHasProperty
 						       (pExpr, EP_IntValue));
 						pItem->func =
@@ -5479,12 +5490,6 @@ analyzeAggregate(Walker * pWalker, Expr * pExpr)
 								true;
 							return WRC_Abort;
 						}
-						assert(pItem->func->def->
-						       language ==
-						       FUNC_LANGUAGE_SQL_BUILTIN &&
-						       pItem->func->def->
-						       aggregate ==
-						       FUNC_AGGREGATE_GROUP);
 						if (pExpr->flags & EP_Distinct) {
 							pItem->iDistinct =
 								pParse->nTab++;
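A sketch of the register layout that `analyzeAggregate()` now reserves and `OP_UserAggStep` relies on: one MEM for the accumulator followed by `n` MEMs for the arguments, so the state and the arguments can be handed to the step function as one contiguous range of `n + 1` values. `agg_regs`, `alloc_agg_regs()` and `collect_step_args()` are hypothetical; `n_mem` plays the role of `pParse->nMem`, and plain `int`s stand in for `struct Mem`.

```c
#include <assert.h>

/* Registers reserved for one aggregate call (hypothetical model). */
struct agg_regs {
	int acc;       /* pItem->iMem: the accumulator */
	int first_arg; /* regAgg = pF->iMem + 1 */
	int n_args;
};

/* Reserve 1 + n_args consecutive registers, as analyzeAggregate() does. */
static struct agg_regs
alloc_agg_regs(int *n_mem, int n_args)
{
	struct agg_regs r;
	r.acc = *n_mem + 1;
	r.first_arg = r.acc + 1;
	r.n_args = n_args;
	*n_mem += n_args + 1;
	return r;
}

/*
 * Copy the step inputs (state first, then the arguments) out of a register
 * file, mimicking how OP_UserAggStep passes &aMem[P3] with P1 + 1 values.
 * Returns the number of values copied.
 */
static int
collect_step_args(const int *mem, struct agg_regs r, int *out)
{
	/* The OP_UserAggStep invariant: P3 == P2 - 1. */
	assert(r.first_arg == r.acc + 1);
	for (int i = 0; i <= r.n_args; i++)
		out[i] = mem[r.acc + i];
	return r.n_args + 1;
}
```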
diff --git a/src/box/sql/func.c b/src/box/sql/func.c
index b69bf7fd6..493b69a4c 100644
--- a/src/box/sql/func.c
+++ b/src/box/sql/func.c
@@ -2072,9 +2072,12 @@ uint32_t
 sql_func_flags(const char *name)
 {
 	struct sql_func_dictionary *dict = built_in_func_get(name);
-	if (dict == NULL)
+	if (dict != NULL)
+		return dict->flags;
+	struct func *func = func_by_name(name, strlen(name));
+	if (func == NULL || func->def->aggregate != FUNC_AGGREGATE_GROUP)
 		return 0;
-	return dict->flags;
+	return SQL_FUNC_AGG;
 }
 
 static struct func_vtab func_sql_builtin_vtab;
diff --git a/src/box/sql/select.c b/src/box/sql/select.c
index 2e33a31d2..c77cd1c9f 100644
--- a/src/box/sql/select.c
+++ b/src/box/sql/select.c
@@ -4648,8 +4648,6 @@ is_simple_count(struct Select *select, struct AggInfo *agg_info)
 		return NULL;
 	if (NEVER(agg_info->nFunc == 0))
 		return NULL;
-	assert(agg_info->aFunc->func->def->language ==
-	       FUNC_LANGUAGE_SQL_BUILTIN);
 	if (memcmp(agg_info->aFunc->func->def->name, "COUNT", 5) != 0 ||
 	    (agg_info->aFunc->pExpr->x.pList != NULL &&
 	     agg_info->aFunc->pExpr->x.pList->nExpr > 0))
@@ -5575,9 +5573,17 @@ finalizeAggFunctions(Parse * pParse, AggInfo * pAggInfo)
 	for (i = 0, pF = pAggInfo->aFunc; i < pAggInfo->nFunc; i++, pF++) {
 		ExprList *pList = pF->pExpr->x.pList;
 		assert(!ExprHasProperty(pF->pExpr, EP_xIsSelect));
-		sqlVdbeAddOp2(v, OP_AggFinal, pF->iMem,
-				  pList ? pList->nExpr : 0);
-		sqlVdbeAppendP4(v, pF->func, P4_FUNC);
+		if (pF->func->def->language == FUNC_LANGUAGE_SQL_BUILTIN) {
+			sqlVdbeAddOp2(v, OP_AggFinal, pF->iMem,
+				      pList != NULL ? pList->nExpr : 0);
+			sqlVdbeAppendP4(v, pF->func, P4_FUNC);
+		} else {
+			sqlVdbeAddOp1(v, OP_UserAggFinal, pF->iMem);
+			const char *name = pF->func->def->name;
+			int len = pF->func->def->name_len;
+			char *str = sqlDbStrNDup(pParse->db, name, len);
+			sqlVdbeAppendP4(v, str, P4_DYNAMIC);
+		}
 	}
 }
 
@@ -5604,7 +5610,7 @@ updateAccumulator(Parse * pParse, AggInfo * pAggInfo)
 		assert(!ExprHasProperty(pF->pExpr, EP_xIsSelect));
 		if (pList) {
 			nArg = pList->nExpr;
-			regAgg = sqlGetTempRange(pParse, nArg);
+			regAgg = pF->iMem + 1;
 			sqlExprCodeExprList(pParse, pList, regAgg, 0,
 						SQL_ECEL_DUP);
 		} else {
@@ -5644,10 +5650,17 @@ updateAccumulator(Parse * pParse, AggInfo * pAggInfo)
 			pParse->is_aborted = true;
 			return;
 		}
-		sqlVdbeAddOp3(v, OP_AggStep, nArg, regAgg, pF->iMem);
-		sqlVdbeAppendP4(v, ctx, P4_FUNCCTX);
-		sql_expr_type_cache_change(pParse, regAgg, nArg);
-		sqlReleaseTempRange(pParse, regAgg, nArg);
+		if (pF->func->def->language == FUNC_LANGUAGE_SQL_BUILTIN) {
+			sqlVdbeAddOp3(v, OP_AggStep, nArg, regAgg, pF->iMem);
+			sqlVdbeAppendP4(v, ctx, P4_FUNCCTX);
+		} else {
+			sqlVdbeAddOp3(v, OP_UserAggStep, nArg, regAgg,
+				      pF->iMem);
+			const char *name = pF->func->def->name;
+			int len = pF->func->def->name_len;
+			char *str = sqlDbStrNDup(pParse->db, name, len);
+			sqlVdbeAppendP4(v, str, P4_DYNAMIC);
+		}
 		if (addrNext) {
 			sqlVdbeResolveLabel(v, addrNext);
 			sqlExprCacheClear(pParse);
diff --git a/src/box/sql/vdbe.c b/src/box/sql/vdbe.c
index 24cb28260..e2253caa1 100644
--- a/src/box/sql/vdbe.c
+++ b/src/box/sql/vdbe.c
@@ -1289,6 +1289,93 @@ case OP_FunctionByName: {
 	break;
 }
 
+/*
+ * Opcode: UserAggStep P1 P2 P3 P4 *
+ * Synopsis: accum=r[P3] step(r[P2@P1])
+ *
+ * Execute the step function for a user-defined aggregate function. The step
+ * function has P1 arguments. P4 is the name of the user-defined aggregate
+ * function. Register P3 is the accumulator.
+ *
+ * The P1 arguments are taken from register P2 and its successors. The P2
+ * register must be immediately after the P3 register.
+ */
+case OP_UserAggStep: {
+	assert(pOp->p4type == P4_DYNAMIC);
+	assert(pOp->p1 == 0 || pOp->p3 == pOp->p2 - 1);
+	struct func *func = func_by_name(pOp->p4.z, strlen(pOp->p4.z));
+	if (func == NULL) {
+		diag_set(ClientError, ER_NO_SUCH_FUNCTION, pOp->p4.z);
+		goto abort_due_to_error;
+	}
+
+	struct Mem *argv = &aMem[pOp->p3];
+	struct port args;
+	struct port ret;
+
+	struct region *region = &fiber()->gc;
+	size_t region_svp = region_used(region);
+	port_vdbemem_create(&args, (struct sql_value *)argv, pOp->p1 + 1);
+	if (func_call(func, &args, &ret) != 0) {
+		region_truncate(region, region_svp);
+		goto abort_due_to_error;
+	}
+
+	uint32_t size;
+	struct Mem *mem = (struct Mem *)port_get_vdbemem(&ret, &size);
+	assert(size == 1);
+	bool is_error = mem == NULL || mem_copy(&aMem[pOp->p3], mem) != 0;
+	port_destroy(&ret);
+	region_truncate(region, region_svp);
+	if (is_error)
+		goto abort_due_to_error;
+	break;
+}
+
+/* Opcode: UserAggFinal P1 * * P4 *
+ * Synopsis: r[P1] = finalize(r[P1])
+ *
+ * Execute the finalize function for a user-defined aggregate function. P4 is
+ * the name of the aggregate function. Register P1 is the accumulator; the
+ * result of the finalize function is written to register P1.
+ */
+case OP_UserAggFinal: {
+	assert(pOp->p1 > 0 && pOp->p1 <= (p->nMem + 1 - p->nCursor));
+	assert(pOp->p4type == P4_DYNAMIC);
+	struct func *step_func = func_by_name(pOp->p4.z, strlen(pOp->p4.z));
+	if (step_func == NULL) {
+		diag_set(ClientError, ER_NO_SUCH_FUNCTION, pOp->p4.z);
+		goto abort_due_to_error;
+	}
+	struct func *func = func_fin_by_id(step_func->def->fid);
+	if (func == NULL) {
+		diag_set(ClientError, ER_NO_SUCH_FUNCTION, pOp->p4.z);
+		goto abort_due_to_error;
+	}
+
+	struct Mem *argv = &aMem[pOp->p1];
+	struct port args;
+	struct port ret;
+
+	struct region *region = &fiber()->gc;
+	size_t region_svp = region_used(region);
+	port_vdbemem_create(&args, (struct sql_value *)argv, 1);
+	if (func_call(func, &args, &ret) != 0) {
+		region_truncate(region, region_svp);
+		goto abort_due_to_error;
+	}
+
+	uint32_t size;
+	struct Mem *mem = (struct Mem *)port_get_vdbemem(&ret, &size);
+	assert(size == 1);
+	bool is_error = mem == NULL || mem_copy(&aMem[pOp->p1], mem) != 0;
+	port_destroy(&ret);
+	region_truncate(region, region_svp);
+	if (is_error)
+		goto abort_due_to_error;
+	break;
+}
+
 /* Opcode: BitAnd P1 P2 P3 * *
  * Synopsis: r[P3]=r[P1]&r[P2]
  *
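`OP_UserAggFinal` resolves the finalize function in two steps: P4 carries the step function's name, which is looked up with `func_by_name()`, and the finalize object is then fetched from `funcs_fin` by the step function's id. The sketch below models only that resolution order with hypothetical types and helpers (`fn`, `find_by_name()`, `find_by_id()`, `resolve_finalize()`). Presumably, storing a name in P4 instead of a pointer is what lets a function dropped after the statement was prepared fail with `ER_NO_SUCH_FUNCTION` rather than leave a dangling pointer.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Hypothetical stand-in for a cached function object. */
struct fn {
	uint32_t fid;
	const char *name;
};

static const struct fn *
find_by_name(const struct fn *fns, size_t n, const char *name)
{
	for (size_t i = 0; i < n; i++) {
		if (strcmp(fns[i].name, name) == 0)
			return &fns[i];
	}
	return NULL;
}

static const struct fn *
find_by_id(const struct fn *fns, size_t n, uint32_t fid)
{
	for (size_t i = 0; i < n; i++) {
		if (fns[i].fid == fid)
			return &fns[i];
	}
	return NULL;
}

/*
 * Resolve the finalize function the way OP_UserAggFinal does: by the step
 * function's name first, then by its id in the finalize cache. NULL means
 * the opcode would raise ER_NO_SUCH_FUNCTION.
 */
static const struct fn *
resolve_finalize(const struct fn *steps, size_t n_steps,
		 const struct fn *fins, size_t n_fins, const char *name)
{
	assert(name != NULL);
	const struct fn *step = find_by_name(steps, n_steps, name);
	if (step == NULL)
		return NULL;
	return find_by_id(fins, n_fins, step->fid);
}
```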
diff --git a/test/box-py/bootstrap.result b/test/box-py/bootstrap.result
index cea440c64..78d444bb0 100644
--- a/test/box-py/bootstrap.result
+++ b/test/box-py/bootstrap.result
@@ -4,7 +4,7 @@ box.internal.bootstrap()
 box.space._schema:select{}
 ---
 - - ['max_id', 511]
-  - ['version', 2, 9, 1]
+  - ['version', 2, 10, 1]
 ...
 box.space._cluster:select{}
 ---
@@ -54,7 +54,7 @@ box.space._space:select{}
   - [296, 1, '_func', 'memtx', 0, {}, [{'name': 'id', 'type': 'unsigned'}, {'name': 'owner',
         'type': 'unsigned'}, {'name': 'name', 'type': 'string'}, {'name': 'setuid',
         'type': 'unsigned'}, {'name': 'language', 'type': 'string'}, {'name': 'body',
-        'type': 'string'}, {'name': 'routine_type', 'type': 'string'}, {'name': 'param_list',
+        'type': 'any'}, {'name': 'routine_type', 'type': 'string'}, {'name': 'param_list',
         'type': 'array'}, {'name': 'returns', 'type': 'string'}, {'name': 'aggregate',
         'type': 'string'}, {'name': 'sql_data_access', 'type': 'string'}, {'name': 'is_deterministic',
         'type': 'boolean'}, {'name': 'is_sandboxed', 'type': 'boolean'}, {'name': 'is_null_call',
diff --git a/test/sql-tap/CMakeLists.txt b/test/sql-tap/CMakeLists.txt
index c4ec1214a..136a517d4 100644
--- a/test/sql-tap/CMakeLists.txt
+++ b/test/sql-tap/CMakeLists.txt
@@ -3,6 +3,8 @@ build_module(gh-5938-wrong-string-length gh-5938-wrong-string-length.c)
 target_link_libraries(gh-5938-wrong-string-length msgpuck)
 build_module(gh-6024-funcs-return-bin gh-6024-funcs-return-bin.c)
 target_link_libraries(gh-6024-funcs-return-bin msgpuck)
+build_module(gh-2579-custom-aggregate gh-2579-custom-aggregate.c)
+target_link_libraries(gh-2579-custom-aggregate msgpuck)
 build_module(sql_uuid sql_uuid.c)
 target_link_libraries(sql_uuid msgpuck core)
 build_module(decimal decimal.c)
diff --git a/test/sql-tap/gh-2579-custom-aggregate.c b/test/sql-tap/gh-2579-custom-aggregate.c
new file mode 100644
index 000000000..5552effc5
--- /dev/null
+++ b/test/sql-tap/gh-2579-custom-aggregate.c
@@ -0,0 +1,48 @@
+#include "msgpuck.h"
+#include "module.h"
+#include "mp_decimal.h"
+#include "lua/tnt_msgpuck.h"
+#include "mp_uuid.h"
+
+enum {
+	BUF_SIZE = 512,
+};
+
+int
+S3(box_function_ctx_t *ctx, const char *args, const char *args_end)
+{
+	(void)args_end;
+	uint32_t arg_count = mp_decode_array(&args);
+	if (arg_count != 2) {
+		return box_error_set(__FILE__, __LINE__, ER_PROC_C,
+				     "invalid argument count");
+	}
+	int sum = 0;
+	if (mp_typeof(*args) != MP_UINT)
+		mp_decode_nil(&args);
+	else
+		sum = mp_decode_uint(&args);
+	sum += mp_decode_uint(&args);
+	char res[BUF_SIZE];
+	char *end = mp_encode_uint(res, sum);
+	box_return_mp(ctx, res, end);
+	return 0;
+}
+
+int
+S3_finalize(box_function_ctx_t *ctx, const char *args, const char *args_end)
+{
+	(void)args_end;
+	uint32_t arg_count = mp_decode_array(&args);
+	if (arg_count != 1) {
+		return box_error_set(__FILE__, __LINE__, ER_PROC_C,
+				     "invalid argument count");
+	}
+	int sum = 0;
+	if (mp_typeof(*args) == MP_UINT)
+		sum = mp_decode_uint(&args);
+	char res[BUF_SIZE];
+	char *end = mp_encode_double(res, sum / 10.0);
+	box_return_mp(ctx, res, end);
+	return 0;
+}
diff --git a/test/sql-tap/gh-2579-custom-aggregate.test.lua b/test/sql-tap/gh-2579-custom-aggregate.test.lua
new file mode 100755
index 000000000..b207257c6
--- /dev/null
+++ b/test/sql-tap/gh-2579-custom-aggregate.test.lua
@@ -0,0 +1,278 @@
+#!/usr/bin/env tarantool
+local build_path = os.getenv("BUILDDIR")
+package.cpath = build_path..'/test/sql-tap/?.so;'..build_path..'/test/sql-tap/?.dylib;'..package.cpath
+
+local test = require("sqltester")
+test:plan(15)
+
+-- Make sure that non-persistent aggregate functions are working as expected.
+local step = function(x, y)
+    if x == nil then
+        x = {sum = 0, count = 0}
+    end
+    x.sum = x.sum + y
+    x.count = x.count + 1
+    return x
+end
+
+local finalize = function(x)
+    return x.sum / x.count
+end
+
+rawset(_G, 'S1', step)
+rawset(_G, 'S1_finalize', finalize)
+box.schema.func.create('S1', {aggregate = 'group', returns = 'number',
+                              param_list = {'integer'}, exports = {'SQL'}})
+
+test:execsql([[
+    CREATE TABLE t(i INTEGER PRIMARY KEY AUTOINCREMENT, a INT);
+    INSERT INTO t(a) VALUES(1), (1), (2), (2), (3);
+]])
+
+test:do_execsql_test(
+    "gh-2579-1",
+    [[
+        SELECT avg(a), s1(a) FROM t;
+    ]], {
+        1, 1.8
+    })
+
+test:do_execsql_test(
+    "gh-2579-2",
+    [[
+        SELECT avg(distinct a), s1(distinct a) FROM t;
+    ]], {
+        2, 2
+    })
+
+-- Make sure that persistent aggregate functions are working as expected.
+local body_s2 = {
+    step = [[function(x, y)
+        if x == nil then
+            x = {sum = 0, count = 0}
+        end
+        x.sum = x.sum + y
+        x.count = x.count + 1
+        return x
+    end]],
+    finalize = [[function(x)
+        return x.sum / x.count
+    end]]
+}
+
+box.schema.func.create('S2', {aggregate = 'group', returns = 'number',
+                              body = body_s2, param_list = {'integer'},
+                              exports = {'SQL'}})
+
+test:do_execsql_test(
+    "gh-2579-3",
+    [[
+        SELECT avg(a), s2(a) FROM t;
+    ]], {
+        1, 1.8
+    })
+
+test:do_execsql_test(
+    "gh-2579-4",
+    [[
+        SELECT avg(distinct a), s2(distinct a) FROM t;
+    ]], {
+        2, 2
+    })
+
+-- Make sure that C aggregate functions are working as expected.
+box.schema.func.create("gh-2579-custom-aggregate.S3", {
+    language = "C", aggregate = 'group', param_list = {"integer"},
+    returns = "number", exports = {"SQL"},
+})
+
+test:do_execsql_test(
+    "gh-2579-5",
+    [[
+        SELECT "gh-2579-custom-aggregate.S3"(a) FROM t;
+    ]], {
+        0.9
+    })
+
+test:do_execsql_test(
+    "gh-2579-6",
+    [[
+        SELECT "gh-2579-custom-aggregate.S3"(distinct a) FROM t;
+    ]], {
+        0.6
+    })
+
+--
+-- Make sure that user-defined aggregate functions can have more than one
+-- argument.
+--
+local body_s4 = {
+    step = [[function(x, y, z)
+        if x == nil then
+            x = {sum = 0, count = 0}
+        end
+        x.sum = x.sum + y + z
+        x.count = x.count + 1
+        return x
+    end]],
+    finalize = [[function(x)
+        return x.sum / x.count
+    end]]
+}
+box.schema.func.create('S4', {aggregate = 'group', returns = 'number',
+                              param_list = {'integer', 'integer'},
+                              body = body_s4, exports = {'SQL'}})
+
+test:do_execsql_test(
+    "gh-2579-7",
+    [[
+        SELECT s4(a, i) FROM t;
+    ]], {
+        4.8
+    })
+
+--
+-- Make sure that user-defined aggregate functions with more than one argument
+-- cannot work with DISTINCT.
+--
+test:do_catchsql_test(
+    "gh-2579-8",
+    [[
+        SELECT s4(distinct a, i) FROM t;
+    ]], {
+        1, "DISTINCT aggregates must have exactly one argument"
+    })
+
+-- Make sure user-defined aggregate functions are not available in Lua.
+test:do_test(
+    "gh-2579-9",
+    function()
+        local def = {aggregate = 'group', returns = 'number',
+                     param_list = {'integer'}, exports = {'LUA', 'SQL'}}
+        local res = {pcall(box.schema.func.create, 'S5', def)}
+        return {tostring(res[2])}
+    end, {
+        "Failed to create function 'S5': aggregate function can only be "..
+        "accessed in SQL"
+    })
+
+--
+-- Make sure that STEP and FINALIZE of a user-defined aggregate function are
+-- either both persistent or both non-persistent.
+--
+test:do_test(
+    "gh-2579-10",
+    function()
+        local body = "function(x) return x / 1000 end"
+        local def = {aggregate = 'group', returns = 'number', body = body,
+                     param_list = {'integer'}, exports = {'SQL'}}
+        local res = {pcall(box.schema.func.create, 'S6', def)}
+        return {tostring(res[2])}
+    end, {
+        "Failed to create function 'S6': step or finalize of aggregate "..
+        "function is undefined"
+    })
+
+--
+-- Make sure that the body of a user-defined aggregate function is required to
+-- have exactly two fields: "step" and "finalize".
+--
+test:do_test(
+    "gh-2579-11",
+    function()
+        local body = {step = "function(x) return x / 1000 end"}
+        local def = {aggregate = 'group', returns = 'number', body = body,
+                     param_list = {'integer'}, exports = {'SQL'}}
+        local res = {pcall(box.schema.func.create, 'S7', def)}
+        return {tostring(res[2])}
+    end, {
+        "Failed to create function 'S7': body of aggregate function should "..
+        "be map that contains exactly two string fields: 'step' and 'finalize'"
+    })
+
+test:do_test(
+    "gh-2579-12",
+    function()
+        local body = {finalize = "function(x) return x / 1000 end"}
+        local def = {aggregate = 'group', returns = 'number', body = body,
+                     param_list = {'integer'}, exports = {'SQL'}}
+        local res = {pcall(box.schema.func.create, 'S8', def)}
+        return {tostring(res[2])}
+    end, {
+        "Failed to create function 'S8': body of aggregate function should "..
+        "be map that contains exactly two string fields: 'step' and 'finalize'"
+    })
+
+test:do_test(
+    "gh-2579-13",
+    function()
+        local body = {
+            step = [[function(x, y, z)
+                if x == nil then
+                    x = {sum = 0, count = 0}
+                end
+                x.sum = x.sum + y + z
+                x.count = x.count + 1
+                return x
+            end]],
+            finalize = [[function(x)
+                return x.sum / x.count
+            end]],
+            something = 0
+        }
+        local def = {aggregate = 'group', returns = 'number', body = body,
+                     param_list = {'integer'}, exports = {'SQL'}}
+        local res = {pcall(box.schema.func.create, 'S9', def)}
+        return {tostring(res[2])}
+    end, {
+        "Failed to create function 'S9': body of aggregate function should "..
+        "be map that contains exactly two string fields: 'step' and 'finalize'"
+    })
+
+--
+-- Make sure that errors thrown by user-defined aggregate functions are
+-- handled correctly.
+--
+local body_s10 = {
+    step = [[function(x) error("some error") return 0 end]],
+    finalize = [[function(x) return 0 end]]
+}
+
+box.schema.func.create('S10', {aggregate = 'group', returns = 'number',
+                              body = body_s10, exports = {'SQL'}})
+
+test:do_catchsql_test(
+    "gh-2579-14",
+    [[
+        SELECT s10();
+    ]], {
+        1, '[string "return function(x) error("some error") return..."]:1: '..
+           'some error'
+    })
+
+local body_s11 = {
+    step = [[function(x) return 0 end]],
+    finalize = [[function(x) error("another error") return 0 end]]
+}
+
+box.schema.func.create('S11', {aggregate = 'group', returns = 'number',
+                              body = body_s11, exports = {'SQL'}})
+
+test:do_catchsql_test(
+    "gh-2579-15",
+    [[
+        SELECT s11();
+    ]], {
+        1, '[string "return function(x) error("another error") ret..."]:1: '..
+           'another error'
+    })
+
+box.space._func.index[2]:delete("S1")
+box.space._func.index[2]:delete("S2")
+box.space._func.index[2]:delete("gh-2579-custom-aggregate.S3")
+box.space._func.index[2]:delete("S4")
+box.space._func.index[2]:delete("S10")
+box.space._func.index[2]:delete("S11")
+box.space.T:drop()
+
+test:finish_test()

* Re: [Tarantool-patches] [PATCH v1 1/1] sql: introduce user-defined aggregate functions
  2022-01-23 14:17   ` Mergen Imeev via Tarantool-patches
@ 2022-01-24 22:08     ` Vladislav Shpilevoy via Tarantool-patches
  0 siblings, 0 replies; 4+ messages in thread
From: Vladislav Shpilevoy via Tarantool-patches @ 2022-01-24 22:08 UTC (permalink / raw)
  To: Mergen Imeev; +Cc: tarantool-patches

On 23.01.2022 15:17, Mergen Imeev wrote:
> Hi! Thank you for the review! The diff, the new patch, and a patch that adds a
> new utility function are below.
> 
> On Tue, Jan 18, 2022 at 01:10:04AM +0100, Vladislav Shpilevoy wrote:
>> Hi! Thanks for the patch!
>>
>> What happened to the idea we had about SQL iterators? Then people would
>> be able to iterate with yields and to make any kind of aggregation without
>> having to create functions for that.
>>
> I think that this idea is actually very similar to the definition of CURSOR from
> the standard. Here's what the standard says about CURSORs:
> "A cursor is a mechanism by which the rows of a table may be acted on (e.g.,
> returned to a host programming language) one at a time."
> 
> I will try to suggest this idea.
> 
> However, the current issue has a rather strict deadline, which is quite near.
> I believe that introducing CURSOR is not a problem that can be solved in a few
> weeks.
> 
> Also, about my current implementation: I don't like it, because too many
> changes are made to BOX that are not related to BOX itself. I think there is a
> much simpler implementation. You have probably noticed that the current
> implementation requires non-persistent functions and C functions to define two
> implementations named "<name>" and "<name>_finalize". Do you think this rule can
> be made more general? I suggest creating two tuples in _func instead of one when
> we create an aggregate function using box.schema.func.create(). In this case,
> there will be almost no changes in alter.cc and BOX as a whole, and the logic
> will become much simpler. Also, I think we won't need to add new opcodes to the
> VDBE, since we can just reuse OP_FunctionByName. The problem with this approach
> is that the name "<name>_finalize" will be taken.

Another issue is that people will need to delete this func explicitly when they
drop the main one. That might be a blocker for this rework idea.

Plausible alternatives are:

1. Postpone aggregation until cursors are implemented, and move the deadline.
Then aggregation might not be needed at all.

2. Allow specifying the step and finalization functions in the SQL query. For instance,

	-- SQL
	--
	SELECT AGGREGATE({STEP = AvgFunc, END = AvgEnd}, [a, b]) FROM t;

	-- Lua
	--
	function AvgFunc(ctx, a, b)
		if ctx == nil then
			return {s = a + b, n = 1}
		end
		ctx.s = ctx.s + a + b
		ctx.n = ctx.n + 1
		-- STEP must return the updated state.
		return ctx
	end

	function AvgEnd(ctx)
		return ctx.s / ctx.n
	end


	box.schema.func.create('AvgFunc', ...)
	box.schema.func.create('AvgEnd', ...)

If you omit END, it is simply not called. The same goes for STEP. START could be
introduced later.

3. Force people to create the '..._finalize' function themselves. Until they do,
the aggregation will raise an error.

> Also, this could potentially lead to
> mixed combinations of STEP and FINALIZE functions; for example, STEP could
> be persistent but FINALIZE could be non-persistent. However, to create such
> combinations, the user must insert tuples directly into _func. I am not sure
> if this is a feature or a bug. What do you think?

As for how the functions are implemented, I wouldn't care much. They must
exist and be callable; that could be the only requirement.

I haven't looked at the new version of the patch yet. I am only answering the
main question for now.

