* [PATCH] vinyl: refactor vylog recovery
@ 2018-03-16 12:05 Vladimir Davydov
0 siblings, 0 replies; only message in thread
From: Vladimir Davydov @ 2018-03-16 12:05 UTC (permalink / raw)
To: tarantool-patches
The vy_recovery structure was initially designed as opaque to the
outside world - to iterate over objects stored in it, one is supposed to
use vy_recovery_iterate(), which invokes the given callback for each
recovered object, encoded as the vy_log_record that was used to create it.
Such a design gets extremely difficult to use when we need to preserve
some context between callback invocations - e.g. see how ugly backup and
garbage collection procedures look. And it is going to become even more
obfuscated once we introduce the notion of incomplete indexes (indexes
that are currently being built by ALTER).
So let's refactor the vylog recovery procedure: make the vy_recovery
structure transparent and allow iterating over the internal representations
of recovered objects directly, without callbacks.
---
https://github.com/tarantool/tarantool/tree/vy-refactor-vylog-recovery
src/box/vinyl.c | 399 +++++++++++++++++++++--------------------------
src/box/vy_index.c | 375 ++++++++++++++++++++++----------------------
src/box/vy_log.c | 340 ++++++++++------------------------------
src/box/vy_log.h | 197 +++++++++++++++--------
test/unit/vy_log_stub.c | 8 +-
test/vinyl/layout.result | 4 +-
6 files changed, 582 insertions(+), 741 deletions(-)
diff --git a/src/box/vinyl.c b/src/box/vinyl.c
index e0c30757..d3659b0b 100644
--- a/src/box/vinyl.c
+++ b/src/box/vinyl.c
@@ -3011,8 +3011,6 @@ struct vy_join_ctx {
struct cbus_call_msg cmsg;
/** ID of the space currently being relayed. */
uint32_t space_id;
- /** Ordinal number of the index. */
- uint32_t index_id;
/**
* Index key definition, as defined by the user.
* We only send the primary key, so the definition
@@ -3036,6 +3034,55 @@ struct vy_join_ctx {
struct rlist slices;
};
+/**
+ * Recover a slice and add it to the list of slices.
+ * Newer slices are supposed to be recovered first.
+ * Returns 0 on success, -1 on failure.
+ */
+static int
+vy_prepare_send_slice(struct vy_join_ctx *ctx,
+ struct vy_slice_recovery_info *slice_info)
+{
+ int rc = -1;
+ struct vy_run *run = NULL;
+ struct tuple *begin = NULL, *end = NULL;
+
+ run = vy_run_new(&ctx->env->run_env, slice_info->run->id);
+ if (run == NULL)
+ goto out;
+ if (vy_run_recover(run, ctx->env->path, ctx->space_id, 0) != 0)
+ goto out;
+
+ if (slice_info->begin != NULL) {
+ begin = vy_key_from_msgpack(ctx->env->index_env.key_format,
+ slice_info->begin);
+ if (begin == NULL)
+ goto out;
+ }
+ if (slice_info->end != NULL) {
+ end = vy_key_from_msgpack(ctx->env->index_env.key_format,
+ slice_info->end);
+ if (end == NULL)
+ goto out;
+ }
+
+ struct vy_slice *slice = vy_slice_new(slice_info->id, run,
+ begin, end, ctx->key_def);
+ if (slice == NULL)
+ goto out;
+
+ rlist_add_tail_entry(&ctx->slices, slice, in_join);
+ rc = 0;
+out:
+ if (run != NULL)
+ vy_run_unref(run);
+ if (begin != NULL)
+ tuple_unref(begin);
+ if (end != NULL)
+ tuple_unref(end);
+ return rc;
+}
+
static int
vy_send_range_f(struct cbus_call_msg *cmsg)
{
@@ -3068,28 +3115,38 @@ err:
return rc;
}
-/**
- * Merge and send all runs from the given relay context.
- * On success, delete runs.
- */
+/** Merge and send all runs of the given range. */
static int
-vy_send_range(struct vy_join_ctx *ctx)
+vy_send_range(struct vy_join_ctx *ctx,
+ struct vy_range_recovery_info *range_info)
{
- if (rlist_empty(&ctx->slices))
+ int rc;
+ struct vy_slice *slice, *tmp;
+
+ if (rlist_empty(&range_info->slices))
return 0; /* nothing to do */
- int rc = -1;
+ /* Recover slices. */
+ struct vy_slice_recovery_info *slice_info;
+ rlist_foreach_entry(slice_info, &range_info->slices, in_range) {
+ rc = vy_prepare_send_slice(ctx, slice_info);
+ if (rc != 0)
+ goto out_delete_slices;
+ }
+
+ /* Create a write iterator. */
struct rlist fake_read_views;
rlist_create(&fake_read_views);
ctx->wi = vy_write_iterator_new(ctx->key_def,
ctx->format, ctx->upsert_format,
true, true, &fake_read_views);
- if (ctx->wi == NULL)
+ if (ctx->wi == NULL) {
+ rc = -1;
goto out;
-
- struct vy_slice *slice;
+ }
rlist_foreach_entry(slice, &ctx->slices, in_join) {
- if (vy_write_iterator_new_slice(ctx->wi, slice) != 0)
+ rc = vy_write_iterator_new_slice(ctx->wi, slice);
+ if (rc != 0)
goto out_delete_wi;
}
@@ -3099,11 +3156,10 @@ vy_send_range(struct vy_join_ctx *ctx)
vy_send_range_f, NULL, TIMEOUT_INFINITY);
fiber_set_cancellable(cancellable);
- struct vy_slice *tmp;
+out_delete_slices:
rlist_foreach_entry_safe(slice, &ctx->slices, in_join, tmp)
vy_slice_delete(slice);
rlist_create(&ctx->slices);
-
out_delete_wi:
ctx->wi->iface->close(ctx->wi);
ctx->wi = NULL;
@@ -3111,96 +3167,59 @@ out:
return rc;
}
-/** Relay callback, passed to vy_recovery_iterate(). */
+/** Send all tuples stored in the given index. */
static int
-vy_join_cb(const struct vy_log_record *record, void *arg)
+vy_send_index(struct vy_join_ctx *ctx,
+ struct vy_index_recovery_info *index_info)
{
- struct vy_join_ctx *ctx = arg;
-
- if (record->type == VY_LOG_CREATE_INDEX ||
- record->type == VY_LOG_INSERT_RANGE) {
- /*
- * All runs of the current range have been recovered,
- * so send them to the replica.
- */
- if (vy_send_range(ctx) != 0)
- return -1;
- }
+ int rc = -1;
- if (record->type == VY_LOG_CREATE_INDEX) {
- ctx->space_id = record->space_id;
- ctx->index_id = record->index_id;
- if (ctx->key_def != NULL)
- free(ctx->key_def);
- ctx->key_def = key_def_new_with_parts(record->key_parts,
- record->key_part_count);
- if (ctx->key_def == NULL)
- return -1;
- if (ctx->format != NULL)
- tuple_format_unref(ctx->format);
- ctx->format = tuple_format_new(&vy_tuple_format_vtab,
- &ctx->key_def, 1, 0, NULL, 0,
- NULL);
- if (ctx->format == NULL)
- return -1;
- tuple_format_ref(ctx->format);
- if (ctx->upsert_format != NULL)
- tuple_format_unref(ctx->upsert_format);
- ctx->upsert_format = vy_tuple_format_new_upsert(ctx->format);
- if (ctx->upsert_format == NULL)
- return -1;
- tuple_format_ref(ctx->upsert_format);
- }
+ if (index_info->is_dropped)
+ return 0;
/*
* We are only interested in the primary index.
* Secondary keys will be rebuilt on the destination.
*/
- if (ctx->index_id != 0)
+ if (index_info->index_id != 0)
return 0;
- if (record->type == VY_LOG_INSERT_SLICE) {
- struct tuple_format *key_format = ctx->env->index_env.key_format;
- struct tuple *begin = NULL, *end = NULL;
- bool success = false;
-
- struct vy_run *run = vy_run_new(&ctx->env->run_env,
- record->run_id);
- if (run == NULL)
- goto done_slice;
- if (vy_run_recover(run, ctx->env->path,
- ctx->space_id, ctx->index_id) != 0)
- goto done_slice;
-
- if (record->begin != NULL) {
- begin = vy_key_from_msgpack(key_format, record->begin);
- if (begin == NULL)
- goto done_slice;
- }
- if (record->end != NULL) {
- end = vy_key_from_msgpack(key_format, record->end);
- if (end == NULL)
- goto done_slice;
- }
+ ctx->space_id = index_info->space_id;
- struct vy_slice *slice = vy_slice_new(record->slice_id,
- run, begin, end, ctx->key_def);
- if (slice == NULL)
- goto done_slice;
-
- rlist_add_entry(&ctx->slices, slice, in_join);
- success = true;
-done_slice:
- if (run != NULL)
- vy_run_unref(run);
- if (begin != NULL)
- tuple_unref(begin);
- if (end != NULL)
- tuple_unref(end);
- if (!success)
- return -1;
+ /* Create key definition and tuple format. */
+ ctx->key_def = key_def_new_with_parts(index_info->key_parts,
+ index_info->key_part_count);
+ if (ctx->key_def == NULL)
+ goto out;
+ ctx->format = tuple_format_new(&vy_tuple_format_vtab, &ctx->key_def,
+ 1, 0, NULL, 0, NULL);
+ if (ctx->format == NULL)
+ goto out_free_key_def;
+ tuple_format_ref(ctx->format);
+ ctx->upsert_format = vy_tuple_format_new_upsert(ctx->format);
+ if (ctx->upsert_format == NULL)
+ goto out_free_format;
+ tuple_format_ref(ctx->upsert_format);
+
+ /* Send ranges. */
+ struct vy_range_recovery_info *range_info;
+ assert(!rlist_empty(&index_info->ranges));
+ rlist_foreach_entry(range_info, &index_info->ranges, in_index) {
+ rc = vy_send_range(ctx, range_info);
+ if (rc != 0)
+ break;
}
- return 0;
+
+ tuple_format_unref(ctx->upsert_format);
+ ctx->upsert_format = NULL;
+out_free_format:
+ tuple_format_unref(ctx->format);
+ ctx->format = NULL;
+out_free_key_def:
+ free(ctx->key_def);
+ ctx->key_def = NULL;
+out:
+ return rc;
}
/** Relay cord function. */
@@ -3260,22 +3279,15 @@ vinyl_engine_join(struct engine *engine, struct vclock *vclock,
say_error("failed to recover vylog to join a replica");
goto out_join_cord;
}
- rc = vy_recovery_iterate(recovery, vy_join_cb, ctx);
+ rc = 0;
+ struct vy_index_recovery_info *index_info;
+ rlist_foreach_entry(index_info, &recovery->indexes, in_recovery) {
+ rc = vy_send_index(ctx, index_info);
+ if (rc != 0)
+ break;
+ }
vy_recovery_delete(recovery);
- /* Send the last range. */
- if (rc == 0)
- rc = vy_send_range(ctx);
-
- /* Cleanup. */
- if (ctx->key_def != NULL)
- free(ctx->key_def);
- if (ctx->format != NULL)
- tuple_format_unref(ctx->format);
- if (ctx->upsert_format != NULL)
- tuple_format_unref(ctx->upsert_format);
- struct vy_slice *slice, *tmp;
- rlist_foreach_entry_safe(slice, &ctx->slices, in_join, tmp)
- vy_slice_delete(slice);
+
out_join_cord:
cbus_stop_loop(&ctx->relay_pipe);
cpipe_destroy(&ctx->relay_pipe);
@@ -3355,70 +3367,29 @@ vinyl_space_apply_initial_join_row(struct space *space, struct request *request)
/* {{{ Garbage collection */
-/** Argument passed to vy_gc_cb(). */
-struct vy_gc_arg {
- /** Vinyl environment. */
- struct vy_env *env;
- /**
- * Specifies what kinds of runs to delete.
- * See VY_GC_*.
- */
- unsigned int gc_mask;
- /** LSN of the oldest checkpoint to save. */
- int64_t gc_lsn;
- /**
- * ID of the current space and index.
- * Needed for file name formatting.
- */
- uint32_t space_id;
- uint32_t index_id;
- /** Number of times the callback has been called. */
- int loops;
-};
-
/**
- * Garbage collection callback, passed to vy_recovery_iterate().
- *
* Given a record encoding information about a vinyl run, try to
* delete the corresponding files. On success, write a "forget" record
* to the log so that all information about the run is deleted on the
* next log rotation.
*/
-static int
-vy_gc_cb(const struct vy_log_record *record, void *cb_arg)
+static void
+vy_gc_run(struct vy_env *env,
+ struct vy_index_recovery_info *index_info,
+ struct vy_run_recovery_info *run_info)
{
- struct vy_gc_arg *arg = cb_arg;
-
- switch (record->type) {
- case VY_LOG_CREATE_INDEX:
- arg->space_id = record->space_id;
- arg->index_id = record->index_id;
- goto out;
- case VY_LOG_PREPARE_RUN:
- if ((arg->gc_mask & VY_GC_INCOMPLETE) == 0)
- goto out;
- break;
- case VY_LOG_DROP_RUN:
- if ((arg->gc_mask & VY_GC_DROPPED) == 0 ||
- record->gc_lsn >= arg->gc_lsn)
- goto out;
- break;
- default:
- goto out;
- }
-
ERROR_INJECT(ERRINJ_VY_GC,
{say_error("error injection: vinyl run %lld not deleted",
- (long long)record->run_id); goto out;});
+ (long long)run_info->id); return;});
/* Try to delete files. */
- if (vy_run_remove_files(arg->env->path, arg->space_id,
- arg->index_id, record->run_id) != 0)
- goto out;
+ if (vy_run_remove_files(env->path, index_info->space_id,
+ index_info->index_id, run_info->id) != 0)
+ return;
/* Forget the run on success. */
vy_log_tx_begin();
- vy_log_forget_run(record->run_id);
+ vy_log_forget_run(run_info->id);
/*
* Leave the record in the vylog buffer on disk error.
* If we fail to flush it before restart, we will retry
@@ -3426,23 +3397,35 @@ vy_gc_cb(const struct vy_log_record *record, void *cb_arg)
* is invoked, which is harmless.
*/
vy_log_tx_try_commit();
-out:
- if (++arg->loops % VY_YIELD_LOOPS == 0)
- fiber_sleep(0);
- return 0;
}
-/** Delete unused run files, see vy_gc_arg for more details. */
+/**
+ * Delete unused run files stored in the recovery context.
+ * @param env Vinyl environment.
+ * @param recovery Recovery context.
+ * @param gc_mask Specifies what kinds of runs to delete (see VY_GC_*).
+ * @param gc_lsn LSN of the oldest checkpoint to save.
+ */
static void
vy_gc(struct vy_env *env, struct vy_recovery *recovery,
unsigned int gc_mask, int64_t gc_lsn)
{
- struct vy_gc_arg arg = {
- .env = env,
- .gc_mask = gc_mask,
- .gc_lsn = gc_lsn,
- };
- vy_recovery_iterate(recovery, vy_gc_cb, &arg);
+ int loops = 0;
+ struct vy_index_recovery_info *index_info;
+ rlist_foreach_entry(index_info, &recovery->indexes, in_recovery) {
+ struct vy_run_recovery_info *run_info;
+ rlist_foreach_entry(run_info, &index_info->runs, in_index) {
+ if ((run_info->is_dropped &&
+ run_info->gc_lsn < gc_lsn &&
+ (gc_mask & VY_GC_DROPPED) != 0) ||
+ (run_info->is_incomplete &&
+ (gc_mask & VY_GC_INCOMPLETE) != 0)) {
+ vy_gc_run(env, index_info, run_info);
+ }
+ if (++loops % VY_YIELD_LOOPS == 0)
+ fiber_sleep(0);
+ }
+ }
}
static int
@@ -3469,52 +3452,6 @@ vinyl_engine_collect_garbage(struct engine *engine, int64_t lsn)
/* {{{ Backup */
-/** Argument passed to vy_backup_cb(). */
-struct vy_backup_arg {
- /** Vinyl environment. */
- struct vy_env *env;
- /** Backup callback. */
- int (*cb)(const char *, void *);
- /** Argument passed to @cb. */
- void *cb_arg;
- /**
- * ID of the current space and index.
- * Needed for file name formatting.
- */
- uint32_t space_id;
- uint32_t index_id;
- /** Number of times the callback has been called. */
- int loops;
-};
-
-/** Backup callback, passed to vy_recovery_iterate(). */
-static int
-vy_backup_cb(const struct vy_log_record *record, void *cb_arg)
-{
- struct vy_backup_arg *arg = cb_arg;
-
- if (record->type == VY_LOG_CREATE_INDEX) {
- arg->space_id = record->space_id;
- arg->index_id = record->index_id;
- }
-
- if (record->type != VY_LOG_CREATE_RUN || record->is_dropped)
- goto out;
-
- char path[PATH_MAX];
- for (int type = 0; type < vy_file_MAX; type++) {
- vy_run_snprint_path(path, sizeof(path), arg->env->path,
- arg->space_id, arg->index_id,
- record->run_id, type);
- if (arg->cb(path, arg->cb_arg) != 0)
- return -1;
- }
-out:
- if (++arg->loops % VY_YIELD_LOOPS == 0)
- fiber_sleep(0);
- return 0;
-}
-
static int
vinyl_engine_backup(struct engine *engine, struct vclock *vclock,
engine_backup_cb cb, void *cb_arg)
@@ -3535,12 +3472,32 @@ vinyl_engine_backup(struct engine *engine, struct vclock *vclock,
say_error("failed to recover vylog for backup");
return -1;
}
- struct vy_backup_arg arg = {
- .env = env,
- .cb = cb,
- .cb_arg = cb_arg,
- };
- int rc = vy_recovery_iterate(recovery, vy_backup_cb, &arg);
+ int rc = 0;
+ int loops = 0;
+ struct vy_index_recovery_info *index_info;
+ rlist_foreach_entry(index_info, &recovery->indexes, in_recovery) {
+ if (index_info->is_dropped)
+ continue;
+ struct vy_run_recovery_info *run_info;
+ rlist_foreach_entry(run_info, &index_info->runs, in_index) {
+ if (run_info->is_dropped || run_info->is_incomplete)
+ continue;
+ char path[PATH_MAX];
+ for (int type = 0; type < vy_file_MAX; type++) {
+ vy_run_snprint_path(path, sizeof(path),
+ env->path,
+ index_info->space_id,
+ index_info->index_id,
+ run_info->id, type);
+ rc = cb(path, cb_arg);
+ if (rc != 0)
+ goto out;
+ }
+ if (++loops % VY_YIELD_LOOPS == 0)
+ fiber_sleep(0);
+ }
+ }
+out:
vy_recovery_delete(recovery);
return rc;
}
diff --git a/src/box/vy_index.c b/src/box/vy_index.c
index 68fccab5..9c199ddd 100644
--- a/src/box/vy_index.c
+++ b/src/box/vy_index.c
@@ -36,7 +36,6 @@
#include <sys/stat.h>
#include <sys/types.h>
-#include "assoc.h"
#include "diag.h"
#include "errcode.h"
#include "histogram.h"
@@ -386,156 +385,150 @@ vy_index_create(struct vy_index *index)
return vy_index_init_range_tree(index);
}
-/** vy_index_recovery_cb() argument. */
-struct vy_index_recovery_cb_arg {
- /** Index being recovered. */
- struct vy_index *index;
- /** Last recovered range. */
- struct vy_range *range;
- /** Vinyl run environment. */
- struct vy_run_env *run_env;
- /**
- * All recovered runs hashed by ID.
- * It is needed in order not to load the same
- * run each time a slice is created for it.
- */
- struct mh_i64ptr_t *run_hash;
- /**
- * True if force_recovery mode is enabled.
+static struct vy_run *
+vy_index_recover_run(struct vy_index *index,
+ struct vy_run_recovery_info *run_info,
+ struct vy_run_env *run_env, bool force_recovery)
+{
+ assert(!run_info->is_dropped);
+ assert(!run_info->is_incomplete);
+
+ if (run_info->data != NULL) {
+ /* Already recovered. */
+ return run_info->data;
+ }
+
+ struct vy_run *run = vy_run_new(run_env, run_info->id);
+ if (run == NULL)
+ return NULL;
+
+ run->dump_lsn = run_info->dump_lsn;
+ if (vy_run_recover(run, index->env->path,
+ index->space_id, index->id) != 0 &&
+ (!force_recovery ||
+ vy_run_rebuild_index(run, index->env->path,
+ index->space_id, index->id,
+ index->cmp_def, index->key_def,
+ index->mem_format, index->upsert_format,
+ &index->opts) != 0)) {
+ vy_run_unref(run);
+ return NULL;
+ }
+ vy_index_add_run(index, run);
+
+ /*
+ * The same run can be referenced by more than one slice
+ * so we cache recovered runs in run_info to avoid loading
+ * the same run multiple times.
+ *
+ * Runs are stored with their reference counters elevated.
+ * We drop the extra references as soon as index recovery
+ * is complete (see vy_index_recover()).
*/
- bool force_recovery;
-};
+ run_info->data = run;
+ return run;
+}
-/** Index recovery callback, passed to vy_recovery_load_index(). */
-static int
-vy_index_recovery_cb(const struct vy_log_record *record, void *cb_arg)
+static struct vy_slice *
+vy_index_recover_slice(struct vy_index *index, struct vy_range *range,
+ struct vy_slice_recovery_info *slice_info,
+ struct vy_run_env *run_env, bool force_recovery)
{
- struct vy_index_recovery_cb_arg *arg = cb_arg;
- struct vy_index *index = arg->index;
- struct vy_range *range = arg->range;
- struct vy_run_env *run_env = arg->run_env;
- struct mh_i64ptr_t *run_hash = arg->run_hash;
- bool force_recovery = arg->force_recovery;
- struct tuple_format *key_format = index->env->key_format;
struct tuple *begin = NULL, *end = NULL;
+ struct vy_slice *slice = NULL;
struct vy_run *run;
- struct vy_slice *slice;
- bool success = false;
-
- assert(record->type == VY_LOG_CREATE_INDEX || index->commit_lsn >= 0);
- if (record->type == VY_LOG_INSERT_RANGE ||
- record->type == VY_LOG_INSERT_SLICE) {
- if (record->begin != NULL) {
- begin = vy_key_from_msgpack(key_format, record->begin);
- if (begin == NULL)
- goto out;
- }
- if (record->end != NULL) {
- end = vy_key_from_msgpack(key_format, record->end);
- if (end == NULL)
- goto out;
- }
- }
-
- switch (record->type) {
- case VY_LOG_CREATE_INDEX:
- assert(record->index_id == index->id);
- assert(record->space_id == index->space_id);
- assert(index->commit_lsn < 0);
- assert(record->index_lsn >= 0);
- index->commit_lsn = record->index_lsn;
- break;
- case VY_LOG_DUMP_INDEX:
- assert(record->index_lsn == index->commit_lsn);
- index->dump_lsn = record->dump_lsn;
- break;
- case VY_LOG_TRUNCATE_INDEX:
- assert(record->index_lsn == index->commit_lsn);
- index->truncate_count = record->truncate_count;
- break;
- case VY_LOG_DROP_INDEX:
- assert(record->index_lsn == index->commit_lsn);
- index->is_dropped = true;
- /*
- * If the index was dropped, we don't need to replay
- * truncate (see vy_prepare_truncate_space()).
- */
- index->truncate_count = UINT64_MAX;
- break;
- case VY_LOG_PREPARE_RUN:
- break;
- case VY_LOG_CREATE_RUN:
- if (record->is_dropped)
- break;
- assert(record->index_lsn == index->commit_lsn);
- run = vy_run_new(run_env, record->run_id);
- if (run == NULL)
+ if (slice_info->begin != NULL) {
+ begin = vy_key_from_msgpack(index->env->key_format,
+ slice_info->begin);
+ if (begin == NULL)
goto out;
- run->dump_lsn = record->dump_lsn;
- if (vy_run_recover(run, index->env->path,
- index->space_id, index->id) != 0 &&
- (!force_recovery ||
- vy_run_rebuild_index(run, index->env->path,
- index->space_id, index->id,
- index->cmp_def, index->key_def,
- index->mem_format,
- index->upsert_format,
- &index->opts) != 0)) {
- vy_run_unref(run);
+ }
+ if (slice_info->end != NULL) {
+ end = vy_key_from_msgpack(index->env->key_format,
+ slice_info->end);
+ if (end == NULL)
goto out;
- }
- struct mh_i64ptr_node_t node = { run->id, run };
- if (mh_i64ptr_put(run_hash, &node,
- NULL, NULL) == mh_end(run_hash)) {
- diag_set(OutOfMemory, 0,
- "mh_i64ptr_put", "mh_i64ptr_node_t");
- vy_run_unref(run);
+ }
+ if (begin != NULL && end != NULL &&
+ vy_key_compare(begin, end, index->cmp_def) >= 0) {
+ diag_set(ClientError, ER_INVALID_VYLOG_FILE,
+ tt_sprintf("begin >= end for slice %lld",
+ (long long)slice_info->id));
+ goto out;
+ }
+
+ run = vy_index_recover_run(index, slice_info->run,
+ run_env, force_recovery);
+ if (run == NULL)
+ goto out;
+
+ slice = vy_slice_new(slice_info->id, run, begin, end, index->cmp_def);
+ if (slice == NULL)
+ goto out;
+
+ vy_range_add_slice(range, slice);
+out:
+ if (begin != NULL)
+ tuple_unref(begin);
+ if (end != NULL)
+ tuple_unref(end);
+ return slice;
+}
+
+static struct vy_range *
+vy_index_recover_range(struct vy_index *index,
+ struct vy_range_recovery_info *range_info,
+ struct vy_run_env *run_env, bool force_recovery)
+{
+ struct tuple *begin = NULL, *end = NULL;
+ struct vy_range *range = NULL;
+
+ if (range_info->begin != NULL) {
+ begin = vy_key_from_msgpack(index->env->key_format,
+ range_info->begin);
+ if (begin == NULL)
goto out;
- }
- break;
- case VY_LOG_DROP_RUN:
- break;
- case VY_LOG_INSERT_RANGE:
- assert(record->index_lsn == index->commit_lsn);
- range = vy_range_new(record->range_id, begin, end,
- index->cmp_def);
- if (range == NULL)
+ }
+ if (range_info->end != NULL) {
+ end = vy_key_from_msgpack(index->env->key_format,
+ range_info->end);
+ if (end == NULL)
goto out;
- if (range->begin != NULL && range->end != NULL &&
- vy_key_compare(range->begin, range->end,
- index->cmp_def) >= 0) {
- diag_set(ClientError, ER_INVALID_VYLOG_FILE,
- tt_sprintf("begin >= end for range id %lld",
- (long long)range->id));
+ }
+ if (begin != NULL && end != NULL &&
+ vy_key_compare(begin, end, index->cmp_def) >= 0) {
+ diag_set(ClientError, ER_INVALID_VYLOG_FILE,
+ tt_sprintf("begin >= end for range %lld",
+ (long long)range_info->id));
+ goto out;
+ }
+
+ range = vy_range_new(range_info->id, begin, end, index->cmp_def);
+ if (range == NULL)
+ goto out;
+
+ /*
+ * Newer slices are stored closer to the head of the list,
+ * while we are supposed to add slices in chronological
+ * order, so use reverse iterator.
+ */
+ struct vy_slice_recovery_info *slice_info;
+ rlist_foreach_entry_reverse(slice_info, &range_info->slices, in_range) {
+ if (vy_index_recover_slice(index, range, slice_info,
+ run_env, force_recovery) == NULL) {
vy_range_delete(range);
+ range = NULL;
goto out;
}
- vy_index_add_range(index, range);
- arg->range = range;
- break;
- case VY_LOG_INSERT_SLICE:
- assert(range != NULL);
- assert(range->id == record->range_id);
- mh_int_t k = mh_i64ptr_find(run_hash, record->run_id, NULL);
- assert(k != mh_end(run_hash));
- run = mh_i64ptr_node(run_hash, k)->val;
- slice = vy_slice_new(record->slice_id, run, begin, end,
- index->cmp_def);
- if (slice == NULL)
- goto out;
- vy_range_add_slice(range, slice);
- break;
- default:
- unreachable();
}
- success = true;
+ vy_index_add_range(index, range);
out:
if (begin != NULL)
tuple_unref(begin);
if (end != NULL)
tuple_unref(end);
- return success ? 0 : -1;
+ return range;
}
int
@@ -545,19 +538,6 @@ vy_index_recover(struct vy_index *index, struct vy_recovery *recovery,
{
assert(index->range_count == 0);
- struct vy_index_recovery_cb_arg arg = {
- .index = index,
- .range = NULL,
- .run_env = run_env,
- .run_hash = NULL,
- .force_recovery = force_recovery,
- };
- arg.run_hash = mh_i64ptr_new();
- if (arg.run_hash == NULL) {
- diag_set(OutOfMemory, 0, "mh_i64ptr_new", "mh_i64ptr_t");
- return -1;
- }
-
/*
* Backward compatibility fixup: historically, we used
* box.info.signature for LSN of index creation, which
@@ -568,39 +548,14 @@ vy_index_recover(struct vy_index *index, struct vy_recovery *recovery,
if (index->opts.lsn != 0)
lsn = index->opts.lsn;
- int rc = vy_recovery_load_index(recovery, index->space_id, index->id,
- lsn, is_checkpoint_recovery,
- vy_index_recovery_cb, &arg);
-
- mh_int_t k;
- mh_foreach(arg.run_hash, k) {
- struct vy_run *run = mh_i64ptr_node(arg.run_hash, k)->val;
- if (run->refs > 1)
- vy_index_add_run(index, run);
- if (run->refs == 1 && rc == 0) {
- diag_set(ClientError, ER_INVALID_VYLOG_FILE,
- tt_sprintf("Unused run %lld in index %lld",
- (long long)run->id,
- (long long)index->commit_lsn));
- rc = -1;
- /*
- * Continue the loop to unreference
- * all runs in the hash.
- */
- }
- /* Drop the reference held by the hash. */
- vy_run_unref(run);
- }
- mh_i64ptr_delete(arg.run_hash);
-
- if (rc != 0) {
- /* Recovery callback failed. */
- return -1;
- }
-
- if (index->commit_lsn < 0) {
- /* Index was not found in the metadata log. */
- if (is_checkpoint_recovery) {
+ /*
+ * Look up the last incarnation of the index in vylog.
+ */
+ struct vy_index_recovery_info *index_info;
+ index_info = vy_recovery_lookup_index(recovery,
+ index->space_id, index->id);
+ if (is_checkpoint_recovery) {
+ if (index_info == NULL) {
/*
* All indexes created from snapshot rows must
* be present in vylog, because snapshot can
@@ -608,10 +563,21 @@ vy_index_recover(struct vy_index *index, struct vy_recovery *recovery,
* flushed.
*/
diag_set(ClientError, ER_INVALID_VYLOG_FILE,
- tt_sprintf("Index %lld not found",
- (long long)index->commit_lsn));
+ tt_sprintf("Index %u/%u not found",
+ (unsigned)index->space_id,
+ (unsigned)index->id));
return -1;
}
+ if (lsn > index_info->index_lsn) {
+ /*
+ * The last incarnation of the index was created
+ * before the last checkpoint, load it now.
+ */
+ lsn = index_info->index_lsn;
+ }
+ }
+
+ if (index_info == NULL || lsn > index_info->index_lsn) {
/*
* If we failed to log index creation before restart,
* we won't find it in the log on recovery. This is
@@ -622,15 +588,58 @@ vy_index_recover(struct vy_index *index, struct vy_recovery *recovery,
return vy_index_init_range_tree(index);
}
- if (index->is_dropped) {
+ index->commit_lsn = lsn;
+
+ if (lsn < index_info->index_lsn || index_info->is_dropped) {
/*
- * Initial range is not stored in the metadata log
- * for dropped indexes, but we need it for recovery.
+ * Loading a past incarnation of the index, i.e.
+ * the index is going to be dropped during final
+ * recovery. Mark it as such.
+ */
+ index->is_dropped = true;
+ /*
+ * If the index was dropped, we don't need to replay
+ * truncate (see vinyl_space_prepare_truncate()).
+ */
+ index->truncate_count = UINT64_MAX;
+ /*
+ * We need range tree initialized for all indexes,
+ * even for dropped ones.
*/
return vy_index_init_range_tree(index);
}
/*
+ * Loading the last incarnation of the index from vylog.
+ */
+ index->dump_lsn = index_info->dump_lsn;
+ index->truncate_count = index_info->truncate_count;
+
+ int rc = 0;
+ struct vy_range_recovery_info *range_info;
+ rlist_foreach_entry(range_info, &index_info->ranges, in_index) {
+ if (vy_index_recover_range(index, range_info,
+ run_env, force_recovery) == NULL) {
+ rc = -1;
+ break;
+ }
+ }
+
+ /*
+ * vy_index_recover_run() elevates reference counter
+ * of each recovered run. We need to drop the extra
+ * references once we are done.
+ */
+ struct vy_run *run;
+ rlist_foreach_entry(run, &index->runs, in_index) {
+ assert(run->refs > 1);
+ vy_run_unref(run);
+ }
+
+ if (rc != 0)
+ return -1;
+
+ /*
* Account ranges to the index and check that the range tree
* does not have holes or overlaps.
*/
diff --git a/src/box/vy_log.c b/src/box/vy_log.c
index c31a588e..8b95282b 100644
--- a/src/box/vy_log.c
+++ b/src/box/vy_log.c
@@ -115,8 +115,6 @@ static const char *vy_log_type_name[] = {
[VY_LOG_TRUNCATE_INDEX] = "truncate_index",
};
-struct vy_recovery;
-
/** Metadata log object. */
struct vy_log {
/**
@@ -170,111 +168,6 @@ struct vy_log {
};
static struct vy_log vy_log;
-/** Recovery context. */
-struct vy_recovery {
- /** space_id, index_id -> vy_index_recovery_info. */
- struct mh_i64ptr_t *index_id_hash;
- /** index_lsn -> vy_index_recovery_info. */
- struct mh_i64ptr_t *index_lsn_hash;
- /** ID -> vy_range_recovery_info. */
- struct mh_i64ptr_t *range_hash;
- /** ID -> vy_run_recovery_info. */
- struct mh_i64ptr_t *run_hash;
- /** ID -> vy_slice_recovery_info. */
- struct mh_i64ptr_t *slice_hash;
- /**
- * Maximal vinyl object ID, according to the metadata log,
- * or -1 in case no vinyl objects were recovered.
- */
- int64_t max_id;
-};
-
-/** Vinyl index info stored in a recovery context. */
-struct vy_index_recovery_info {
- /** LSN of the index creation. */
- int64_t index_lsn;
- /** Ordinal index number in the space. */
- uint32_t index_id;
- /** Space ID. */
- uint32_t space_id;
- /** Array of key part definitions. */
- struct key_part_def *key_parts;
- /** Number of key parts. */
- uint32_t key_part_count;
- /** True if the index was dropped. */
- bool is_dropped;
- /** LSN of the last index dump. */
- int64_t dump_lsn;
- /** Truncate count. */
- int64_t truncate_count;
- /**
- * List of all ranges in the index, linked by
- * vy_range_recovery_info::in_index.
- */
- struct rlist ranges;
- /**
- * List of all runs created for the index
- * (both committed and not), linked by
- * vy_run_recovery_info::in_index.
- */
- struct rlist runs;
-};
-
-/** Vinyl range info stored in a recovery context. */
-struct vy_range_recovery_info {
- /** Link in vy_index_recovery_info::ranges. */
- struct rlist in_index;
- /** ID of the range. */
- int64_t id;
- /** Start of the range, stored in MsgPack array. */
- char *begin;
- /** End of the range, stored in MsgPack array. */
- char *end;
- /**
- * List of all slices in the range, linked by
- * vy_slice_recovery_info::in_range.
- *
- * Newer slices are closer to the head.
- */
- struct rlist slices;
-};
-
-/** Run info stored in a recovery context. */
-struct vy_run_recovery_info {
- /** Link in vy_index_recovery_info::runs. */
- struct rlist in_index;
- /** ID of the run. */
- int64_t id;
- /** Max LSN stored on disk. */
- int64_t dump_lsn;
- /**
- * For deleted runs: LSN of the last checkpoint
- * that uses this run.
- */
- int64_t gc_lsn;
- /**
- * True if the run was not committed (there's
- * VY_LOG_PREPARE_RUN, but no VY_LOG_CREATE_RUN).
- */
- bool is_incomplete;
- /** True if the run was dropped (VY_LOG_DROP_RUN). */
- bool is_dropped;
-};
-
-/** Slice info stored in a recovery context. */
-struct vy_slice_recovery_info {
- /** Link in vy_range_recovery_info::slices. */
- struct rlist in_range;
- /** ID of the slice. */
- int64_t id;
- /** Run this slice was created for. */
- struct vy_run_recovery_info *run;
- /** Start of the slice, stored in MsgPack array. */
- char *begin;
- /** End of the slice, stored in MsgPack array. */
- char *end;
-};
-
static struct vy_recovery *
vy_recovery_new_locked(int64_t signature, bool only_checkpoint);
@@ -977,91 +870,6 @@ vy_log_end_recovery(void)
return 0;
}
-/** Argument passed to vy_log_rotate_cb_func(). */
-struct vy_log_rotate_cb_arg {
- struct xdir *dir;
- struct xlog *xlog;
- const struct vclock *vclock;
-};
-
-/** Callback passed to vy_recovery_iterate() for log rotation. */
-static int
-vy_log_rotate_cb_func(const struct vy_log_record *record, void *cb_arg)
-{
- struct vy_log_rotate_cb_arg *arg = cb_arg;
- struct xlog *xlog = arg->xlog;
- struct xrow_header row;
-
- say_verbose("save vylog record: %s", vy_log_record_str(record));
-
- /* Create the log file on the first write. */
- if (!xlog_is_open(xlog) &&
- xdir_create_xlog(arg->dir, xlog, arg->vclock) < 0)
- return -1;
-
- if (vy_log_record_encode(record, &row) < 0 ||
- xlog_write_row(xlog, &row) < 0)
- return -1;
- return 0;
-}
-
-/**
- * Create an vy_log file from a recovery context.
- */
-static int
-vy_log_create(const struct vclock *vclock, struct vy_recovery *recovery)
-{
- /*
- * Only create the log file if we have something
- * to write to it.
- */
- struct xlog xlog;
- xlog_clear(&xlog);
-
- say_verbose("saving vylog %lld", (long long)vclock_sum(vclock));
-
- struct vy_log_rotate_cb_arg arg = {
- .xlog = &xlog,
- .dir = &vy_log.dir,
- .vclock = vclock,
- };
- if (vy_recovery_iterate(recovery, vy_log_rotate_cb_func, &arg) < 0)
- goto err_write_xlog;
-
- if (!xlog_is_open(&xlog))
- goto done; /* nothing written */
-
- /* Mark the end of the snapshot. */
- struct xrow_header row;
- struct vy_log_record record;
- vy_log_record_init(&record);
- record.type = VY_LOG_SNAPSHOT;
- if (vy_log_record_encode(&record, &row) < 0 ||
- xlog_write_row(&xlog, &row) < 0)
- goto err_write_xlog;
-
- /* Finalize the new xlog. */
- if (xlog_flush(&xlog) < 0 ||
- xlog_sync(&xlog) < 0 ||
- xlog_rename(&xlog) < 0)
- goto err_write_xlog;
-
- xlog_close(&xlog, false);
-done:
- say_verbose("done saving vylog");
- return 0;
-
-err_write_xlog:
- /* Delete the unfinished xlog. */
- if (xlog_is_open(&xlog)) {
- if (unlink(xlog.filename) < 0)
- say_syserror("failed to delete file '%s'",
- xlog.filename);
- xlog_close(&xlog, false);
- }
- return -1;
-}
-
static ssize_t
vy_log_rotate_f(va_list ap)
{
@@ -1285,9 +1093,9 @@ vy_recovery_index_id_hash(uint32_t space_id, uint32_t index_id)
}
/** Lookup a vinyl index in vy_recovery::index_id_hash map. */
-static struct vy_index_recovery_info *
-vy_recovery_lookup_index_by_id(struct vy_recovery *recovery,
- uint32_t space_id, uint32_t index_id)
+struct vy_index_recovery_info *
+vy_recovery_lookup_index(struct vy_recovery *recovery,
+ uint32_t space_id, uint32_t index_id)
{
int64_t key = vy_recovery_index_id_hash(space_id, index_id);
struct mh_i64ptr_t *h = recovery->index_id_hash;
@@ -1412,6 +1220,7 @@ vy_recovery_create_index(struct vy_recovery *recovery, int64_t index_lsn,
free(index);
return -1;
}
+ rlist_add_entry(&recovery->indexes, index, in_recovery);
} else {
/*
* The index was dropped and recreated with the
@@ -1583,6 +1392,7 @@ vy_recovery_do_create_run(struct vy_recovery *recovery, int64_t run_id)
run->gc_lsn = -1;
run->is_incomplete = false;
run->is_dropped = false;
+ run->data = NULL;
rlist_create(&run->in_index);
if (recovery->max_id < run_id)
recovery->max_id = run_id;
@@ -2024,6 +1834,7 @@ vy_recovery_new_f(va_list ap)
goto fail;
}
+ rlist_create(&recovery->indexes);
recovery->index_id_hash = NULL;
recovery->index_lsn_hash = NULL;
recovery->range_hash = NULL;
@@ -2176,9 +1987,23 @@ vy_recovery_delete(struct vy_recovery *recovery)
free(recovery);
}
+/**
+ * Write a record to vylog.
+ *
+ * The record is first reported at verbose log level, then
+ * encoded into an xrow with vy_log_record_encode() and
+ * written to @xlog with xlog_write_row().
+ *
+ * Returns 0 on success, -1 if either vy_log_record_encode()
+ * or xlog_write_row() returns a negative value.
+ */
+static int
+vy_log_append_record(struct xlog *xlog, struct vy_log_record *record)
+{
+ say_verbose("save vylog record: %s", vy_log_record_str(record));
+
+ struct xrow_header row;
+ if (vy_log_record_encode(record, &row) < 0)
+ return -1;
+ if (xlog_write_row(xlog, &row) < 0)
+ return -1;
+ return 0;
+}
+
+/** Write all records corresponding to an index to vylog. */
static int
-vy_recovery_iterate_index(struct vy_index_recovery_info *index,
- vy_recovery_cb cb, void *cb_arg)
+vy_log_append_index(struct xlog *xlog, struct vy_index_recovery_info *index)
{
struct vy_range_recovery_info *range;
struct vy_slice_recovery_info *slice;
@@ -2192,7 +2017,7 @@ vy_recovery_iterate_index(struct vy_index_recovery_info *index,
record.space_id = index->space_id;
record.key_parts = index->key_parts;
record.key_part_count = index->key_part_count;
- if (cb(&record, cb_arg) != 0)
+ if (vy_log_append_record(xlog, &record) != 0)
return -1;
if (index->truncate_count > 0) {
@@ -2200,7 +2025,7 @@ vy_recovery_iterate_index(struct vy_index_recovery_info *index,
record.type = VY_LOG_TRUNCATE_INDEX;
record.index_lsn = index->index_lsn;
record.truncate_count = index->truncate_count;
- if (cb(&record, cb_arg) != 0)
+ if (vy_log_append_record(xlog, &record) != 0)
return -1;
}
@@ -2209,7 +2034,7 @@ vy_recovery_iterate_index(struct vy_index_recovery_info *index,
record.type = VY_LOG_DUMP_INDEX;
record.index_lsn = index->index_lsn;
record.dump_lsn = index->dump_lsn;
- if (cb(&record, cb_arg) != 0)
+ if (vy_log_append_record(xlog, &record) != 0)
return -1;
}
@@ -2223,8 +2048,7 @@ vy_recovery_iterate_index(struct vy_index_recovery_info *index,
}
record.index_lsn = index->index_lsn;
record.run_id = run->id;
- record.is_dropped = run->is_dropped;
- if (cb(&record, cb_arg) != 0)
+ if (vy_log_append_record(xlog, &record) != 0)
return -1;
if (!run->is_dropped)
@@ -2234,7 +2058,7 @@ vy_recovery_iterate_index(struct vy_index_recovery_info *index,
record.type = VY_LOG_DROP_RUN;
record.run_id = run->id;
record.gc_lsn = run->gc_lsn;
- if (cb(&record, cb_arg) != 0)
+ if (vy_log_append_record(xlog, &record) != 0)
return -1;
}
@@ -2245,7 +2069,7 @@ vy_recovery_iterate_index(struct vy_index_recovery_info *index,
record.range_id = range->id;
record.begin = range->begin;
record.end = range->end;
- if (cb(&record, cb_arg) != 0)
+ if (vy_log_append_record(xlog, &record) != 0)
return -1;
/*
* Newer slices are stored closer to the head of the list,
@@ -2260,7 +2084,7 @@ vy_recovery_iterate_index(struct vy_index_recovery_info *index,
record.run_id = slice->run->id;
record.begin = slice->begin;
record.end = slice->end;
- if (cb(&record, cb_arg) != 0)
+ if (vy_log_append_record(xlog, &record) != 0)
return -1;
}
}
@@ -2269,16 +2093,25 @@ vy_recovery_iterate_index(struct vy_index_recovery_info *index,
vy_log_record_init(&record);
record.type = VY_LOG_DROP_INDEX;
record.index_lsn = index->index_lsn;
- if (cb(&record, cb_arg) != 0)
+ if (vy_log_append_record(xlog, &record) != 0)
return -1;
}
return 0;
}
-int
-vy_recovery_iterate(struct vy_recovery *recovery,
- vy_recovery_cb cb, void *cb_arg)
+/** Create vylog from a recovery context. */
+static int
+vy_log_create(const struct vclock *vclock, struct vy_recovery *recovery)
{
+ say_verbose("saving vylog %lld", (long long)vclock_sum(vclock));
+
+ /*
+ * Only create the log file if we have something
+ * to write to it.
+ */
+ struct xlog xlog;
+ xlog_clear(&xlog);
+
mh_int_t i;
mh_foreach(recovery->index_id_hash, i) {
struct vy_index_recovery_info *index;
@@ -2290,59 +2123,44 @@ vy_recovery_iterate(struct vy_recovery *recovery,
*/
if (index->is_dropped && rlist_empty(&index->runs))
continue;
- if (vy_recovery_iterate_index(index, cb, cb_arg) < 0)
- return -1;
+
+ /* Create the log file on the first write. */
+ if (!xlog_is_open(&xlog) &&
+ xdir_create_xlog(&vy_log.dir, &xlog, vclock) != 0)
+ goto err_create_xlog;
+
+ if (vy_log_append_index(&xlog, index) != 0)
+ goto err_write_xlog;
}
+ if (!xlog_is_open(&xlog))
+ goto done; /* nothing written */
+
+ /* Mark the end of the snapshot. */
+ struct vy_log_record record;
+ vy_log_record_init(&record);
+ record.type = VY_LOG_SNAPSHOT;
+ if (vy_log_append_record(&xlog, &record) != 0)
+ goto err_write_xlog;
+
+ /* Finalize the new xlog. */
+ if (xlog_flush(&xlog) < 0 ||
+ xlog_sync(&xlog) < 0 ||
+ xlog_rename(&xlog) < 0)
+ goto err_write_xlog;
+
+ xlog_close(&xlog, false);
+done:
+ say_verbose("done saving vylog");
return 0;
-}
-int
-vy_recovery_load_index(struct vy_recovery *recovery,
- uint32_t space_id, uint32_t index_id,
- int64_t index_lsn, bool is_checkpoint_recovery,
- vy_recovery_cb cb, void *cb_arg)
-{
- struct vy_index_recovery_info *index;
- index = vy_recovery_lookup_index_by_id(recovery, space_id, index_id);
- if (index == NULL)
- return 0;
- /* See the comment to the function declaration. */
- if (index_lsn < index->index_lsn) {
- /*
- * Loading a past incarnation of the index.
- * Emit create/drop records to indicate that
- * it is going to be dropped by a WAL statement
- * and hence doesn't need to be recovered.
- */
- struct vy_log_record record;
- vy_log_record_init(&record);
- record.type = VY_LOG_CREATE_INDEX;
- record.index_id = index->index_id;
- record.space_id = index->space_id;
- record.index_lsn = index_lsn;
- if (cb(&record, cb_arg) != 0)
- return -1;
- vy_log_record_init(&record);
- record.type = VY_LOG_DROP_INDEX;
- record.index_lsn = index_lsn;
- if (cb(&record, cb_arg) != 0)
- return -1;
- return 0;
- } else if (is_checkpoint_recovery || index_lsn == index->index_lsn) {
- /*
- * Loading the last incarnation of the index.
- * Replay all records we have recovered from
- * the log for this index.
- */
- return vy_recovery_iterate_index(index, cb, cb_arg);
- } else {
- /*
- * The requested incarnation is missing in the recovery
- * context, because we failed to log it before restart.
- * Do nothing and let the caller retry logging.
- */
- assert(!is_checkpoint_recovery);
- assert(index_lsn > index->index_lsn);
- return 0;
- }
+err_write_xlog:
+ /* Delete the unfinished xlog. */
+ assert(xlog_is_open(&xlog));
+ if (unlink(xlog.filename) < 0)
+ say_syserror("failed to delete file '%s'",
+ xlog.filename);
+ xlog_close(&xlog, false);
+
+err_create_xlog:
+ return -1;
}
diff --git a/src/box/vy_log.h b/src/box/vy_log.h
index f17b122a..8fbacd0f 100644
--- a/src/box/vy_log.h
+++ b/src/box/vy_log.h
@@ -34,6 +34,7 @@
#include <stdbool.h>
#include <stdint.h>
#include <string.h>
+#include <small/rlist.h>
#include "salad/stailq.h"
@@ -58,8 +59,7 @@ struct xlog;
struct vclock;
struct key_def;
struct key_part_def;
-
-struct vy_recovery;
+struct mh_i64ptr_t;
/** Type of a metadata log record. */
enum vy_log_record_type {
@@ -175,12 +175,6 @@ struct vy_log_record {
/** Unique ID of the run slice. */
int64_t slice_id;
/**
- * For VY_LOG_CREATE_RUN record: hint that the run
- * is dropped, i.e. there is a VY_LOG_DROP_RUN record
- * following this one.
- */
- bool is_dropped;
- /**
* Msgpack key for start of the range/slice.
* NULL if the range/slice starts from -inf.
*/
@@ -213,6 +207,127 @@ struct vy_log_record {
struct stailq_entry in_tx;
};
+/**
+ * Recovery context.
+ *
+ * Holds the vinyl objects (indexes, ranges, runs and slices)
+ * recovered from the metadata log. The structure is exposed
+ * so that callers can walk the recovered objects directly,
+ * starting from the list of indexes.
+ */
+struct vy_recovery {
+ /**
+ * List of all indexes stored in the recovery context,
+ * linked by vy_index_recovery_info::in_recovery.
+ */
+ struct rlist indexes;
+ /**
+ * space_id, index_id -> vy_index_recovery_info.
+ * Use vy_recovery_lookup_index() to query this map.
+ */
+ struct mh_i64ptr_t *index_id_hash;
+ /** index_lsn -> vy_index_recovery_info. */
+ struct mh_i64ptr_t *index_lsn_hash;
+ /** ID -> vy_range_recovery_info. */
+ struct mh_i64ptr_t *range_hash;
+ /** ID -> vy_run_recovery_info. */
+ struct mh_i64ptr_t *run_hash;
+ /** ID -> vy_slice_recovery_info. */
+ struct mh_i64ptr_t *slice_hash;
+ /**
+ * Maximal vinyl object ID, according to the metadata log,
+ * or -1 in case no vinyl objects were recovered.
+ * E.g. raised by vy_recovery_do_create_run() when a run
+ * with a larger ID is recovered.
+ */
+ int64_t max_id;
+};
+
+/**
+ * Vinyl index info stored in a recovery context.
+ *
+ * The last incarnation of an index can be looked up by
+ * space_id and index_id with vy_recovery_lookup_index().
+ */
+struct vy_index_recovery_info {
+ /** Link in vy_recovery::indexes. */
+ struct rlist in_recovery;
+ /** LSN of the index creation. */
+ int64_t index_lsn;
+ /** Ordinal index number in the space. */
+ uint32_t index_id;
+ /** Space ID. */
+ uint32_t space_id;
+ /** Array of key part definitions. */
+ struct key_part_def *key_parts;
+ /** Number of key parts. */
+ uint32_t key_part_count;
+ /**
+ * True if the index was dropped.
+ * A dropped index that has no runs left is skipped
+ * when a new vylog is created from the recovery context.
+ */
+ bool is_dropped;
+ /** LSN of the last index dump. */
+ int64_t dump_lsn;
+ /**
+ * Truncate count.
+ * When the index is written back to vylog, a positive
+ * value is emitted as a VY_LOG_TRUNCATE_INDEX record.
+ */
+ int64_t truncate_count;
+ /**
+ * List of all ranges in the index, linked by
+ * vy_range_recovery_info::in_index.
+ */
+ struct rlist ranges;
+ /**
+ * List of all runs created for the index
+ * (both committed and not), linked by
+ * vy_run_recovery_info::in_index.
+ */
+ struct rlist runs;
+};
+
+/** Vinyl range info stored in a recovery context. */
+struct vy_range_recovery_info {
+ /** Link in vy_index_recovery_info::ranges. */
+ struct rlist in_index;
+ /** ID of the range. */
+ int64_t id;
+ /**
+ * Start of the range, stored in MsgPack array.
+ * Passed as vy_log_record::begin when the range
+ * is written to vylog.
+ */
+ char *begin;
+ /**
+ * End of the range, stored in MsgPack array.
+ * Passed as vy_log_record::end when the range
+ * is written to vylog.
+ */
+ char *end;
+ /**
+ * List of all slices in the range, linked by
+ * vy_slice_recovery_info::in_range.
+ *
+ * Newer slices are closer to the head.
+ */
+ struct rlist slices;
+};
+
+/** Run info stored in a recovery context. */
+struct vy_run_recovery_info {
+ /** Link in vy_index_recovery_info::runs. */
+ struct rlist in_index;
+ /** ID of the run. */
+ int64_t id;
+ /** Max LSN stored on disk. */
+ int64_t dump_lsn;
+ /**
+ * For deleted runs: LSN of the last checkpoint
+ * that uses this run. Set to -1 when the run info
+ * is created; logged as vy_log_record::gc_lsn of
+ * the VY_LOG_DROP_RUN record.
+ */
+ int64_t gc_lsn;
+ /**
+ * True if the run was not committed (there's
+ * VY_LOG_PREPARE_RUN, but no VY_LOG_CREATE_RUN).
+ */
+ bool is_incomplete;
+ /** True if the run was dropped (VY_LOG_DROP_RUN). */
+ bool is_dropped;
+ /**
+ * The following field is initialized to NULL and
+ * ignored by vy_log subsystem. It may be used by
+ * the caller to store some extra information.
+ *
+ * During recovery, we store a pointer to vy_run
+ * corresponding to this object.
+ */
+ void *data;
+};
+
+/** Slice info stored in a recovery context. */
+struct vy_slice_recovery_info {
+ /** Link in vy_range_recovery_info::slices. */
+ struct rlist in_range;
+ /** ID of the slice. */
+ int64_t id;
+ /**
+ * Run this slice was created for.
+ * Its ID is logged as vy_log_record::run_id when
+ * the slice is written to vylog.
+ */
+ struct vy_run_recovery_info *run;
+ /**
+ * Start of the slice, stored in MsgPack array.
+ * Passed as vy_log_record::begin when the slice
+ * is written to vylog.
+ */
+ char *begin;
+ /**
+ * End of the slice, stored in MsgPack array.
+ * Passed as vy_log_record::end when the slice
+ * is written to vylog.
+ */
+ char *end;
+};
+
/**
* Initialize the metadata log.
* @dir is the directory where log files are stored.
@@ -359,70 +474,14 @@ vy_recovery_new(int64_t signature, bool only_checkpoint);
void
vy_recovery_delete(struct vy_recovery *recovery);
-typedef int
-(*vy_recovery_cb)(const struct vy_log_record *record, void *arg);
-
-/**
- * Iterate over all objects stored in a recovery context.
- *
- * This function invokes callback @cb for each object (index, run, etc)
- * stored in the given recovery context. The callback is passed a record
- * used to log the object and optional argument @cb_arg. If the callback
- * returns a value different from 0, iteration stops and -1 is returned,
- * otherwise the function returns 0.
- *
- * To ease the work done by the callback, records corresponding to
- * slices of a range always go right after the range, in the
- * chronological order, while an index's runs go after the index
- * and before its ranges.
- */
-int
-vy_recovery_iterate(struct vy_recovery *recovery,
- vy_recovery_cb cb, void *cb_arg);
-
/**
- * Load an index from a recovery context.
- *
- * Call @cb for each object related to the index. Break the loop and
- * return -1 if @cb returned a non-zero value, otherwise return 0.
- * Objects are loaded in the same order as by vy_recovery_iterate().
+ * Look up the last incarnation of an index stored in a recovery context.
*
- * Note, this function returns 0 if there's no index with the requested
- * id in the recovery context. In this case, @cb isn't called at all.
- *
- * The @is_checkpoint_recovery flag indicates that the row that created
- * the index was loaded from a snapshot, in which case @index_lsn is
- * the snapshot signature. Otherwise @index_lsn is the LSN of the WAL
- * row that created the index.
- *
- * The index is looked up by @space_id and @index_id while @index_lsn
- * is used to discern different incarnations of the same index as
- * follows. Let @record denote the vylog record corresponding to the
- * last incarnation of the index. Then
- *
- * - If @is_checkpoint_recovery is set and @index_lsn >= @record->index_lsn,
- * the last index incarnation was created before the snapshot and we
- * need to load it right now.
- *
- * - If @is_checkpoint_recovery is set and @index_lsn < @record->index_lsn,
- * the last index incarnation was created after the snapshot, i.e.
- * the index loaded now is going to be dropped so load a dummy.
- *
- * - If @is_checkpoint_recovery is unset and @index_lsn < @record->index_lsn,
- * the last index incarnation is created further in WAL, load a dummy.
- *
- * - If @is_checkpoint_recovery is unset and @index_lsn == @record->index_lsn,
- * load the last index incarnation.
- *
- * - If @is_checkpoint_recovery is unset and @index_lsn > @record->index_lsn,
- * it seems we failed to log index creation before restart. In this
- * case don't do anything. The caller is supposed to retry logging.
+ * Returns NULL if the index was not found.
*/
-int
-vy_recovery_load_index(struct vy_recovery *recovery,
- uint32_t space_id, uint32_t index_id,
- int64_t index_lsn, bool is_checkpoint_recovery,
- vy_recovery_cb cb, void *cb_arg);
+struct vy_index_recovery_info *
+vy_recovery_lookup_index(struct vy_recovery *recovery,
+ uint32_t space_id, uint32_t index_id);
/**
* Initialize a log record with default values.
diff --git a/test/unit/vy_log_stub.c b/test/unit/vy_log_stub.c
index daabf3f9..1fda0a6b 100644
--- a/test/unit/vy_log_stub.c
+++ b/test/unit/vy_log_stub.c
@@ -51,11 +51,9 @@ vy_log_tx_commit(void)
void
vy_log_write(const struct vy_log_record *record) {}
-int
-vy_recovery_load_index(struct vy_recovery *recovery,
- uint32_t space_id, uint32_t index_id,
- int64_t index_lsn, bool snapshot_recovery,
- vy_recovery_cb cb, void *cb_arg)
+struct vy_index_recovery_info *
+vy_recovery_lookup_index(struct vy_recovery *recovery,
+ uint32_t space_id, uint32_t index_id)
{
unreachable();
}
diff --git a/test/vinyl/layout.result b/test/vinyl/layout.result
index 5c78babf..603d2865 100644
--- a/test/vinyl/layout.result
+++ b/test/vinyl/layout.result
@@ -189,12 +189,12 @@ result
timestamp: <timestamp>
type: INSERT
BODY:
- tuple: [7, {2: 3}]
+ tuple: [7, {2: 2}]
- HEADER:
timestamp: <timestamp>
type: INSERT
BODY:
- tuple: [7, {2: 2}]
+ tuple: [7, {2: 3}]
- HEADER:
timestamp: <timestamp>
type: INSERT
--
2.11.0
^ permalink raw reply [flat|nested] only message in thread
only message in thread, other threads:[~2018-03-16 12:05 UTC | newest]
Thread overview: (only message) (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2018-03-16 12:05 [PATCH] vinyl: refactor vylog recovery Vladimir Davydov
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox