Tarantool development patches archive
 help / color / mirror / Atom feed
* [tarantool-patches] [PATCH] core: Non-blocking io.popen
@ 2019-04-24 17:00 Stanislav Zudin
  2019-05-07  8:49 ` Vladimir Davydov
  0 siblings, 1 reply; 3+ messages in thread
From: Stanislav Zudin @ 2019-04-24 17:00 UTC (permalink / raw)
  To: tarantool-patches; +Cc: Stanislav Zudin

Adds nonblocking implementation of popen.
The method is available in namespace fio.
fio.popen() returns an object with three methods:
read(), write() and close().
Method read() reads both STDOUT and STDERR output.

Closes #4031
---
Branch: https://github.com/tarantool/tarantool/tree/stanztt/gh-4031-nonblocking-popen
Issue: https://github.com/tarantool/tarantool/issues/4031

 src/lib/core/CMakeLists.txt     |   1 +
 src/lib/core/coio_file.c        | 189 +++++++++++++++++
 src/lib/core/coio_file.h        |  13 ++
 src/lib/core/coio_popen.c       | 366 ++++++++++++++++++++++++++++++++
 src/lib/core/coio_popen.h       | 123 +++++++++++
 src/lib/core/fiber.h            |   4 +-
 src/lua/fio.c                   |  86 ++++++++
 src/lua/fio.lua                 |  80 +++++++
 test/box-tap/fio_popen.test.lua |  79 +++++++
 test/box-tap/fio_popen_test1.sh |   7 +
 third_party/libeio/eio.c        |  19 +-
 third_party/libeio/etp.c        |  28 +++
 third_party/libev/ev.c          |   2 +
 13 files changed, 994 insertions(+), 3 deletions(-)
 create mode 100644 src/lib/core/coio_popen.c
 create mode 100644 src/lib/core/coio_popen.h
 create mode 100755 test/box-tap/fio_popen.test.lua
 create mode 100755 test/box-tap/fio_popen_test1.sh

diff --git a/src/lib/core/CMakeLists.txt b/src/lib/core/CMakeLists.txt
index eb10b11c3..8b1f8d32e 100644
--- a/src/lib/core/CMakeLists.txt
+++ b/src/lib/core/CMakeLists.txt
@@ -26,6 +26,7 @@ set(core_sources
     trigger.cc
     mpstream.c
     port.c
+    coio_popen.c
 )
 
 if (TARGET_OS_NETBSD)
diff --git a/src/lib/core/coio_file.c b/src/lib/core/coio_file.c
index c5b2db781..ffe9d634c 100644
--- a/src/lib/core/coio_file.c
+++ b/src/lib/core/coio_file.c
@@ -33,6 +33,7 @@
 #include "fiber.h"
 #include "say.h"
 #include "fio.h"
+#include "coio_popen.h"
 #include <stdio.h>
 #include <stdlib.h>
 #include <dirent.h>
@@ -102,6 +103,31 @@ struct coio_file_task {
 			const char *source;
 			const char *dest;
 		} copyfile;
+
+		struct {
+			const char *command;
+			const char *type;
+			void *handle;
+		} popen_params;
+
+		struct {
+			void *handle;
+		} pclose_params;
+
+		struct {
+			void *handle;
+			const void* buf;
+			size_t count;
+			size_t *written;
+		} popen_write;
+
+		struct {
+			void *handle;
+			void *buf;
+			size_t count;
+			size_t *read_bytes;
+			int *output_number;
+		} popen_read;
 	};
 };
 
@@ -628,3 +654,166 @@ coio_copyfile(const char *source, const char *dest)
 	eio_req *req = eio_custom(coio_do_copyfile, 0, coio_complete, &eio);
 	return coio_wait_done(req, &eio);
 }
+
+static void
+coio_do_popen(eio_req *req)
+{
+	struct coio_file_task *eio = (struct coio_file_task *)req->data;
+	eio->popen_params.handle = coio_popen_impl(eio->popen_params.command,
+						   eio->popen_params.type);
+	eio->result = 0;
+	eio->errorno = errno;
+}
+
+void *
+coio_popen(const char *command, const char *type)
+{
+	INIT_COEIO_FILE(eio);
+	eio.popen_params.command = command;
+	eio.popen_params.type = type;
+
+	eio_req *req = eio_custom(coio_do_popen, 0,
+				  coio_complete, &eio);
+	coio_wait_done(req, &eio);
+	return eio.popen_params.handle;
+}
+
+static void
+coio_do_pclose(eio_req *req)
+{
+	struct coio_file_task *eio = (struct coio_file_task *)req->data;
+	req->result = coio_try_pclose_impl(eio->pclose_params.handle);
+	req->errorno = errno;
+}
+
+static int
+coio_do_nonblock_pclose(void *fh)
+{
+	INIT_COEIO_FILE(eio);
+	eio.pclose_params.handle = fh;
+	eio_req *req = eio_custom(coio_do_pclose, 0,
+				  coio_complete, &eio);
+	return coio_wait_done(req, &eio);
+}
+
+int
+coio_pclose(void *fh)
+{
+	int rc = coio_try_pclose_impl(fh);
+	if (rc == 0)		/* The child process is dead */
+		return 0;
+	else if (rc == -1)	/* Failed */
+		return -1;
+
+	assert(rc == -2); 	/* A blocking operation is expected */
+
+	do {
+		rc = coio_do_nonblock_pclose(fh);
+	} while (rc == -2);
+
+	return rc;
+}
+
+static void
+coio_do_popen_read(eio_req *req)
+{
+	struct coio_file_task *eio = (struct coio_file_task *)req->data;
+
+	int rc = coio_popen_try_to_read(eio->popen_read.handle,
+					eio->popen_read.buf,
+					eio->popen_read.count,
+					eio->popen_read.read_bytes,
+					eio->popen_read.output_number);
+
+	req->result = rc;
+	req->errorno = errno;
+}
+
+static int
+coio_do_nonblock_popen_read(void *fh, void *buf, size_t count,
+	size_t *read_bytes, int *source_id)
+{
+	INIT_COEIO_FILE(eio);
+	eio.popen_read.buf = buf;
+	eio.popen_read.count = count;
+	eio.popen_read.handle = fh;
+	eio.popen_read.read_bytes = read_bytes;
+	eio.popen_read.output_number = source_id;
+	eio_req *req = eio_custom(coio_do_popen_read, 0,
+				  coio_complete, &eio);
+	return coio_wait_done(req, &eio);
+}
+
+ssize_t
+coio_popen_read(void *fh, void *buf, size_t count, int *output_number)
+{
+	size_t received = 0;
+	int rc = coio_popen_try_to_read(fh, buf, count,
+		&received, output_number);
+	if (rc == 0)		/* The reading's succeeded */
+		return (ssize_t)received;
+	else if (rc == -1)	/* Failed */
+		return -1;
+
+	assert(rc == -2); 	/* A blocking operation is expected */
+
+	do {
+		rc = coio_do_nonblock_popen_read(fh, buf, count,
+			&received, output_number);
+	} while (rc == -2);
+
+	return (rc == 0) ? (ssize_t)received
+			 : -1;
+}
+
+static void
+coio_do_popen_write(eio_req *req)
+{
+	struct coio_file_task *eio = (struct coio_file_task *)req->data;
+
+	int rc = coio_popen_try_to_write(eio->popen_write.handle,
+					eio->popen_write.buf,
+					eio->popen_write.count,
+					eio->popen_write.written);
+
+	req->result = rc;
+	req->errorno = errno;
+}
+
+static int
+coio_do_nonblock_popen_write(void *fh, const void *buf, size_t count,
+	size_t *written)
+{
+	INIT_COEIO_FILE(eio);
+	eio.popen_write.buf = buf;
+	eio.popen_write.count = count;
+	eio.popen_write.handle = fh;
+	eio.popen_write.written = written;
+	eio_req *req = eio_custom(coio_do_popen_write, 0,
+				  coio_complete, &eio);
+	return coio_wait_done(req, &eio);
+}
+
+ssize_t
+coio_popen_write(void *fh, const void *buf, size_t count)
+{
+	size_t written = 0;
+	int rc = coio_popen_try_to_write(fh, buf, count,
+					&written);
+	if (rc == 0)		/* The writing's succeeded */
+		return (ssize_t)written;
+	else if (rc == -1)	/* Failed */
+		return -1;
+
+	assert(rc == -2); 	/* A blocking operation is expected */
+
+	do {
+		buf += written;		/* advance writing position */
+		count -= written;
+		rc = coio_do_nonblock_popen_write(fh, buf, count,
+						  &written);
+	} while (rc == -2);
+
+	return (rc == 0) ? (ssize_t)written
+			 : -1;
+}
diff --git a/src/lib/core/coio_file.h b/src/lib/core/coio_file.h
index f2112ceed..4089c1a05 100644
--- a/src/lib/core/coio_file.h
+++ b/src/lib/core/coio_file.h
@@ -84,6 +84,19 @@ int	coio_tempdir(char *path, size_t path_len);
 
 int	coio_readdir(const char *path, char **buf);
 int	coio_copyfile(const char *source, const char *dest);
+
+void *
+coio_popen(const char *command, const char *type);
+
+int
+coio_pclose(void *fd);
+
+ssize_t
+coio_popen_read(void *fh, void *buf, size_t count, int *output_number);
+
+ssize_t
+coio_popen_write(void *fh, const void *buf, size_t count);
+
 #if defined(__cplusplus)
 } /* extern "C" */
 #endif /* defined(__cplusplus) */
diff --git a/src/lib/core/coio_popen.c b/src/lib/core/coio_popen.c
new file mode 100644
index 000000000..57b51b6c5
--- /dev/null
+++ b/src/lib/core/coio_popen.c
@@ -0,0 +1,366 @@
+/*
+ * Copyright 2010-2019, Tarantool AUTHORS, please see AUTHORS file.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ *    copyright notice, this list of conditions and the
+ *    following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ *    copyright notice, this list of conditions and the following
+ *    disclaimer in the documentation and/or other materials
+ *    provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+ * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+ * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
+
+#include <stddef.h>
+#include "coio_popen.h"
+#include "coio_task.h"
+#include "fiber.h"
+#include "say.h"
+#include "fio.h"
+#include <stdio.h>
+#include <stdlib.h>
+#include <dirent.h>
+#include <wait.h>
+#include <paths.h>
+#include <unistd.h>
+#include <sys/types.h> /* See NOTES */
+#include <sys/socket.h>
+
+struct popen_data {
+	/* process id */
+	pid_t pid;
+	int fh[3];
+	/*
+	 * Three handles:
+	 * [0] write to stdin of the child process
+	 * [1] read from stdout of the child process
+	 * [2] read from stderr of the child process
+	 */
+
+	/* The ID of socket was read recently
+	 * (STDERR_FILENO or STDOUT_FILENO */
+	int prev_source;
+};
+
+/*
+ * Returns next socket to read.
+ * Use this function when both STDOUT and STDERR outputs
+ * are ready for reading.
+ * */
+static inline int
+get_handle_in_order(struct popen_data *data)
+{
+	/*
+	 * Invert the order of handles to be read
+	 */
+	const int mask = STDERR_FILENO | STDOUT_FILENO;
+	data->prev_source ^= mask;
+
+	/*
+	 * If handle is not available, invert it back
+	 */
+	if (data->fh[data->prev_source] < 0)
+		data->prev_source ^= mask;
+	/*
+	 * if both reading handles are invalid return -1
+	 */
+	return data->fh[data->prev_source];
+}
+
+static struct popen_data *
+popen_data_new()
+{
+	struct popen_data *data =
+		(struct popen_data *)calloc(1, sizeof(*data));
+	data->fh[0] = -1;
+	data->fh[1] = -1;
+	data->fh[2] = -1;
+	data->prev_source = STDERR_FILENO;
+	/*
+	 * if both streams are ready then
+	 * start reading from STDOUT
+	 */
+	return data;
+}
+
+void *
+coio_popen_impl(const char *command, const char *type)
+{
+	pid_t pid;
+	int socket_rw[2] = {-1,-1};
+	int socket_err[2] = {-1,-1};
+	errno = 0;
+
+	char *argv[] = {"sh", "-c", NULL, NULL};
+	argv[2] = (char *)command;
+
+	if ((*type != 'r' && *type != 'w') || type[1] != '\0') {
+		errno = EINVAL;
+		return NULL;
+	}
+
+	struct popen_data *data = popen_data_new();
+	if (data == NULL)
+		return NULL;
+
+	const int sock_type = SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC;
+	if (socketpair(AF_UNIX, sock_type, 0, socket_rw) < 0) {
+		goto on_error;
+	}
+	if (socketpair(AF_UNIX, sock_type, 0, socket_err) < 0) {
+		goto on_error;
+	}
+
+	pid = fork();
+
+	if (pid < 0)
+		goto on_error;
+	else if (pid == 0) /* child */ {
+		/* Setup stdin/stdout */
+		close(socket_rw[0]);
+		int fno = (*type == 'r') ? STDOUT_FILENO
+					 : STDIN_FILENO;
+		if (socket_rw[1] != fno) {
+			dup2(socket_rw[1], fno);
+			close(socket_rw[1]);
+		}
+
+		/* setup stderr */
+		close(socket_err[0]);
+		if (socket_err[1] != STDERR_FILENO) {
+			dup2(socket_err[1], STDERR_FILENO);
+			close(socket_err[1]);
+		}
+
+		execve(_PATH_BSHELL, argv, environ);
+		_exit(127);
+		unreachable();
+	}
+
+	/* parent process */
+	close(socket_rw[1]);
+	close(socket_err[1]);
+
+	if (*type == 'r')
+		data->fh[STDOUT_FILENO] = socket_rw[0];
+	else
+		data->fh[STDIN_FILENO] = socket_rw[0];
+
+	data->fh[STDERR_FILENO] = socket_err[0];
+	data->pid = pid;
+
+	return data;
+
+on_error:
+	if (data)
+		free(data);
+	if (socket_rw[0] >= 0) {
+		close(socket_rw[0]);
+		close(socket_rw[1]);
+	}
+	if (socket_err[0] >= 0) {
+		close(socket_err[0]);
+		close(socket_err[1]);
+	}
+	return NULL;
+}
+
+int
+coio_try_pclose_impl(void *fh)
+{
+	struct popen_data *data = (struct popen_data *)fh;
+
+	if (data == NULL){
+		errno = EBADF;
+		return -1;
+	}
+
+	/* Close all handles */
+	for(int i = 0; i < 3; ++i) {
+		if (data->fh[i] >= 0) {
+			close(data->fh[i]);
+			data->fh[i] = -1;
+		}
+	}
+
+	int pstat;
+	pid_t pid = waitpid(data->pid, &pstat, WNOHANG);
+
+	int rc = 0;
+
+	if (pid == 0)
+		return -2;		/* Process is still running */
+	else if (pid < 0) {
+		if (errno == ECHILD)
+			rc = 0;		/* Child process is not found
+ 					 * (may be is already dead)
+ 					 */
+		else if (errno == EINTR)
+			return -2;	/* Retry */
+		else
+			rc = -1;	/* An error occurred */
+	}
+
+	free(data);
+	return rc;
+}
+
+int
+coio_popen_try_to_read(void *fh, void *buf, size_t count,
+	size_t *read_bytes, int *source_id)
+{
+	struct popen_data *data = (struct popen_data *)fh;
+
+	if (data == NULL){
+		errno = EBADF;
+		return -1;
+	}
+
+	fd_set rfds;
+	FD_ZERO(&rfds);
+	ssize_t received = 0;
+	int num = 0;
+
+	if (data->fh[STDOUT_FILENO] >= 0) {
+		FD_SET(data->fh[STDOUT_FILENO], &rfds);
+		++num;
+	}
+	if (data->fh[STDERR_FILENO] >= 0) {
+		FD_SET(data->fh[STDERR_FILENO], &rfds);
+		++num;
+	}
+
+	if (num == 0) {
+		/*
+		 * There are no open handles for reading
+		 */
+		errno = EBADF;
+		return -1;
+	}
+
+	struct timeval tv = {0,0};
+	int max_h = MAX(data->fh[STDOUT_FILENO],
+			data->fh[STDERR_FILENO]);
+
+	errno = 0;
+	int retv = select(max_h + 1, &rfds, NULL, NULL, &tv);
+	switch (retv) {
+	case -1:	/* Error */
+		return -1;
+	case 0:		/* Not ready yet */
+		return -2;
+	case 1: {        /* One socket is ready */
+
+		/* Choose the socket */
+		int fno = STDOUT_FILENO;
+		if (!FD_ISSET(data->fh[fno], &rfds))
+			fno = STDERR_FILENO;
+		if (!FD_ISSET(data->fh[fno], &rfds)) {
+			unreachable();
+			return -1;
+		}
+
+		received = read(data->fh[fno], buf, count);
+		if (received < 0)
+			goto on_error;
+		data->prev_source = fno;
+		*read_bytes = received;
+		*source_id = fno;
+		return 0;
+		}
+	case 2: {        /* Both sockets are ready */
+		received = read(get_handle_in_order(data), buf, count);
+		if (received < 0)
+			goto on_error;
+		*read_bytes = received;
+		*source_id = (int)data->prev_source;
+		return 0;
+		}
+	}
+
+	unreachable();
+	return -1;
+
+on_error:
+	if (errno == EINTR) {
+		*read_bytes = 0;
+		return -2;	/* Repeat */
+	} else
+		return -1;	/* Error */
+}
+
+int
+coio_popen_try_to_write(void *fh, const void *buf, size_t count,
+	size_t *written)
+{
+	if (count == 0)
+		return 0;
+
+	struct popen_data *data = (struct popen_data *)fh;
+
+	if (data == NULL){
+		errno = EBADF;
+		return -1;
+	}
+
+	if (data->fh[STDIN_FILENO] < 0) {
+		/*
+		 * There are no open handles for writing
+		 */
+		errno = EBADF;
+		return -1;
+	}
+
+	fd_set wfds;
+	FD_ZERO(&wfds);
+
+	int wh = data->fh[STDIN_FILENO];
+	FD_SET(wh, &wfds);
+
+	struct timeval tv = {0,0};
+
+	int retv = select(wh + 1, NULL, &wfds, NULL, &tv);
+	if (retv < 0)
+		goto on_error;
+	else if (retv == 0)
+		return -2;	/* Not ready yet */
+
+	assert(retv == 1);	/* The socket is ready */
+
+	if (FD_ISSET(wh, &wfds)) {
+		ssize_t rc = write(wh, buf, count);
+		if (rc < 0)
+			goto on_error;
+		*written = rc;
+		return (*written == count) ? 0
+					   : -2;
+	}
+
+	unreachable();
+	return -1;
+
+on_error:
+	if (errno == EINTR) {
+		*written = 0;
+		return -2;	/* Repeat */
+	} else
+		return -1;	/* Error */
+}
\ No newline at end of file
diff --git a/src/lib/core/coio_popen.h b/src/lib/core/coio_popen.h
new file mode 100644
index 000000000..732c2b1f4
--- /dev/null
+++ b/src/lib/core/coio_popen.h
@@ -0,0 +1,123 @@
+#ifndef TARANTOOL_LIB_CORE_COIO_POPEN_H_INCLUDED
+#define TARANTOOL_LIB_CORE_COIO_POPEN_H_INCLUDED
+/*
+ * Copyright 2010-2019, Tarantool AUTHORS, please see AUTHORS file.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ *    copyright notice, this list of conditions and the
+ *    following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ *    copyright notice, this list of conditions and the following
+ *    disclaimer in the documentation and/or other materials
+ *    provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+ * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+ * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
+
+#if defined(__cplusplus)
+extern "C" {
+#endif /* defined(__cplusplus) */
+
+/**
+ * Implementation of fio.popen.
+ * The function opens a process by creating a pipe,
+ * forking, and invoking the shell.
+ *
+ * @param command pointer to a null-terminated string
+ * containing a shell command line.
+ * This command is passed to /bin/sh using the -c flag;
+ *
+ * @param type pointer to a null-terminated string
+ * which must contain either the letter 'r' for reading
+ * or the letter 'w' for writing.
+ *
+ * @return handle of the pipe for reading or writing
+ * (depends on value of type).
+ * In a case of error returns NULL.
+ */
+void *
+coio_popen_impl(const char *command, const char *type);
+
+/**
+ * Implementation of fio.pclose.
+ * The function tries to retrieve status of
+ * the associated process.
+ * If the associated process is terminated then releases
+ * allocated resources.
+ * If the associated process is still running the function
+ * returns immediately. In this case repeat the call.
+ *
+ * @param fh handle returned by fio.popen.
+ *
+ * @return 0 if the process is terminated
+ * @return -1 for an error
+ * @return -2 if the process is still running
+ */
+int
+coio_try_pclose_impl(void *fh);
+
+/**
+ * The function reads up to count bytes from the handle
+ * associated with the child process.
+ * Returns immediately
+ *
+ * @param fd handle returned by fio.popen.
+ * @param buf a buffer to be read into
+ * @param count size of buffer in bytes
+ * @param read_bytes A pointer to the
+ * variable that receives the number of bytes read.
+ * @param source_id A pointer to the variable that receives a
+ * source stream id, 1 - for STDOUT, 2 - for STDERR.
+ *
+ * @return 0 data were successfully read
+ * @return -1 an error occurred, see errno for error code
+ * @return -2 there is nothing to read yet
+ */
+int
+coio_popen_try_to_read(void *fh, void *buf, size_t count,
+	size_t *read_bytes, int *source_id);
+
+/**
+ * The function writes up to count bytes to the handle
+ * associated with the child process.
+ * Tries to write as much as possible without blocking
+ * and immediately returns.
+ *
+ * @param fd handle returned by fio.popen.
+ * @param buf a buffer to be written from
+ * @param count size of buffer in bytes
+ * @param written A pointer to the
+ * variable that receives the number of bytes actually written.
+ * If function fails the number of written bytes is undefined.
+ *
+ * @return 0 all data were successfully written
+ * @return -1 an error occurred, see errno for error code
+ * @return -2 the writing can block
+ */
+int
+coio_popen_try_to_write(void *fh, const void *buf, size_t count,
+	size_t *written);
+
+
+#if defined(__cplusplus)
+} /* extern "C" */
+#endif /* defined(__cplusplus) */
+
+#endif /* TARANTOOL_LIB_CORE_COIO_POPEN_H_INCLUDED */
diff --git a/src/lib/core/fiber.h b/src/lib/core/fiber.h
index 89fb04284..79f830555 100644
--- a/src/lib/core/fiber.h
+++ b/src/lib/core/fiber.h
@@ -488,8 +488,8 @@ struct cord {
 extern __thread struct cord *cord_ptr;
 
 #define cord() cord_ptr
-#define fiber() cord()->fiber
-#define loop() (cord()->loop)
+#define fiber() (cord() ? cord()->fiber : NULL)
+#define loop() (cord() ? cord()->loop : NULL)
 
 void
 cord_create(struct cord *cord, const char *name);
diff --git a/src/lua/fio.c b/src/lua/fio.c
index 806f4256b..65481edc0 100644
--- a/src/lua/fio.c
+++ b/src/lua/fio.c
@@ -703,7 +703,89 @@ lbox_fio_copyfile(struct lua_State *L)
 	return lbox_fio_pushbool(L, coio_copyfile(source, dest) == 0);
 }
 
+static int
+lbox_fio_popen(struct lua_State *L)
+{
+	if (lua_gettop(L) < 1) {
+		usage:
+		luaL_error(L, "Usage: fio.popen(command, type)");
+	}
+
+	/*
+	 * FILE *popen(const char *command, const char *type);
+	 */
+	const char *command = lua_tostring(L, 1);
+	if (command == NULL)
+		goto usage;
+
+	const char *type = lua_tostring(L, 2);
+	if (type == NULL)
+		type = "r";
+
+	void* fh = coio_popen(command, type);
+	if (fh == NULL) {
+		lua_pushnil(L);
+		lbox_fio_pushsyserror(L);
+		return 2;
+	}
+	lua_pushlightuserdata(L, fh);
+	return 1;
+}
 
+static int
+lbox_fio_pclose(struct lua_State *L)
+{
+	void* fd = lua_touserdata(L, 1);
+	return lbox_fio_pushbool(L, coio_pclose(fd) == 0);
+}
+
+static int
+lbox_fio_popen_read(struct lua_State *L)
+{
+	void* fh = lua_touserdata(L, 1);
+	uint32_t ctypeid;
+	char *buf = *(char **)luaL_checkcdata(L, 2, &ctypeid);
+	size_t len = lua_tonumber(L, 3);
+
+	if (!len) {
+		lua_pushinteger(L, 0);
+		return 1;
+	}
+
+	int output_number = 0;
+	int res = coio_popen_read(fh, buf, len, &output_number);
+
+	if (res < 0) {
+		lua_pushnil(L);
+		lua_pushinteger(L, 0);
+		lbox_fio_pushsyserror(L);
+		return 3;
+	}
+
+	lua_pushinteger(L, res);
+	lua_pushinteger(L, output_number);
+	return 2;
+}
+
+static int
+lbox_fio_popen_write(struct lua_State *L)
+{
+	void* fh = lua_touserdata(L, 1);
+	const char *buf = lua_tostring(L, 2);
+	uint32_t ctypeid = 0;
+	if (buf == NULL)
+		buf = *(const char **)luaL_checkcdata(L, 2, &ctypeid);
+	size_t len = lua_tonumber(L, 3);
+
+	int res = coio_popen_write(fh, buf, len);
+	if (res < 0) {
+		lua_pushnil(L);
+		lbox_fio_pushsyserror(L);
+		return 2;
+	}
+	lua_pushinteger(L, res);
+	return 1;
+}
 
 void
 tarantool_lua_fio_init(struct lua_State *L)
@@ -747,6 +829,10 @@ tarantool_lua_fio_init(struct lua_State *L)
 		{ "listdir",		lbox_fio_listdir		},
 		{ "fstat",		lbox_fio_fstat			},
 		{ "copyfile",		lbox_fio_copyfile,		},
+		{ "popen",		lbox_fio_popen			},
+		{ "pclose",		lbox_fio_pclose 		},
+		{ "popen_read",		lbox_fio_popen_read 		},
+		{ "popen_write",	lbox_fio_popen_write 		},
 		{ NULL,			NULL				}
 	};
 	luaL_register(L, NULL, internal_methods);
diff --git a/src/lua/fio.lua b/src/lua/fio.lua
index 38664a556..ed3a6c781 100644
--- a/src/lua/fio.lua
+++ b/src/lua/fio.lua
@@ -206,6 +206,86 @@ fio.open = function(path, flags, mode)
     return fh
 end
 
+local popen_methods = {}
+
+-- read stdout & stderr of the process started by fio.popen
+-- read() -> str, source
+-- read(buf) -> len, source
+-- read(size) -> str, source
+-- read(buf, size) -> len, source
+-- source contains id of the stream,
+-- 1 - for stdout, 2 - for stderr, 0 - in a case of error
+popen_methods.read = function(self, buf, size)
+    local tmpbuf
+    if (not ffi.istype(const_char_ptr_t, buf) and buf == nil) or
+            (ffi.istype(const_char_ptr_t, buf) and size == nil) then
+        size = 512
+    end
+    if not ffi.istype(const_char_ptr_t, buf) then
+        size = buf or size
+        tmpbuf = buffer.ibuf()
+        buf = tmpbuf:reserve(size)
+    end
+    local res, output_no, err = internal.popen_read(self.fh, buf, size)
+    if res == nil then
+        if tmpbuf ~= nil then
+            tmpbuf:recycle()
+        end
+        return nil, output_no, err
+    end
+    if tmpbuf ~= nil then
+        tmpbuf:alloc(res)
+        res = ffi.string(tmpbuf.rpos, tmpbuf:size())
+        tmpbuf:recycle()
+    end
+    return res, output_no
+end
+
+-- write(str)
+-- write(buf, len)
+popen_methods.write = function(self, data, len)
+    if not ffi.istype(const_char_ptr_t, data) then
+        data = tostring(data)
+        len = #data
+    end
+    local res, err = internal.popen_write(self.fh, data, len)
+    if err ~= nil then
+        return false, err
+    end
+    return res >= 0
+end
+
+popen_methods.close = function(self)
+    local res, err = internal.pclose(self.fh)
+    self.fh = -1
+    if err ~= nil then
+        return false, err
+    end
+    return res
+end
+
+
+local popen_mt = { __index = popen_methods }
+
+fio.popen = function(command, mode)
+    if type(command) ~= 'string' then
+        error("Usage: fio.popen(command, mode)")
+    end
+
+    if type(mode) ~= 'string' then
+        mode = 'r'  -- use default read-mode
+    end
+
+    local fh,err = internal.popen(tostring(command), tostring(mode))
+    if err ~= nil then
+        return nil, err
+    end
+
+    fh = {fh = fh}
+    setmetatable(fh, popen_mt)
+    return fh
+end
+
 fio.pathjoin = function(...)
     local i, path = 1, nil
 
diff --git a/test/box-tap/fio_popen.test.lua b/test/box-tap/fio_popen.test.lua
new file mode 100755
index 000000000..6f984b51b
--- /dev/null
+++ b/test/box-tap/fio_popen.test.lua
@@ -0,0 +1,79 @@
+#!/usr/bin/env tarantool
+
+local tap = require('tap')
+local fio = require('fio')
+local test = tap.test()
+
+test:plan(2 * 3)
+
+box.cfg{}
+
+local function start_app(script_path)
+    local fh = fio.popen(script_path, "r")
+
+    if fh == nil then
+        local err = errno.strerror()
+        error(string.format("Failed to run app: %s, error: %s", script_path, err))
+        return nil
+    else
+        return fh
+    end
+end
+
+function read_stdout(fh)
+    local ss = ""
+
+    local s,src = fh:read(64)
+
+    while s ~= nil and s ~= "" do
+        if src == nil then
+            src = -1
+            print("the source is undefined")
+        else
+            ss = ss .. string.format("[%d] %s", src, s)
+        end
+
+        s,src = fh:read(64)
+    end
+
+    return ss
+end
+
+local build_path = os.getenv("BUILDDIR")
+local app_path = fio.pathjoin(build_path, 'test/box-tap/fio_popen_test1.sh')
+
+local the_app = start_app(app_path)
+test:ok(the_app ~= nil, "starting a child process 1")
+
+local app_output = read_stdout(the_app)
+test:ok(app_output ~= nil, "response from child process 1")
+
+local expected_output = "[1] 1\n[1] 2\n[1] 3\n[1] 4\n[1] 5\n[1] 6\n[1] 7\n[1] 8\n[1] 9\n[1] 10\n"
+
+print("Expected:\n" .. expected_output)
+print("\nReceived:\n" .. app_output)
+
+test:ok(app_output == expected_output, "compare response 1")
+
+the_app:close()
+
+-- Try to get STDERR output
+the_app = start_app("/fufel/fufel.sh")
+test:ok(the_app ~= nil, "starting a not existing process")
+
+app_output = read_stdout(the_app)
+test:ok(app_output ~= nil, "response from child process 2")
+
+local expected_output = "[2] sh: 1: /fufel/fufel.sh: not found\n"
+
+print("Expected:\n" .. expected_output)
+print("\nReceived:\n" .. app_output)
+
+test:ok(app_output == expected_output, "compare response 2")
+
+the_app:close()
+
+
+test:check()
+os.exit(0)
+
diff --git a/test/box-tap/fio_popen_test1.sh b/test/box-tap/fio_popen_test1.sh
new file mode 100755
index 000000000..becf83e6d
--- /dev/null
+++ b/test/box-tap/fio_popen_test1.sh
@@ -0,0 +1,7 @@
+#!/bin/bash
+for i in {1..10}
+do
+	echo $i
+	sleep 1
+done
+
diff --git a/third_party/libeio/eio.c b/third_party/libeio/eio.c
index 7351d5dda..e433b1b3b 100644
--- a/third_party/libeio/eio.c
+++ b/third_party/libeio/eio.c
@@ -1741,7 +1741,24 @@ static int eio_threads;
 static void ecb_cold
 eio_prefork()
 {
-    eio_threads = etp_set_max_parallel(EIO_POOL, 0);
+	/*
+	 * When fork() is called libeio shuts
+	 * down all working threads.
+	 * But it causes a deadlock if fork() was
+	 * called from the libeio thread.
+	 * To avoid this do not close the
+	 * thread who called fork().
+	 * This behaviour is acceptable for the
+	 * case when fork() is immediately followed
+	 * by exec().
+	 * To clone a process call fork() from the
+	 * main thread.
+	 */
+
+	if (etp_is_in_pool_thread())
+		eio_threads = etp_get_max_parallel(EIO_POOL);
+	else
+    		eio_threads = etp_set_max_parallel(EIO_POOL, 0);
 }
 
 static void ecb_cold
diff --git a/third_party/libeio/etp.c b/third_party/libeio/etp.c
index 42f7661eb..38c2d1f9b 100644
--- a/third_party/libeio/etp.c
+++ b/third_party/libeio/etp.c
@@ -164,6 +164,21 @@ struct etp_pool_user
    xmutex_t lock;
 };
 
+static __thread int is_eio_thread = 0;
+
+/**
+ * Check whether the current thread belongs to
+ * libeio thread pool or just a generic thread.
+ *
+ * @return 0 for generic thread
+ * @return 1 for libeio thread pool
+ **/
+ETP_API_DECL int ecb_cold
+etp_is_in_pool_thread()
+{
+	return is_eio_thread;
+}
+
 /* worker threads management */
 
 static void ecb_cold
@@ -322,6 +337,9 @@ X_THREAD_PROC (etp_proc)
   self.pool = pool;
   etp_pool_user user; /* per request */
 
+/* Distinguish libeio threads from the generic threads */
+  is_eio_thread = 1;
+
   etp_proc_init ();
 
   /* try to distribute timeouts somewhat evenly (nanosecond part) */
@@ -616,3 +634,13 @@ etp_set_max_parallel (etp_pool pool, unsigned int threads)
   X_UNLOCK (pool->lock);
   return retval;
 }
+
+ETP_API_DECL int ecb_cold
+etp_get_max_parallel (etp_pool pool)
+{
+	int retval;
+	X_LOCK   (pool->lock);
+	retval = pool->wanted;
+	X_UNLOCK (pool->lock);
+	return retval;
+}
diff --git a/third_party/libev/ev.c b/third_party/libev/ev.c
index 6a2648591..5fa8293a1 100644
--- a/third_party/libev/ev.c
+++ b/third_party/libev/ev.c
@@ -4214,6 +4214,8 @@ noinline
 void
 ev_signal_stop (EV_P_ ev_signal *w) EV_THROW
 {
+	if (!loop)
+		return;
   clear_pending (EV_A_ (W)w);
   if (expect_false (!ev_is_active (w)))
     return;
-- 
2.17.1

^ permalink raw reply	[flat|nested] 3+ messages in thread

end of thread, other threads:[~2019-05-07 10:31 UTC | newest]

Thread overview: 3+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2019-04-24 17:00 [tarantool-patches] [PATCH] core: Non-blocking io.popen Stanislav Zudin
2019-05-07  8:49 ` Vladimir Davydov
2019-05-07 10:31   ` Vladimir Davydov

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox