[tarantool-patches] [PATCH v1 5/6] sql: fix incomplete UPDATE on IdxInsert failure

Kirill Shcherbatov kshcherbatov at tarantool.org
Thu Nov 29 16:09:49 MSK 2018


Currently, an SQL UPDATE is performed as a sequence of two
operations: removing the old tuple from the index and inserting
the new one. If the insertion fails for some reason under the
ON_CONFLICT_ACTION_IGNORE strategy, the old tuple is not restored.

We introduce a new argument, P2, for the OP_IdxReplace and
OP_IdxInsert opcodes; it holds the label of a VDBE error handler
to jump to if the insertion fails. Each update step creates a
SAVEPOINT before deleting the old tuple and defines an error
handler that performs a ROLLBACK to it.

Needed for #3691
---
 extra/mkopcodeh.sh      |  2 ++
 src/box/sql/insert.c    | 12 ++++++++----
 src/box/sql/sqliteInt.h |  7 ++++++-
 src/box/sql/update.c    | 31 ++++++++++++++++++++++++++++++-
 src/box/sql/vdbe.c      | 39 ++++++++++++++++++++++++++++++++++-----
 5 files changed, 80 insertions(+), 11 deletions(-)

diff --git a/extra/mkopcodeh.sh b/extra/mkopcodeh.sh
index 5f31f2b7d..ea2d6b695 100755
--- a/extra/mkopcodeh.sh
+++ b/extra/mkopcodeh.sh
@@ -150,6 +150,8 @@ while [ "$i" -lt "$nOp" ]; do
     eval "name=\$ARRAY_order_$i"
     case "$name" in
     # The following are the opcodes that are processed by resolveP2Values()
+    OP_IdxInsert   | \
+    OP_IdxReplace  | \
     OP_Savepoint   | \
     OP_Checkpoint  | \
     OP_JournalMode | \
diff --git a/src/box/sql/insert.c b/src/box/sql/insert.c
index b099178d7..4fc39c233 100644
--- a/src/box/sql/insert.c
+++ b/src/box/sql/insert.c
@@ -763,7 +763,7 @@ sqlite3Insert(Parse * pParse,	/* Parser context */
 		fkey_emit_check(pParse, pTab, 0, regIns, 0);
 		vdbe_emit_insertion_completion(v, space, regIns + 1,
 					       pTab->def->field_count,
-					       on_error);
+					       on_error, 0);
 	}
 
 	/* Update the count of rows that are inserted
@@ -1076,10 +1076,13 @@ process_index:  ;
 void
 vdbe_emit_insertion_completion(struct Vdbe *v, struct space *space,
 			       int raw_data_reg, uint32_t tuple_len,
-			       enum on_conflict_action on_conflict)
+			       enum on_conflict_action on_conflict,
+			       int conflict_handler_label)
 {
 	assert(v != NULL);
 	u16 pik_flags = OPFLAG_NCHANGE;
+	if (conflict_handler_label != 0)
+		pik_flags |= OPFLAG_OE_HANDLER;
 	if (on_conflict == ON_CONFLICT_ACTION_IGNORE)
 		pik_flags |= OPFLAG_OE_IGNORE;
 	else if (on_conflict == ON_CONFLICT_ACTION_FAIL)
@@ -1088,8 +1091,9 @@ vdbe_emit_insertion_completion(struct Vdbe *v, struct space *space,
 		pik_flags |= OPFLAG_OE_ROLLBACK;
 	sqlite3VdbeAddOp3(v, OP_MakeRecord, raw_data_reg, tuple_len,
 			  raw_data_reg + tuple_len);
-	sqlite3VdbeAddOp1(v, OP_IdxInsert, raw_data_reg + tuple_len);
-	sqlite3VdbeChangeP4(v, -1, (char *)space, P4_SPACEPTR);
+	sqlite3VdbeAddOp4(v, OP_IdxInsert, raw_data_reg + tuple_len,
+			  conflict_handler_label, 0, (char *)space,
+			  P4_SPACEPTR);
 	sqlite3VdbeChangeP5(v, pik_flags);
 }
 
diff --git a/src/box/sql/sqliteInt.h b/src/box/sql/sqliteInt.h
index dbf58d967..9353798c8 100644
--- a/src/box/sql/sqliteInt.h
+++ b/src/box/sql/sqliteInt.h
@@ -2829,6 +2829,8 @@ struct Parse {
 #define OPFLAG_OE_IGNORE    0x200	/* OP_IdxInsert: Ignore flag */
 #define OPFLAG_OE_FAIL      0x400	/* OP_IdxInsert: Fail flag */
 #define OPFLAG_OE_ROLLBACK  0x800	/* OP_IdxInsert: Rollback flag. */
