From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: From: Vladimir Davydov Subject: [PATCH 05/10] ddl: fix _space_sequence rollback Date: Wed, 3 Jul 2019 22:30:07 +0300 Message-Id: In-Reply-To: References: In-Reply-To: References: To: kostja@tarantool.org Cc: tarantool-patches@freelists.org List-ID: _space_sequence changes are not rolled back properly. Fix it keeping in mind that our ultimate goal is to implement transactional DDL, which implies that all changes to the schema should be done synchronously, i.e. on_replace, not on_commit. --- src/box/alter.cc | 138 +++++++++++++++++++++++++++++++-------------- test/box/sequence.result | 54 ++++++++++++++++++ test/box/sequence.test.lua | 19 +++++++ 3 files changed, 170 insertions(+), 41 deletions(-) diff --git a/src/box/alter.cc b/src/box/alter.cc index 7aeed834..89ab9c65 100644 --- a/src/box/alter.cc +++ b/src/box/alter.cc @@ -126,7 +126,8 @@ access_check_ddl(const char *name, uint32_t object_id, uint32_t owner_uid, */ static void index_def_check_sequence(struct index_def *index_def, uint32_t sequence_fieldno, - const char *sequence_path, const char *space_name) + const char *sequence_path, uint32_t sequence_path_len, + const char *space_name) { struct key_def *key_def = index_def->key_def; struct key_part *sequence_part = NULL; @@ -137,7 +138,7 @@ index_def_check_sequence(struct index_def *index_def, uint32_t sequence_fieldno, if ((part->path == NULL && sequence_path == NULL) || (part->path != NULL && sequence_path != NULL && json_path_cmp(part->path, part->path_len, - sequence_path, strlen(sequence_path), + sequence_path, sequence_path_len, TUPLE_INDEX_BASE) == 0)) { sequence_part = part; break; @@ -302,7 +303,10 @@ index_def_new_from_tuple(struct tuple *tuple, struct space *space) space_check_index_def_xc(space, index_def); if (index_def->iid == 0 && space->sequence != NULL) index_def_check_sequence(index_def, space->sequence_fieldno, - space->sequence_path, space_name(space)); + space->sequence_path, + space->sequence_path != NULL ? + strlen(space->sequence_path) : 0, + space_name(space)); index_def_guard.is_active = false; return index_def; } @@ -3484,12 +3488,83 @@ on_replace_dd_sequence_data(struct trigger * /* trigger */, void *event) } /** - * Run the triggers registered on commit of a change in _space. + * Extract field number and path from _space_sequence tuple. + * The path is allocated using malloc(). */ +static uint32_t +sequence_field_from_tuple(struct space *space, struct tuple *tuple, + char **path_ptr) +{ + struct index *pk = index_find_xc(space, 0); + struct key_part *part = &pk->def->key_def->parts[0]; + uint32_t fieldno = part->fieldno; + const char *path_raw = part->path; + uint32_t path_len = part->path_len; + + /* Sequence field was added in 2.2.1. */ + if (tuple_field_count(tuple) > BOX_SPACE_SEQUENCE_FIELD_FIELDNO) { + fieldno = tuple_field_u32_xc(tuple, + BOX_SPACE_SEQUENCE_FIELD_FIELDNO); + path_raw = tuple_field_str_xc(tuple, + BOX_SPACE_SEQUENCE_FIELD_PATH, &path_len); + if (path_len == 0) + path_raw = NULL; + } + index_def_check_sequence(pk->def, fieldno, path_raw, path_len, + space_name(space)); + char *path = NULL; + if (path_raw != NULL) { + path = (char *)malloc(path_len + 1); + if (path == NULL) + tnt_raise(OutOfMemory, path_len + 1, + "malloc", "sequence path"); + memcpy(path, path_raw, path_len); + path[path_len] = 0; + } + *path_ptr = path; + return fieldno; +} + +/** Attach a sequence to a space on rollback in _space_sequence. */ +static void +set_space_sequence(struct trigger *trigger, void * /* event */) +{ + struct tuple *tuple = (struct tuple *)trigger->data; + uint32_t space_id = tuple_field_u32_xc(tuple, + BOX_SPACE_SEQUENCE_FIELD_ID); + uint32_t sequence_id = tuple_field_u32_xc(tuple, + BOX_SPACE_SEQUENCE_FIELD_SEQUENCE_ID); + bool is_generated = tuple_field_bool_xc(tuple, + BOX_SPACE_SEQUENCE_FIELD_IS_GENERATED); + struct space *space = space_by_id(space_id); + assert(space != NULL); + struct sequence *seq = sequence_by_id(sequence_id); + assert(seq != NULL); + char *path; + uint32_t fieldno = sequence_field_from_tuple(space, tuple, &path); + seq->is_generated = is_generated; + space->sequence = seq; + space->sequence_fieldno = fieldno; + free(space->sequence_path); + space->sequence_path = path; + trigger_run_xc(&on_alter_space, space); +} + +/** Detach a sequence from a space on rollback in _space_sequence. */ static void -on_commit_dd_space_sequence(struct trigger *trigger, void * /* event */) +clear_space_sequence(struct trigger *trigger, void * /* event */) { - struct space *space = (struct space *) trigger->data; + struct tuple *tuple = (struct tuple *)trigger->data; + uint32_t space_id = tuple_field_u32_xc(tuple, + BOX_SPACE_SEQUENCE_FIELD_ID); + struct space *space = space_by_id(space_id); + assert(space != NULL); + assert(space->sequence != NULL); + space->sequence->is_generated = false; + space->sequence = NULL; + space->sequence_fieldno = 0; + free(space->sequence_path); + space->sequence_path = NULL; trigger_run_xc(&on_alter_space, space); } @@ -3537,65 +3612,46 @@ on_replace_dd_space_sequence(struct trigger * /* trigger */, void *event) access_check_ddl(space->def->name, space->def->id, space->def->uid, SC_SPACE, PRIV_A); - struct trigger *on_commit = - txn_alter_trigger_new(on_commit_dd_space_sequence, space); - txn_on_commit(txn, on_commit); - if (stmt->new_tuple != NULL) { /* INSERT, UPDATE */ - struct index *pk = index_find_xc(space, 0); - - /* Sequence field was added in 2.2.1. */ + char *sequence_path; uint32_t sequence_fieldno; - const char *sequence_path_raw; - uint32_t sequence_path_len; - if (tuple_field_count(tuple) > BOX_SPACE_SEQUENCE_FIELD_FIELDNO) { - sequence_fieldno = tuple_field_u32_xc(tuple, - BOX_SPACE_SEQUENCE_FIELD_FIELDNO); - sequence_path_raw = tuple_field_str_xc(tuple, - BOX_SPACE_SEQUENCE_FIELD_PATH, - &sequence_path_len); - } else { - struct key_part *part = &pk->def->key_def->parts[0]; - sequence_fieldno = part->fieldno; - sequence_path_raw = part->path; - sequence_path_len = part->path_len; - } - char *sequence_path = NULL; - if (sequence_path_len > 0) { - sequence_path = (char *)malloc(sequence_path_len + 1); - if (sequence_path == NULL) - tnt_raise(OutOfMemory, sequence_path_len + 1, - "malloc", "sequence path"); - memcpy(sequence_path, sequence_path_raw, - sequence_path_len); - sequence_path[sequence_path_len] = 0; - } + sequence_fieldno = sequence_field_from_tuple(space, tuple, + &sequence_path); auto sequence_path_guard = make_scoped_guard([=] { free(sequence_path); }); - - index_def_check_sequence(pk->def, sequence_fieldno, - sequence_path, - space_name(space)); if (seq->is_generated) { tnt_raise(ClientError, ER_ALTER_SPACE, space_name(space), "can not attach generated sequence"); } + struct trigger *on_rollback; + if (stmt->old_tuple != NULL) + on_rollback = txn_alter_trigger_new(set_space_sequence, + stmt->old_tuple); + else + on_rollback = txn_alter_trigger_new(clear_space_sequence, + stmt->new_tuple); seq->is_generated = is_generated; space->sequence = seq; space->sequence_fieldno = sequence_fieldno; free(space->sequence_path); space->sequence_path = sequence_path; sequence_path_guard.is_active = false; + txn_on_rollback(txn, on_rollback); } else { /* DELETE */ + struct trigger *on_rollback; + on_rollback = txn_alter_trigger_new(set_space_sequence, + stmt->old_tuple); assert(space->sequence == seq); seq->is_generated = false; space->sequence = NULL; space->sequence_fieldno = 0; free(space->sequence_path); space->sequence_path = NULL; + txn_on_rollback(txn, on_rollback); } + trigger_run_xc(&on_alter_space, space); } /* }}} sequence */ diff --git a/test/box/sequence.result b/test/box/sequence.result index 2c0c0a96..99eed0ec 100644 --- a/test/box/sequence.result +++ b/test/box/sequence.result @@ -2195,3 +2195,57 @@ box.sequence.test ~= nil box.sequence.test:drop() --- ... +-- +-- Check that changes to _space_sequence are rolled back properly. +-- +s = box.schema.space.create('test') +--- +... +_ = s:create_index('pk') +--- +... +sq = box.schema.sequence.create('test') +--- +... +box.begin() box.space._space_sequence:insert{s.id, sq.id, false, 0, ''} id = s.index.pk.sequence_id box.rollback() +--- +... +id == sq.id +--- +- true +... +s.index.pk.sequence_id == nil +--- +- true +... +s:insert{box.NULL} -- error +--- +- error: 'Tuple field 1 type does not match one required by operation: expected unsigned' +... +s.index.pk:alter{sequence = sq} +--- +... +box.begin() box.space._space_sequence:delete{s.id} id = s.index.pk.sequence_id box.rollback() +--- +... +id == nil +--- +- true +... +s.index.pk.sequence_id == sq.id +--- +- true +... +s:insert{box.NULL} -- ok +--- +- [1] +... +s.index.pk:alter{sequence = false} +--- +... +sq:drop() +--- +... +s:drop() +--- +... diff --git a/test/box/sequence.test.lua b/test/box/sequence.test.lua index 3c8140e8..9615c447 100644 --- a/test/box/sequence.test.lua +++ b/test/box/sequence.test.lua @@ -742,3 +742,22 @@ box.begin() box.space._sequence:delete{sq.id} sq = box.sequence.test box.rollbac sq == nil box.sequence.test ~= nil box.sequence.test:drop() + +-- +-- Check that changes to _space_sequence are rolled back properly. +-- +s = box.schema.space.create('test') +_ = s:create_index('pk') +sq = box.schema.sequence.create('test') +box.begin() box.space._space_sequence:insert{s.id, sq.id, false, 0, ''} id = s.index.pk.sequence_id box.rollback() +id == sq.id +s.index.pk.sequence_id == nil +s:insert{box.NULL} -- error +s.index.pk:alter{sequence = sq} +box.begin() box.space._space_sequence:delete{s.id} id = s.index.pk.sequence_id box.rollback() +id == nil +s.index.pk.sequence_id == sq.id +s:insert{box.NULL} -- ok +s.index.pk:alter{sequence = false} +sq:drop() +s:drop() -- 2.11.0