From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from smtp48.i.mail.ru (smtp48.i.mail.ru [94.100.177.108]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by dev.tarantool.org (Postfix) with ESMTPS id 0884C45C304 for ; Tue, 1 Dec 2020 15:32:53 +0300 (MSK) References: <20201116164006.fw3jwes4dwbx7nsd@tkn_work_nb> <8365255bb9eef01293f66d9a7293730fecc49e2b.1605691680.git.sergeyb@tarantool.org> <20201127014528.v4ivfv7akgdxak5j@tkn_work_nb> From: Sergey Bronnikov Message-ID: <68835173-98b8-60ce-4ea5-ae427c530db3@tarantool.org> Date: Tue, 1 Dec 2020 15:32:52 +0300 MIME-Version: 1.0 In-Reply-To: <20201127014528.v4ivfv7akgdxak5j@tkn_work_nb> Content-Type: text/plain; charset="utf-8"; format="flowed" Content-Transfer-Encoding: 8bit Content-Language: en-US Subject: Re: [Tarantool-patches] [PATCH] Add options for upgrade testing List-Id: Tarantool development patches List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , To: Alexander Turenko Cc: tarantool-patches@dev.tarantool.org Hi, On 27.11.2020 04:45, Alexander Turenko wrote: > Sorry for the long delay. > > I looked over the patch, tested it a bit and proposed several fixups. > > I pasted them here (for ease commenting) and pushed to the branch > (upward your commits). If you're okay with them, I can squash commits > and push the result. Or, maybe, extract several commits (re debug > property, re xlog module) and squash the rest (keeping the conflicting > options fix separate, of course). > > The branch: ligurio/gh-4801-add-snapshot-option. > > And sorry that I reworked a lot of code. I tried to minimize review > ping-ponging, but make myself comfortable with the result. And amount of > changes becomes larger than I initially expect. A lot of low hanging > fruits of the kind 'okay, but there is the easy way to make it a bit > better' appears. Many thanks for patches! Especially those that simplifies a code. 
> I hope such moments will be less and less frequent with a time we'll > work in touch. A review is often about compromises and it is the work on > ourself for both sides. > > WBR, Alexander Turenko. > > new file mode 100644 > index 0000000..dbaa4dc > --- /dev/null > +++ b/lib/xlog.py > @@ -0,0 +1,232 @@ > +"""Xlog and snapshot utility functions.""" > + > +import os > +import msgpack > +import subprocess > +import json > +from uuid import uuid4 > + > + > +__all__ = ['init', 'snapshot_is_for_bootstrap', 'prepare_bootstrap_snapshot'] > + > + > +# {{{ Constants > + > +# Xlog binary format constants. > +ROW_MARKER = b'\xd5\xba\x0b\xab' > +EOF_MARKER = b'\xd5\x10\xad\xed' > +XLOG_FIXHEADER_SIZE = 19 > +VCLOCK_MAX = 32 > +VCLOCK_STR_LEN_MAX = 1 + VCLOCK_MAX * (2 + 2 + 20 + 2) + 1 > +XLOG_META_LEN_MAX = 1024 + VCLOCK_STR_LEN_MAX > + > + > +# The binary protocol (iproto) constants. > +# > +# Widely reused for xlog / snapshot files. > +IPROTO_REQUEST_TYPE = 0x00 > +IPROTO_LSN = 0x03 > +IPROTO_TIMESTAMP = 0x04 > +IPROTO_INSERT = 2 > +IPROTO_SPACE_ID = 0x10 > +IPROTO_TUPLE = 0x21 > + > + > +# System space IDs. > +BOX_SCHEMA_ID = 272 > +BOX_CLUSTER_ID = 320 > + > +# }}} Constants > + > + > +tarantool_cmd = 'tarantool' > +tarantoolctl_cmd = 'tarantoolctl' > +debug_f = lambda x: None # noqa: E731 > + > + > +def init(tarantool=None, tarantoolctl=None, debug=None): > + """ Redefine module level globals. > + > + If the function is not called, tarantool and tarantoolctl > + will be called according to the PATH environment variable. > + > + Beware: tarantool and tarantoolctl are lists. 
Example: > + > + tarantool_cmd = ['/path/to/tarantool'] > + tarantoolctl_cmd = tarantool_cmd + ['/path/to/tarantoolctl'] > + xlog.init(tarantool=tarantool_cmd, tarantoolctl=tarantoolctl_cmd) > + """ > + global tarantool_cmd > + global tarantoolctl_cmd > + global debug_f > + > + if tarantool: > + assert isinstance(tarantool, list) > + tarantool_cmd = tarantool > + if tarantool_cmd: > + assert isinstance(tarantoolctl, list) > + tarantoolctl_cmd = tarantoolctl > + if debug: > + debug_f = debug > + > + > +# {{{ General purpose helpers > + > +def crc32c(data): > + """ Use tarantool's implementation of CRC32C algorithm. > + > + Python has no built-in implementation of CRC32C. > + """ > + lua = "print(require('digest').crc32_update(0, io.stdin:read({})))".format( > + len(data)) > + with open(os.devnull, 'w') as devnull: > + process = subprocess.Popen(tarantool_cmd + ['-e', lua], > + stdin=subprocess.PIPE, > + stdout=subprocess.PIPE, > + stderr=devnull) > + process.stdin.write(data) > + res, _ = process.communicate() > + return int(res) > + > +# }}} General purpose helpers > + > + > +# {{{ parse xlog / snapshot > + > +def xlog_rows(xlog_path): > + cmd = tarantoolctl_cmd + ['cat', xlog_path, '--format=json', > + '--show-system'] > + with open(os.devnull, 'w') as devnull: > + process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=devnull) > + for line in process.stdout.readlines(): > + yield json.loads(line) > + > +# }}} parse xlog / snapshot > + > + > +# {{{ xlog encode data helpers > + > +def encode_xrow_header(xrow): > + packer = msgpack.Packer(use_bin_type=False) > + xrow_header = ROW_MARKER > + # xrow size > + xrow_header += packer.pack(len(xrow)) > + # previous xrow crc32 > + xrow_header += packer.pack(0) > + # current xrow crc32 > + xrow_header += packer.pack(crc32c(xrow)) > + # padding > + padding_size = XLOG_FIXHEADER_SIZE - len(xrow_header) > + xrow_header += packer.pack(b'\x00' * (padding_size - 1)) > + assert(len(xrow_header) == XLOG_FIXHEADER_SIZE) > + 
return xrow_header > + > + > +def encode_xrow(header, body): > + packer = msgpack.Packer(use_bin_type=False) > + header = packer.pack(header) > + body = packer.pack(body) > + xrow_data = header + body > + return encode_xrow_header(xrow_data) + xrow_data > + > +# }}} xlog encode data helpers > + > + > +# {{{ xlog write data helpers > + > +def xlog_seek_end(xlog): > + """Set the file position right before the end marker.""" > + WHENCE_END = 2 > + xlog.seek(-4, WHENCE_END) > + eof_marker = xlog.read(4) > + if eof_marker != EOF_MARKER: > + raise RuntimeError('Invalid eof marker: {}'.format(eof_marker)) > + xlog.seek(-4, WHENCE_END) > + > + > +def xlog_write_eof(xlog): > + xlog.write(EOF_MARKER) > + > +# }}} xlog write data helpers > + > + > +# {{{ xlog write meta helpers > + > +def xlog_meta_write_instance_uuid(xlog, instance_uuid): > + xlog.seek(0) > + xlog.seek(xlog.read(XLOG_META_LEN_MAX).find(b'Instance: ')) > + # Trick: old and new UUID have the same size. > + xlog.write(b'Instance: ' + instance_uuid) > + > +# }}} xlog write meta helpers > + > + > +def snapshot_is_for_bootstrap(snapshot_path): > + """ A bootstrap snapshot (src/box/bootstrap.snap) has no cluster UUID > + and instance UUID in _schema and > + _cluster system spaces.
> + """ > + cluster_uuid_exists = False > + instance_uuid_exists = False > + > + for row in xlog_rows(snapshot_path): > + if row['HEADER']['type'] == 'INSERT' and \ > + row['BODY']['space_id'] == BOX_SCHEMA_ID and \ > + row['BODY']['tuple'][0] == 'cluster': > + cluster_uuid_exists = True > + > + if row['HEADER']['type'] == 'INSERT' and \ > + row['BODY']['space_id'] == BOX_CLUSTER_ID and \ > + row['BODY']['tuple'][0] == 1: > + instance_uuid_exists = True > + > + if cluster_uuid_exists and instance_uuid_exists: > + break > + > + if cluster_uuid_exists != instance_uuid_exists: > + raise RuntimeError('A cluster UUID and an instance UUID should exist ' > + 'or not exist both') > + > + return not cluster_uuid_exists > + snapshot_is_for_bootstrap() accepts a path that can be None (see the default value of the --bootstrap argument) when the --bootstrap option is not specified. Because of this, xlog_rows() may fail. xlog_rows() returns an iterator, so we could do something like "yield None" there, but then we would have to handle that properly everywhere xlog_rows() is called.
So I just added a guard around the snapshot_is_for_bootstrap() call, so that it is skipped when no snapshot path is given: --- a/lib/options.py +++ b/lib/options.py @@ -257,7 +257,7 @@ class Options:              exit(-1)      def check_bootstrap_option(self): -        if not snapshot_is_for_bootstrap(self.args.snapshot_path): +        if self.args.snapshot_path and not snapshot_is_for_bootstrap(self.args.snapshot_path):              color_stdout('Expected a boostrap snapshot, one for local recovery '                           'is given\n', schema='error')              exit(1) > + > +def prepare_bootstrap_snapshot(snapshot_path): > + """ Prepare a bootstrap snapshot to use with local recovery.""" > + cluster_uuid = str(uuid4()).encode('ascii') > + debug_f('Cluster UUID: {}'.format(cluster_uuid)) > + instance_uuid = str(uuid4()).encode('ascii') > + instance_id = 1 > + debug_f('Instance ID: {}'.format(instance_id)) > + debug_f('Instance UUID: {}'.format(instance_uuid)) > + > + last_row = list(xlog_rows(snapshot_path))[-1] > + lsn = int(last_row['HEADER']['lsn']) > + timestamp = float(last_row['HEADER']['timestamp']) > + > + with open(snapshot_path, 'rb+') as xlog: > + xlog_meta_write_instance_uuid(xlog, instance_uuid) > + xlog_seek_end(xlog) > + > + # Write cluster UUID to _schema. > + lsn += 1 > + xlog.write(encode_xrow({ > + IPROTO_REQUEST_TYPE: IPROTO_INSERT, > + IPROTO_LSN: lsn, > + IPROTO_TIMESTAMP: timestamp, > + }, { > + IPROTO_SPACE_ID: BOX_SCHEMA_ID, > + IPROTO_TUPLE: ['cluster', cluster_uuid], > + })) > + > + # Write replica ID and replica UUID to _cluster. > + lsn += 1 > + xlog.write(encode_xrow({ > + IPROTO_REQUEST_TYPE: IPROTO_INSERT, > + IPROTO_LSN: lsn, > + IPROTO_TIMESTAMP: timestamp, > + }, { > + IPROTO_SPACE_ID: BOX_CLUSTER_ID, > + IPROTO_TUPLE: [1, instance_uuid], > + })) > + > + xlog_write_eof(xlog) >