+/** OP_IdxInsert: Interrupt handler is set flag. */
+#define OPFLAG_OE_HANDLER   0x1000
 #define OPFLAG_LENGTHARG     0x40	/* OP_Column only used for length() */
 #define OPFLAG_TYPEOFARG     0x80	/* OP_Column only used for typeof() */
 #define OPFLAG_SEEKEQ        0x02	/* OP_Open** cursor uses EQ seek only */
@@ -3869,11 +3871,14 @@ vdbe_emit_constraint_checks(struct Parse *parse_context,
  * @param raw_data_reg Register with raw data to insert.
  * @param tuple_len Number of registers to hold the tuple.
  * @param on_conflict On conflict action.
+ * @param conflict_handler_label Label of the error handler to jump
+ *                               to on insertion failure; 0 if none.
  */
 void
 vdbe_emit_insertion_completion(struct Vdbe *v, struct space *space,
 			       int raw_data_reg, uint32_t tuple_len,
-			       enum on_conflict_action on_conflict);
+			       enum on_conflict_action on_conflict,
+			       int conflict_handler_label);
 
 void
 sql_set_multi_write(Parse *, bool);
diff --git a/src/box/sql/update.c b/src/box/sql/update.c
index cc69e2f30..6c47e5a80 100644
--- a/src/box/sql/update.c
+++ b/src/box/sql/update.c
@@ -34,6 +34,7 @@
  * to handle UPDATE statements.
  */
 #include "sqliteInt.h"
+#include "vdbeInt.h"
 #include "box/session.h"
 #include "tarantoolInt.h"
 #include "box/schema.h"
@@ -94,6 +95,8 @@ sqlite3Update(Parse * pParse,		/* The parser context */
 	int hasFK;		/* True if foreign key processing is required */
 	int labelBreak;		/* Jump here to break out of UPDATE loop */
 	int labelContinue;	/* Jump here to continue next step of UPDATE loop */
+	/* Jump here to rollback record on Tarantool error. */
+	int rollback_change = 0;
 	struct session *user_session = current_session();
 
 	bool is_view;		/* True when updating a view (INSTEAD OF trigger) */
@@ -112,6 +115,9 @@ sqlite3Update(Parse * pParse,		/* The parser context */
 	int regNew = 0;		/* Content of the NEW.* table in triggers */
 	int regOld = 0;		/* Content of OLD.* table in triggers */
 	int regKey = 0;		/* composite PRIMARY KEY value */
+	/* Token to make savepoints for UPDATE step. */
+	struct Token rollback_token =
+		{.z = "UPDATE_ITEM", .n = strlen("UPDATE_ITEM")};
 
 	db = pParse->db;
 	if (pParse->nErr || db->mallocFailed) {
@@ -440,13 +446,19 @@ sqlite3Update(Parse * pParse,		/* The parser context */
 		int addr1 = sqlite3VdbeAddOp4Int(v, OP_NotFound, pk_cursor, 0,
 						 regKey, nKey);
 		assert(regNew == regNewPk + 1);
+		/* Make savepoint for rollback OP_Delete */
+		if (!box_txn()) {
+			rollback_change = sqlite3VdbeMakeLabel(v);
+			sqlite3Savepoint(pParse, SAVEPOINT_BEGIN,
+					 &rollback_token);
+		}
 		sqlite3VdbeAddOp2(v, OP_Delete, pk_cursor, 0);
 		sqlite3VdbeJumpHere(v, addr1);
 		if (hasFK)
 			fkey_emit_check(pParse, pTab, 0, regNewPk, aXRef);
 		vdbe_emit_insertion_completion(v, space, regNew,
 					       pTab->def->field_count,
-					       on_error);
+					       on_error, rollback_change);
 		/*
 		 * Do any ON CASCADE, SET NULL or SET DEFAULT
 		 * operations required to handle rows that refer
@@ -467,6 +479,23 @@ sqlite3Update(Parse * pParse,		/* The parser context */
 			      TRIGGER_AFTER, pTab, regOldPk, on_error,
 			      labelContinue);
 
+	/* OR IGNORE/REPLACE/ABORT error handler. */
+	if (rollback_change != 0) {
+		sqlite3VdbeAddOp2(v, OP_Goto, 0, labelContinue);
+		sqlite3VdbeResolveLabel(v, rollback_change);
+		struct Vdbe *temp =
+			sqlite3DbMallocRaw(db, sizeof(struct Vdbe));
+		if (unlikely(temp == NULL))
+			goto update_cleanup;
+		sqlite3VdbeAddOp4(v, OP_ErrorCtx, 0, 0, 0,
+				  (const char *)temp, P4_DYNAMIC);
+		sqlite3Savepoint(pParse, SAVEPOINT_ROLLBACK,
+				 &rollback_token);
+		if (on_error != ON_CONFLICT_ACTION_IGNORE) {
+			sqlite3VdbeAddOp4(v, OP_ErrorCtx, 1, 0, 0,
+					(const char *)temp, P4_STATIC);
+		}
+	}
 	/* Repeat the above with the next record to be updated, until
 	 * all record selected by the WHERE clause have been updated.
 	 */
diff --git a/src/box/sql/vdbe.c b/src/box/sql/vdbe.c
index 8fdabc212..5efcbcf21 100644
--- a/src/box/sql/vdbe.c
+++ b/src/box/sql/vdbe.c
@@ -945,6 +945,27 @@ case OP_EndCoroutine: {           /* in1 */
 	break;
 }
 
+/* Opcode: ErrorCtx P1 * * P4 *
+ *
+ * Save or raise the VDBE error state kept in the scratch Vdbe
+ * object referenced by P4. With P1 = 0 the current rc and error
+ * action are saved and rc is reset; with P1 = 1 they are raised.
+ * Used to manage an error outside of the error handler.
+ */
+case OP_ErrorCtx: {
+	struct Vdbe *v = (struct Vdbe *)pOp->p4.p;
+	if (pOp->p1 == 0) {
+		v->rc = p->rc;
+		p->rc = SQLITE_OK;
+		v->errorAction = p->errorAction;
+	} else {
+		rc = v->rc;
+		pOp->p2 = v->errorAction;
+		goto abort_due_to_error;
+	}
+	break;
+}
+
 /* Opcode:  Yield P1 P2 * * *
  *
  * Swap the program counter with the value in register P1.  This
@@ -4392,10 +4413,12 @@ case OP_SorterInsert: {      /* in2 */
 	break;
 }
 
-/* Opcode: IdxInsert P1 * P3 P4 P5
+/* Opcode: IdxInsert P1 P2 P3 P4 P5
  * Synopsis: key=r[P1]
  *
  * @param P1 Index of a register with MessagePack data to insert.
+ * @param P2 The value to jump on error when OPFLAG_OE_HANDLER
+ *           flag is set.
  * @param P3 If P4 is not set, then P3 is register containing
  *           pointer to space to insert into.
  * @param P4 Pointer to the struct space to insert to.
@@ -4405,14 +4428,14 @@ case OP_SorterInsert: {      /* in2 */
  *        we are processing INSERT OR INGORE statement. Thus, in
  *        case of conflict we don't raise an error.
  */
-/* Opcode: IdxReplace2 P1 * P3 P4 P5
+/* Opcode: IdxReplace2 P1 P2 P3 P4 P5
  * Synopsis: key=r[P1]
  *
  * This opcode works exactly as IdxInsert does, but in Tarantool
  * internals it invokes box_replace() instead of box_insert().
  */
-case OP_IdxReplace:
-case OP_IdxInsert: {
+case OP_IdxReplace:  /* jump */
+case OP_IdxInsert: { /* jump */
 	pIn2 = &aMem[pOp->p1];
 	assert((pIn2->flags & MEM_Blob) != 0);
 	if (pOp->p5 & OPFLAG_NCHANGE)
@@ -4443,7 +4466,8 @@ case OP_IdxInsert: {
 
 	if (pOp->p5 & OPFLAG_OE_IGNORE) {
 		/* Ignore any kind of failes and do not raise error message */
-		rc = SQLITE_OK;
+		if ((pOp->p5 & OPFLAG_OE_HANDLER) == 0)
+			rc = SQLITE_OK;
 		/* If we are in trigger, increment ignore raised counter */
 		if (p->pFrame)
 			p->ignoreRaised++;
@@ -4452,6 +4476,11 @@ case OP_IdxInsert: {
 	} else if (pOp->p5 & OPFLAG_OE_ROLLBACK) {
 		p->errorAction = ON_CONFLICT_ACTION_ROLLBACK;
 	}
+	if (pOp->p5 & OPFLAG_OE_HANDLER && rc != SQLITE_OK) {
+		p->rc = rc;
+		rc = SQLITE_OK;
+		goto jump_to_p2;
+	}
 	if (rc != 0)
 		goto abort_due_to_error;
 	break;
-- 
2.19.2





More information about the Tarantool-patches mailing list