* [tarantool-patches] [PATCH] core: Non-blocking io.popen
@ 2019-04-24 17:00 Stanislav Zudin
2019-05-07 8:49 ` Vladimir Davydov
0 siblings, 1 reply; 3+ messages in thread
From: Stanislav Zudin @ 2019-04-24 17:00 UTC (permalink / raw)
To: tarantool-patches; +Cc: Stanislav Zudin
Adds nonblocking implementation of popen.
The method is available in namespace fio.
fio.popen() returns an object with three methods:
read(), write() and close().
Method read() reads both STDOUT and STDERR output.
Closes #4031
---
Branch: https://github.com/tarantool/tarantool/tree/stanztt/gh-4031-nonblocking-popen
Issue: https://github.com/tarantool/tarantool/issues/4031
src/lib/core/CMakeLists.txt | 1 +
src/lib/core/coio_file.c | 189 +++++++++++++++++
src/lib/core/coio_file.h | 13 ++
src/lib/core/coio_popen.c | 366 ++++++++++++++++++++++++++++++++
src/lib/core/coio_popen.h | 123 +++++++++++
src/lib/core/fiber.h | 4 +-
src/lua/fio.c | 86 ++++++++
src/lua/fio.lua | 80 +++++++
test/box-tap/fio_popen.test.lua | 79 +++++++
test/box-tap/fio_popen_test1.sh | 7 +
third_party/libeio/eio.c | 19 +-
third_party/libeio/etp.c | 28 +++
third_party/libev/ev.c | 2 +
13 files changed, 994 insertions(+), 3 deletions(-)
create mode 100644 src/lib/core/coio_popen.c
create mode 100644 src/lib/core/coio_popen.h
create mode 100755 test/box-tap/fio_popen.test.lua
create mode 100755 test/box-tap/fio_popen_test1.sh
diff --git a/src/lib/core/CMakeLists.txt b/src/lib/core/CMakeLists.txt
index eb10b11c3..8b1f8d32e 100644
--- a/src/lib/core/CMakeLists.txt
+++ b/src/lib/core/CMakeLists.txt
@@ -26,6 +26,7 @@ set(core_sources
trigger.cc
mpstream.c
port.c
+ coio_popen.c
)
if (TARGET_OS_NETBSD)
diff --git a/src/lib/core/coio_file.c b/src/lib/core/coio_file.c
index c5b2db781..ffe9d634c 100644
--- a/src/lib/core/coio_file.c
+++ b/src/lib/core/coio_file.c
@@ -33,6 +33,7 @@
#include "fiber.h"
#include "say.h"
#include "fio.h"
+#include "coio_popen.h"
#include <stdio.h>
#include <stdlib.h>
#include <dirent.h>
@@ -102,6 +103,31 @@ struct coio_file_task {
const char *source;
const char *dest;
} copyfile;
+
+ struct {
+ const char *command;
+ const char *type;
+ void *handle;
+ } popen_params;
+
+ struct {
+ void *handle;
+ } pclose_params;
+
+ struct {
+ void *handle;
+ const void* buf;
+ size_t count;
+ size_t *written;
+ } popen_write;
+
+ struct {
+ void *handle;
+ void *buf;
+ size_t count;
+ size_t *read_bytes;
+ int *output_number;
+ } popen_read;
};
};
@@ -628,3 +654,166 @@ coio_copyfile(const char *source, const char *dest)
eio_req *req = eio_custom(coio_do_copyfile, 0, coio_complete, &eio);
return coio_wait_done(req, &eio);
}
+
+/**
+ * Callback executed for the eio_custom() request created by
+ * coio_popen(): spawns the child via coio_popen_impl() and reports
+ * the outcome through the request, the same way the other
+ * coio_do_*() callbacks in this file do (req->result/req->errorno).
+ *
+ * NOTE(review): presumably coio_complete() copies req->result and
+ * req->errorno into the task, which is why values written straight
+ * into the task would be lost - confirm against coio_complete().
+ */
+static void
+coio_do_popen(eio_req *req)
+{
+	struct coio_file_task *eio = (struct coio_file_task *)req->data;
+	eio->popen_params.handle = coio_popen_impl(eio->popen_params.command,
+						   eio->popen_params.type);
+	/* Capture errno right after the call, before it can change. */
+	req->errorno = errno;
+	/* A NULL handle means the spawn failed. */
+	req->result = eio->popen_params.handle != NULL ? 0 : -1;
+}
+
+/**
+ * Start a shell command in a child process.
+ * The request is scheduled with eio_custom() (callback
+ * coio_do_popen()) and awaited with coio_wait_done().
+ *
+ * @param command command line passed on to coio_popen_impl()
+ * @param type    mode string passed on to coio_popen_impl()
+ * @return the handle stored in the task by coio_do_popen()
+ */
+void *
+coio_popen(const char *command, const char *type)
+{
+	INIT_COEIO_FILE(eio);
+	eio.popen_params.type = type;
+	eio.popen_params.command = command;
+
+	eio_req *request = eio_custom(coio_do_popen, 0, coio_complete, &eio);
+	coio_wait_done(request, &eio);
+
+	void *handle = eio.popen_params.handle;
+	return handle;
+}
+
+/**
+ * Callback for the eio_custom() request created by
+ * coio_do_nonblock_pclose(): makes one coio_try_pclose_impl()
+ * attempt and stores its return code and errno in the request.
+ */
+static void
+coio_do_pclose(eio_req *req)
+{
+	struct coio_file_task *task = (struct coio_file_task *)req->data;
+	void *handle = task->pclose_params.handle;
+
+	int rc = coio_try_pclose_impl(handle);
+	req->errorno = errno;
+	req->result = rc;
+}
+
+/**
+ * Submit a single close attempt (coio_do_pclose()) through
+ * eio_custom() and wait for it with coio_wait_done().
+ *
+ * @param fh handle to close
+ * @return the value returned by coio_wait_done()
+ */
+static int
+coio_do_nonblock_pclose(void *fh)
+{
+	INIT_COEIO_FILE(eio);
+	eio.pclose_params.handle = fh;
+
+	eio_req *request = eio_custom(coio_do_pclose, 0, coio_complete, &eio);
+	int rc = coio_wait_done(request, &eio);
+	return rc;
+}
+
+/**
+ * Close a handle obtained from coio_popen().
+ * A first attempt is made directly with coio_try_pclose_impl(); if
+ * it answers -2, further attempts are submitted through
+ * coio_do_nonblock_pclose() until the answer is something else.
+ *
+ * @param fh handle to close
+ * @return 0 on success, -1 on error
+ */
+int
+coio_pclose(void *fh)
+{
+	int rc = coio_try_pclose_impl(fh);
+	/* 0 - the child is gone, -1 - failure: nothing left to wait for. */
+	if (rc == 0 || rc == -1)
+		return rc;
+
+	/* The only other documented answer is "still running". */
+	assert(rc == -2);
+
+	for (;;) {
+		rc = coio_do_nonblock_pclose(fh);
+		if (rc != -2)
+			break;
+	}
+	return rc;
+}
+
+/**
+ * Callback for the eio_custom() request created by
+ * coio_do_nonblock_popen_read(): makes one coio_popen_try_to_read()
+ * attempt with the parameters kept in the task and stores the return
+ * code and errno in the request.
+ */
+static void
+coio_do_popen_read(eio_req *req)
+{
+	struct coio_file_task *task = (struct coio_file_task *)req->data;
+
+	req->result = coio_popen_try_to_read(task->popen_read.handle,
+					     task->popen_read.buf,
+					     task->popen_read.count,
+					     task->popen_read.read_bytes,
+					     task->popen_read.output_number);
+	req->errorno = errno;
+}
+
+/**
+ * Submit a single read attempt (coio_do_popen_read()) through
+ * eio_custom() and wait for it with coio_wait_done().
+ *
+ * @param fh         handle to read from
+ * @param buf        destination buffer
+ * @param count      size of buf in bytes
+ * @param read_bytes receives the number of bytes read
+ * @param source_id  receives the id of the stream that was read
+ * @return the value returned by coio_wait_done()
+ */
+static int
+coio_do_nonblock_popen_read(void *fh, void *buf, size_t count,
+			    size_t *read_bytes, int *source_id)
+{
+	INIT_COEIO_FILE(eio);
+	eio.popen_read.handle = fh;
+	eio.popen_read.buf = buf;
+	eio.popen_read.count = count;
+	eio.popen_read.read_bytes = read_bytes;
+	eio.popen_read.output_number = source_id;
+
+	eio_req *request = eio_custom(coio_do_popen_read, 0,
+				      coio_complete, &eio);
+	int rc = coio_wait_done(request, &eio);
+	return rc;
+}
+
+/**
+ * Read up to count bytes from the output of the child process.
+ * A first attempt is made directly with coio_popen_try_to_read();
+ * while the answer is -2 the attempt is repeated through
+ * coio_do_nonblock_popen_read().
+ *
+ * @param fh            handle to read from
+ * @param buf           destination buffer
+ * @param count         size of buf in bytes
+ * @param output_number receives the id of the stream that was read
+ * @return number of bytes read on success, -1 on error
+ */
+ssize_t
+coio_popen_read(void *fh, void *buf, size_t count, int *output_number)
+{
+	size_t received = 0;
+	int rc = coio_popen_try_to_read(fh, buf, count,
+					&received, output_number);
+	if (rc == -1)
+		return -1;
+
+	if (rc != 0) {
+		/* Neither success nor failure: the data is not there yet. */
+		assert(rc == -2);
+		do {
+			rc = coio_do_nonblock_popen_read(fh, buf, count,
+							 &received,
+							 output_number);
+		} while (rc == -2);
+
+		if (rc != 0)
+			return -1;
+	}
+	return (ssize_t)received;
+}
+
+/**
+ * Callback for the eio_custom() request created by
+ * coio_do_nonblock_popen_write(): makes one coio_popen_try_to_write()
+ * attempt with the parameters kept in the task and stores the return
+ * code and errno in the request.
+ */
+static void
+coio_do_popen_write(eio_req *req)
+{
+	struct coio_file_task *task = (struct coio_file_task *)req->data;
+
+	req->result = coio_popen_try_to_write(task->popen_write.handle,
+					      task->popen_write.buf,
+					      task->popen_write.count,
+					      task->popen_write.written);
+	req->errorno = errno;
+}
+
+/**
+ * Submit a single write attempt (coio_do_popen_write()) through
+ * eio_custom() and wait for it with coio_wait_done().
+ *
+ * @param fh      handle to write to
+ * @param buf     data to write
+ * @param count   number of bytes in buf
+ * @param written receives the number of bytes written by the attempt
+ * @return the value returned by coio_wait_done()
+ */
+static int
+coio_do_nonblock_popen_write(void *fh, const void *buf, size_t count,
+			     size_t *written)
+{
+	INIT_COEIO_FILE(eio);
+	eio.popen_write.handle = fh;
+	eio.popen_write.buf = buf;
+	eio.popen_write.count = count;
+	eio.popen_write.written = written;
+
+	eio_req *request = eio_custom(coio_do_popen_write, 0,
+				      coio_complete, &eio);
+	int rc = coio_wait_done(request, &eio);
+	return rc;
+}
+
+/**
+ * Write count bytes to the stdin of the child process.
+ * A first attempt is made directly with coio_popen_try_to_write();
+ * while an attempt answers -2 the remaining bytes are re-submitted
+ * through coio_do_nonblock_popen_write().
+ *
+ * @param fh    handle to write to
+ * @param buf   data to write
+ * @param count number of bytes in buf
+ * @return total number of bytes written on success, -1 on error
+ */
+ssize_t
+coio_popen_write(void *fh, const void *buf, size_t count)
+{
+	/* Byte-wise cursor: arithmetic on void * is not standard C. */
+	const char *pos = (const char *)buf;
+	size_t total = 0;
+	size_t written = 0;
+	int rc = coio_popen_try_to_write(fh, pos, count, &written);
+
+	while (rc == -2) {
+		/* Step over what the last attempt managed to send. */
+		pos += written;
+		count -= written;
+		total += written;
+		/*
+		 * An attempt that finds the descriptor not ready may
+		 * answer -2 without updating the counter. Reset it so
+		 * that a stale value is never applied a second time.
+		 */
+		written = 0;
+		rc = coio_do_nonblock_popen_write(fh, pos, count, &written);
+	}
+
+	if (rc != 0)
+		return -1;
+	/* Report everything sent, not only the last chunk. */
+	return (ssize_t)(total + written);
+}
diff --git a/src/lib/core/coio_file.h b/src/lib/core/coio_file.h
index f2112ceed..4089c1a05 100644
--- a/src/lib/core/coio_file.h
+++ b/src/lib/core/coio_file.h
@@ -84,6 +84,19 @@ int coio_tempdir(char *path, size_t path_len);
int coio_readdir(const char *path, char **buf);
int coio_copyfile(const char *source, const char *dest);
+
+void *
+coio_popen(const char *command, const char *type);
+
+int
+coio_pclose(void *fd);
+
+ssize_t
+coio_popen_read(void *fh, void *buf, size_t count, int *output_number);
+
+ssize_t
+coio_popen_write(void *fh, const void *buf, size_t count);
+
#if defined(__cplusplus)
} /* extern "C" */
#endif /* defined(__cplusplus) */
diff --git a/src/lib/core/coio_popen.c b/src/lib/core/coio_popen.c
new file mode 100644
index 000000000..57b51b6c5
--- /dev/null
+++ b/src/lib/core/coio_popen.c
@@ -0,0 +1,366 @@
+/*
+ * Copyright 2010-2019, Tarantool AUTHORS, please see AUTHORS file.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ * copyright notice, this list of conditions and the
+ * following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials
+ * provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+ * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+ * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
+
+#include <stddef.h>
+#include "coio_popen.h"
+#include "coio_task.h"
+#include "fiber.h"
+#include "say.h"
+#include "fio.h"
+#include <stdio.h>
+#include <stdlib.h>
+#include <dirent.h>
+#include <wait.h>
+#include <paths.h>
+#include <unistd.h>
+#include <sys/types.h> /* See NOTES */
+#include <sys/socket.h>
+
+/**
+ * State of one child process started by coio_popen_impl().
+ */
+struct popen_data {
+	/** Process id of the child. */
+	pid_t pid;
+	/**
+	 * Three handles, indexed by the standard stream numbers:
+	 * [0] write to stdin of the child process
+	 * [1] read from stdout of the child process
+	 * [2] read from stderr of the child process
+	 * A slot that is not in use holds -1.
+	 */
+	int fh[3];
+	/**
+	 * The id of the stream that was read most recently
+	 * (STDERR_FILENO or STDOUT_FILENO).
+	 */
+	int prev_source;
+};
+
+/*
+ * Pick the descriptor to read next when both STDOUT and STDERR of
+ * the child are ready, alternating between the two streams.
+ * Updates data->prev_source to the chosen stream and returns its
+ * descriptor; the result is negative only if the previously used
+ * stream is closed as well.
+ */
+static inline int
+get_handle_in_order(struct popen_data *data)
+{
+	/* XOR with this mask turns STDOUT_FILENO into STDERR_FILENO
+	 * and back. */
+	const int mask = STDERR_FILENO | STDOUT_FILENO;
+	const int other = data->prev_source ^ mask;
+
+	/* Switch streams only if the other one is actually open. */
+	if (data->fh[other] >= 0)
+		data->prev_source = other;
+
+	return data->fh[data->prev_source];
+}
+
+/**
+ * Allocate a popen_data with all descriptors marked as closed (-1).
+ *
+ * @return the new object, or NULL if the allocation failed
+ */
+static struct popen_data *
+popen_data_new(void)
+{
+	struct popen_data *data = calloc(1, sizeof(*data));
+	/* Do not touch the fields if calloc() failed. */
+	if (data == NULL)
+		return NULL;
+
+	for (int i = 0; i < 3; ++i)
+		data->fh[i] = -1;
+	/*
+	 * get_handle_in_order() switches to the other stream first,
+	 * so when both streams are ready reading starts from STDOUT.
+	 */
+	data->prev_source = STDERR_FILENO;
+	return data;
+}
+
+/**
+ * Start `command` with "sh -c" in a forked child and connect the
+ * parent to it with non-blocking UNIX socket pairs.
+ *
+ * @param command shell command line
+ * @param type    "r" to read the child's stdout, "w" to write to the
+ *                child's stdin; stderr is captured in both modes
+ * @return a popen_data handle, or NULL with errno set on failure
+ */
+void *
+coio_popen_impl(const char *command, const char *type)
+{
+	pid_t pid;
+	int socket_rw[2] = {-1, -1};
+	int socket_err[2] = {-1, -1};
+	int saved_errno;
+	errno = 0;
+
+	char *argv[] = {"sh", "-c", NULL, NULL};
+	argv[2] = (char *)command;
+
+	if ((*type != 'r' && *type != 'w') || type[1] != '\0') {
+		errno = EINVAL;
+		return NULL;
+	}
+
+	struct popen_data *data = popen_data_new();
+	if (data == NULL)
+		return NULL;
+
+	const int sock_type = SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC;
+	if (socketpair(AF_UNIX, sock_type, 0, socket_rw) < 0)
+		goto on_error;
+	if (socketpair(AF_UNIX, sock_type, 0, socket_err) < 0)
+		goto on_error;
+
+	pid = fork();
+	if (pid < 0)
+		goto on_error;
+
+	if (pid == 0) {
+		/* Child: set up stdin or stdout, depending on the mode. */
+		close(socket_rw[0]);
+		int fno = (*type == 'r') ? STDOUT_FILENO : STDIN_FILENO;
+		if (socket_rw[1] != fno) {
+			/* Without the redirection the command is useless. */
+			if (dup2(socket_rw[1], fno) < 0)
+				_exit(127);
+			close(socket_rw[1]);
+		}
+
+		/* Child: set up stderr. */
+		close(socket_err[0]);
+		if (socket_err[1] != STDERR_FILENO) {
+			if (dup2(socket_err[1], STDERR_FILENO) < 0)
+				_exit(127);
+			close(socket_err[1]);
+		}
+
+		execve(_PATH_BSHELL, argv, environ);
+		/* execve() returns only on failure. */
+		_exit(127);
+		unreachable();
+	}
+
+	/* Parent: drop the child's ends of the socket pairs. */
+	close(socket_rw[1]);
+	close(socket_err[1]);
+
+	if (*type == 'r')
+		data->fh[STDOUT_FILENO] = socket_rw[0];
+	else
+		data->fh[STDIN_FILENO] = socket_rw[0];
+
+	data->fh[STDERR_FILENO] = socket_err[0];
+	data->pid = pid;
+
+	return data;
+
+on_error:
+	/*
+	 * Keep the errno of the call that failed: the cleanup below
+	 * may overwrite it.
+	 */
+	saved_errno = errno;
+	free(data);
+	if (socket_rw[0] >= 0) {
+		close(socket_rw[0]);
+		close(socket_rw[1]);
+	}
+	if (socket_err[0] >= 0) {
+		close(socket_err[0]);
+		close(socket_err[1]);
+	}
+	errno = saved_errno;
+	return NULL;
+}
+
+/**
+ * One attempt to close a popen handle: closes the descriptors that
+ * are still open, then polls the child with waitpid(WNOHANG).
+ * The handle is freed unless -2 is returned.
+ *
+ * @param fh handle returned by coio_popen_impl()
+ * @return 0 the child has terminated (or is not found)
+ * @return -1 an error occurred (EBADF for a NULL handle)
+ * @return -2 the child is still running, repeat the call
+ */
+int
+coio_try_pclose_impl(void *fh)
+{
+	struct popen_data *data = (struct popen_data *)fh;
+	if (data == NULL) {
+		errno = EBADF;
+		return -1;
+	}
+
+	/* Close whatever is still open; repeated calls skip these. */
+	for (int i = 0; i < 3; ++i) {
+		if (data->fh[i] < 0)
+			continue;
+		close(data->fh[i]);
+		data->fh[i] = -1;
+	}
+
+	int pstat;
+	pid_t pid = waitpid(data->pid, &pstat, WNOHANG);
+	if (pid == 0)
+		return -2; /* Process is still running */
+
+	int rc = 0;
+	if (pid < 0) {
+		switch (errno) {
+		case EINTR:
+			return -2; /* Retry */
+		case ECHILD:
+			/* Child process is not found
+			 * (may be is already dead)
+			 */
+			break;
+		default:
+			rc = -1; /* An error occurred */
+			break;
+		}
+	}
+
+	free(data);
+	return rc;
+}
+
+/**
+ * One non-blocking attempt to read from the child's stdout/stderr.
+ * Polls the open read descriptors with a zero-timeout select() and
+ * reads from the one that is ready; when both are ready the streams
+ * are served in turn (see get_handle_in_order()).
+ *
+ * @param fh         handle returned by coio_popen_impl()
+ * @param buf        destination buffer
+ * @param count      size of buf in bytes
+ * @param read_bytes receives the number of bytes read
+ * @param source_id  receives STDOUT_FILENO or STDERR_FILENO
+ * @return 0 data were read
+ * @return -1 an error occurred, see errno
+ * @return -2 nothing to read yet, repeat the call
+ */
+int
+coio_popen_try_to_read(void *fh, void *buf, size_t count,
+		       size_t *read_bytes, int *source_id)
+{
+	struct popen_data *data = (struct popen_data *)fh;
+	ssize_t received = 0;
+	int fno = -1;
+
+	if (data == NULL) {
+		errno = EBADF;
+		return -1;
+	}
+
+	const int out_fd = data->fh[STDOUT_FILENO];
+	const int err_fd = data->fh[STDERR_FILENO];
+
+	if (out_fd < 0 && err_fd < 0) {
+		/*
+		 * There are no open handles for reading
+		 */
+		errno = EBADF;
+		return -1;
+	}
+	if (out_fd >= FD_SETSIZE || err_fd >= FD_SETSIZE) {
+		/* Such a descriptor does not fit into an fd_set. */
+		errno = EINVAL;
+		return -1;
+	}
+
+	fd_set rfds;
+	FD_ZERO(&rfds);
+	if (out_fd >= 0)
+		FD_SET(out_fd, &rfds);
+	if (err_fd >= 0)
+		FD_SET(err_fd, &rfds);
+
+	struct timeval tv = {0, 0};
+	int max_h = MAX(out_fd, err_fd);
+
+	errno = 0;
+	int retv = select(max_h + 1, &rfds, NULL, NULL, &tv);
+	if (retv < 0)
+		goto on_error;
+	if (retv == 0)
+		return -2; /* Not ready yet */
+
+	/*
+	 * A slot may hold -1 (stdout is not captured in "w" mode), and
+	 * FD_ISSET() must not be given a negative descriptor.
+	 */
+	const int out_ready = out_fd >= 0 && FD_ISSET(out_fd, &rfds);
+	const int err_ready = err_fd >= 0 && FD_ISSET(err_fd, &rfds);
+
+	if (out_ready && err_ready) {
+		/* Both sockets are ready: take turns. */
+		get_handle_in_order(data);
+		fno = data->prev_source;
+	} else if (out_ready) {
+		fno = STDOUT_FILENO;
+	} else if (err_ready) {
+		fno = STDERR_FILENO;
+	} else {
+		unreachable();
+		return -1;
+	}
+
+	received = read(data->fh[fno], buf, count);
+	if (received < 0)
+		goto on_error;
+	data->prev_source = fno;
+	*read_bytes = received;
+	*source_id = fno;
+	return 0;
+
+on_error:
+	/* Transient conditions: ask the caller to repeat the call. */
+	if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {
+		*read_bytes = 0;
+		return -2;
+	}
+	return -1; /* Error */
+}
+
+/**
+ * One non-blocking attempt to write to the child's stdin.
+ * Polls the descriptor with a zero-timeout select() and writes as
+ * much as write() accepts.
+ *
+ * @param fh      handle returned by coio_popen_impl()
+ * @param buf     data to write
+ * @param count   number of bytes in buf
+ * @param written receives the number of bytes written by this call;
+ *                it is set to 0 whenever nothing was written
+ * @return 0 all data were written
+ * @return -1 an error occurred, see errno
+ * @return -2 the rest cannot be written yet, repeat the call with
+ *            the remaining data
+ */
+int
+coio_popen_try_to_write(void *fh, const void *buf, size_t count,
+			size_t *written)
+{
+	/*
+	 * Give the counter a definite value on every path: callers
+	 * advance their buffer by it after a -2 answer.
+	 */
+	*written = 0;
+
+	if (count == 0)
+		return 0;
+
+	struct popen_data *data = (struct popen_data *)fh;
+
+	if (data == NULL) {
+		errno = EBADF;
+		return -1;
+	}
+
+	const int wh = data->fh[STDIN_FILENO];
+	if (wh < 0) {
+		/*
+		 * There are no open handles for writing
+		 */
+		errno = EBADF;
+		return -1;
+	}
+	if (wh >= FD_SETSIZE) {
+		/* Such a descriptor does not fit into an fd_set. */
+		errno = EINVAL;
+		return -1;
+	}
+
+	fd_set wfds;
+	FD_ZERO(&wfds);
+	FD_SET(wh, &wfds);
+
+	struct timeval tv = {0, 0};
+
+	int retv = select(wh + 1, NULL, &wfds, NULL, &tv);
+	if (retv < 0)
+		goto on_error;
+	if (retv == 0)
+		return -2; /* Not ready yet */
+
+	assert(retv == 1); /* The socket is ready */
+
+	if (FD_ISSET(wh, &wfds)) {
+		ssize_t rc = write(wh, buf, count);
+		if (rc < 0)
+			goto on_error;
+		*written = rc;
+		/* A short write leaves a tail for the next attempt. */
+		return (*written == count) ? 0 : -2;
+	}
+
+	unreachable();
+	return -1;
+
+on_error:
+	/* Transient conditions: ask the caller to repeat the call. */
+	if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {
+		*written = 0;
+		return -2;
+	}
+	return -1; /* Error */
+}
\ No newline at end of file
diff --git a/src/lib/core/coio_popen.h b/src/lib/core/coio_popen.h
new file mode 100644
index 000000000..732c2b1f4
--- /dev/null
+++ b/src/lib/core/coio_popen.h
@@ -0,0 +1,123 @@
+#ifndef TARANTOOL_LIB_CORE_COIO_POPEN_H_INCLUDED
+#define TARANTOOL_LIB_CORE_COIO_POPEN_H_INCLUDED
+/*
+ * Copyright 2010-2019, Tarantool AUTHORS, please see AUTHORS file.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ * copyright notice, this list of conditions and the
+ * following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials
+ * provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+ * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+ * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
+
+#if defined(__cplusplus)
+extern "C" {
+#endif /* defined(__cplusplus) */
+
+/**
+ * Implementation of fio.popen.
+ * The function opens a process by creating a pipe,
+ * forking, and invoking the shell.
+ *
+ * @param command pointer to a null-terminated string
+ * containing a shell command line.
+ * This command is passed to /bin/sh using the -c flag;
+ *
+ * @param type pointer to a null-terminated string
+ * which must contain either the letter 'r' for reading
+ * or the letter 'w' for writing.
+ *
+ * @return handle of the pipe for reading or writing
+ * (depends on value of type).
+ * In a case of error returns NULL.
+ */
+void *
+coio_popen_impl(const char *command, const char *type);
+
+/**
+ * Implementation of fio.pclose.
+ * The function tries to retrieve status of
+ * the associated process.
+ * If the associated process is terminated then releases
+ * allocated resources.
+ * If the associated process is still running the function
+ * returns immediately. In this case repeat the call.
+ *
+ * @param fh handle returned by fio.popen.
+ *
+ * @return 0 if the process is terminated
+ * @return -1 for an error
+ * @return -2 if the process is still running
+ */
+int
+coio_try_pclose_impl(void *fh);
+
+/**
+ * The function reads up to count bytes from the handle
+ * associated with the child process.
+ * Returns immediately
+ *
+ * @param fh handle returned by fio.popen.
+ * @param buf a buffer to be read into
+ * @param count size of buffer in bytes
+ * @param read_bytes A pointer to the
+ * variable that receives the number of bytes read.
+ * @param source_id A pointer to the variable that receives a
+ * source stream id, 1 - for STDOUT, 2 - for STDERR.
+ *
+ * @return 0 data were successfully read
+ * @return -1 an error occurred, see errno for error code
+ * @return -2 there is nothing to read yet
+ */
+int
+coio_popen_try_to_read(void *fh, void *buf, size_t count,
+ size_t *read_bytes, int *source_id);
+
+/**
+ * The function writes up to count bytes to the handle
+ * associated with the child process.
+ * Tries to write as much as possible without blocking
+ * and immediately returns.
+ *
+ * @param fh handle returned by fio.popen.
+ * @param buf a buffer to be written from
+ * @param count size of buffer in bytes
+ * @param written A pointer to the
+ * variable that receives the number of bytes actually written.
+ * If function fails the number of written bytes is undefined.
+ *
+ * @return 0 all data were successfully written
+ * @return -1 an error occurred, see errno for error code
+ * @return -2 the writing can block
+ */
+int
+coio_popen_try_to_write(void *fh, const void *buf, size_t count,
+ size_t *written);
+
+
+#if defined(__cplusplus)
+} /* extern "C" */
+#endif /* defined(__cplusplus) */
+
+#endif /* TARANTOOL_LIB_CORE_COIO_POPEN_H_INCLUDED */
diff --git a/src/lib/core/fiber.h b/src/lib/core/fiber.h
index 89fb04284..79f830555 100644
--- a/src/lib/core/fiber.h
+++ b/src/lib/core/fiber.h
@@ -488,8 +488,8 @@ struct cord {
extern __thread struct cord *cord_ptr;
#define cord() cord_ptr
-#define fiber() cord()->fiber
-#define loop() (cord()->loop)
+#define fiber() (cord() ? cord()->fiber : NULL)
+#define loop() (cord() ? cord()->loop : NULL)
void
cord_create(struct cord *cord, const char *name);
diff --git a/src/lua/fio.c b/src/lua/fio.c
index 806f4256b..65481edc0 100644
--- a/src/lua/fio.c
+++ b/src/lua/fio.c
@@ -703,7 +703,89 @@ lbox_fio_copyfile(struct lua_State *L)
return lbox_fio_pushbool(L, coio_copyfile(source, dest) == 0);
}
+/**
+ * Lua binding for coio_popen(): internal.popen(command [, type]).
+ * The type defaults to "r". Pushes the handle as light userdata, or
+ * nil followed by what lbox_fio_pushsyserror() pushes when
+ * coio_popen() returns NULL. Raises a usage error when the command
+ * is missing or is not convertible to a string.
+ */
+static int
+lbox_fio_popen(struct lua_State *L)
+{
+	/*
+	 * Modelled on:
+	 * FILE *popen(const char *command, const char *type);
+	 */
+	const char *command = NULL;
+	if (lua_gettop(L) >= 1)
+		command = lua_tostring(L, 1);
+	if (command == NULL)
+		return luaL_error(L, "Usage: fio.popen(command, type)");
+
+	const char *type = lua_tostring(L, 2);
+	void *fh = coio_popen(command, type != NULL ? type : "r");
+	if (fh != NULL) {
+		lua_pushlightuserdata(L, fh);
+		return 1;
+	}
+
+	lua_pushnil(L);
+	lbox_fio_pushsyserror(L);
+	return 2;
+}
+/**
+ * Lua binding for coio_pclose(): internal.pclose(handle).
+ * Pushes true if coio_pclose() returned 0, false otherwise.
+ */
+static int
+lbox_fio_pclose(struct lua_State *L)
+{
+	void *fh = lua_touserdata(L, 1);
+	int rc = coio_pclose(fh);
+	return lbox_fio_pushbool(L, rc == 0);
+}
+
+/**
+ * Lua binding for coio_popen_read():
+ * internal.popen_read(handle, buf, size).
+ *
+ * Pushes:
+ *  - 0 when the requested size is zero;
+ *  - nil, 0 and what lbox_fio_pushsyserror() pushes on error;
+ *  - the number of bytes read and the id of the source stream
+ *    on success.
+ */
+static int
+lbox_fio_popen_read(struct lua_State *L)
+{
+	void *fh = lua_touserdata(L, 1);
+	uint32_t ctypeid;
+	char *buf = *(char **)luaL_checkcdata(L, 2, &ctypeid);
+	size_t len = lua_tonumber(L, 3);
+
+	if (!len) {
+		lua_pushinteger(L, 0);
+		return 1;
+	}
+
+	int output_number = 0;
+	/* Keep the full width of the result: it is an ssize_t. */
+	ssize_t res = coio_popen_read(fh, buf, len, &output_number);
+
+	if (res < 0) {
+		lua_pushnil(L);
+		lua_pushinteger(L, 0);
+		lbox_fio_pushsyserror(L);
+		return 3;
+	}
+
+	lua_pushinteger(L, res);
+	lua_pushinteger(L, output_number);
+	return 2;
+}
+
+/**
+ * Lua binding for coio_popen_write():
+ * internal.popen_write(handle, data, len).
+ * The data is either a Lua string or a cdata pointer.
+ *
+ * Pushes the value returned by coio_popen_write() on success, or
+ * nil followed by what lbox_fio_pushsyserror() pushes on error.
+ */
+static int
+lbox_fio_popen_write(struct lua_State *L)
+{
+	void *fh = lua_touserdata(L, 1);
+	const char *buf = lua_tostring(L, 2);
+	uint32_t ctypeid = 0;
+	/* Not a string: the argument has to be a cdata buffer. */
+	if (buf == NULL)
+		buf = *(const char **)luaL_checkcdata(L, 2, &ctypeid);
+	size_t len = lua_tonumber(L, 3);
+
+	/* Keep the full width of the result: it is an ssize_t. */
+	ssize_t res = coio_popen_write(fh, buf, len);
+	if (res < 0) {
+		lua_pushnil(L);
+		lbox_fio_pushsyserror(L);
+		return 2;
+	}
+	lua_pushinteger(L, res);
+	return 1;
+}
void
tarantool_lua_fio_init(struct lua_State *L)
@@ -747,6 +829,10 @@ tarantool_lua_fio_init(struct lua_State *L)
{ "listdir", lbox_fio_listdir },
{ "fstat", lbox_fio_fstat },
{ "copyfile", lbox_fio_copyfile, },
+ { "popen", lbox_fio_popen },
+ { "pclose", lbox_fio_pclose },
+ { "popen_read", lbox_fio_popen_read },
+ { "popen_write", lbox_fio_popen_write },
{ NULL, NULL }
};
luaL_register(L, NULL, internal_methods);
diff --git a/src/lua/fio.lua b/src/lua/fio.lua
index 38664a556..ed3a6c781 100644
--- a/src/lua/fio.lua
+++ b/src/lua/fio.lua
@@ -206,6 +206,86 @@ fio.open = function(path, flags, mode)
return fh
end
+local popen_methods = {}
+
+-- Read stdout and stderr of the process started by fio.popen.
+-- Call forms:
+--   read()          -> str, source
+--   read(size)      -> str, source
+--   read(buf)       -> len, source
+--   read(buf, size) -> len, source
+-- source is the id of the stream:
+-- 1 - stdout, 2 - stderr, 0 - in case of an error.
+-- On error the result is nil, source, err.
+popen_methods.read = function(self, buf, size)
+    local user_buf = ffi.istype(const_char_ptr_t, buf)
+    -- Fall back to 512 bytes when the caller gave no size.
+    if (user_buf and size == nil) or (not user_buf and buf == nil) then
+        size = 512
+    end
+
+    local tmpbuf
+    if not user_buf then
+        -- No caller buffer: the first argument, if any, is the size.
+        size = buf or size
+        tmpbuf = buffer.ibuf()
+        buf = tmpbuf:reserve(size)
+    end
+
+    local res, output_no, err = internal.popen_read(self.fh, buf, size)
+
+    if tmpbuf == nil then
+        -- The data, if any, went into the caller's buffer.
+        if res == nil then
+            return nil, output_no, err
+        end
+        return res, output_no
+    end
+
+    if res == nil then
+        tmpbuf:recycle()
+        return nil, output_no, err
+    end
+    -- Turn the bytes read into a Lua string.
+    tmpbuf:alloc(res)
+    local str = ffi.string(tmpbuf.rpos, tmpbuf:size())
+    tmpbuf:recycle()
+    return str, output_no
+end
+
+-- Write to the process started by fio.popen.
+--   write(str)
+--   write(buf, len)
+-- Anything that is not a const char * buffer is converted with
+-- tostring() and written in full.
+-- Returns a boolean on success, or false, err on failure.
+popen_methods.write = function(self, data, len)
+    local payload, nbytes = data, len
+    if not ffi.istype(const_char_ptr_t, data) then
+        payload = tostring(data)
+        nbytes = #payload
+    end
+    local res, err = internal.popen_write(self.fh, payload, nbytes)
+    if err == nil then
+        return res >= 0
+    end
+    return false, err
+end
+
+-- Close the process object. The stored handle is replaced with -1
+-- whatever the outcome is.
+-- Returns the result of internal.pclose(), or false, err if it
+-- reported an error.
+popen_methods.close = function(self)
+    local res, err = internal.pclose(self.fh)
+    self.fh = -1
+    if err == nil then
+        return res
+    end
+    return false, err
+end
+
+
+local popen_mt = { __index = popen_methods }
+
+-- Start a shell command in a child process.
+-- command: string with the command line;
+-- mode: 'r' or 'w', anything that is not a string selects 'r'.
+-- Returns an object with read(), write() and close() methods,
+-- or nil, err on failure.
+fio.popen = function(command, mode)
+    if type(command) ~= 'string' then
+        error("Usage: fio.popen(command, mode)")
+    end
+
+    -- use default read-mode
+    local open_mode = type(mode) == 'string' and mode or 'r'
+
+    local handle, err = internal.popen(command, open_mode)
+    if err ~= nil then
+        return nil, err
+    end
+
+    return setmetatable({fh = handle}, popen_mt)
+end
+
fio.pathjoin = function(...)
local i, path = 1, nil
diff --git a/test/box-tap/fio_popen.test.lua b/test/box-tap/fio_popen.test.lua
new file mode 100755
index 000000000..6f984b51b
--- /dev/null
+++ b/test/box-tap/fio_popen.test.lua
@@ -0,0 +1,79 @@
+#!/usr/bin/env tarantool
+
+local tap = require('tap')
+local fio = require('fio')
+local test = tap.test()
+
+test:plan(2 * 3)
+
+box.cfg{}
+
+-- Start script_path with fio.popen() in read mode and return the
+-- process object. Raises an error that carries the message returned
+-- by fio.popen() if the process cannot be started.
+local function start_app(script_path)
+    local fh, err = fio.popen(script_path, "r")
+    if fh == nil then
+        -- The errno module is not required in this script, so report
+        -- the error returned by fio.popen() itself.
+        error(string.format("Failed to run app: %s, error: %s",
+                            script_path, tostring(err)))
+    end
+    return fh
+end
+
+-- Read the output of the process in chunks of up to 64 bytes until
+-- read() returns nil or an empty string. Each chunk is appended as
+-- "[<source>] <chunk>"; a chunk without a source is skipped with a
+-- notice.
+function read_stdout(fh)
+    local collected = ""
+
+    while true do
+        local chunk, src = fh:read(64)
+        if chunk == nil or chunk == "" then
+            break
+        end
+
+        if src ~= nil then
+            collected = collected .. string.format("[%d] %s", src, chunk)
+        else
+            print("the source is undefined")
+        end
+    end
+
+    return collected
+end
+
+local build_path = os.getenv("BUILDDIR")
+local app_path = fio.pathjoin(build_path, 'test/box-tap/fio_popen_test1.sh')
+
+local the_app = start_app(app_path)
+test:ok(the_app ~= nil, "starting a child process 1")
+
+local app_output = read_stdout(the_app)
+test:ok(app_output ~= nil, "response from child process 1")
+
+local expected_output = "[1] 1\n[1] 2\n[1] 3\n[1] 4\n[1] 5\n[1] 6\n[1] 7\n[1] 8\n[1] 9\n[1] 10\n"
+
+print("Expected:\n" .. expected_output)
+print("\nReceived:\n" .. app_output)
+
+test:ok(app_output == expected_output, "compare response 1")
+
+the_app:close()
+
+-- Try to get STDERR output
+the_app = start_app("/fufel/fufel.sh")
+test:ok(the_app ~= nil, "starting a not existing process")
+
+app_output = read_stdout(the_app)
+test:ok(app_output ~= nil, "response from child process 2")
+
+local expected_output = "[2] sh: 1: /fufel/fufel.sh: not found\n"
+
+print("Expected:\n" .. expected_output)
+print("\nReceived:\n" .. app_output)
+
+test:ok(app_output == expected_output, "compare response 2")
+
+the_app:close()
+
+
+test:check()
+os.exit(0)
+
diff --git a/test/box-tap/fio_popen_test1.sh b/test/box-tap/fio_popen_test1.sh
new file mode 100755
index 000000000..becf83e6d
--- /dev/null
+++ b/test/box-tap/fio_popen_test1.sh
@@ -0,0 +1,7 @@
+#!/bin/bash
+for i in {1..10}
+do
+ echo $i
+ sleep 1
+done
+
diff --git a/third_party/libeio/eio.c b/third_party/libeio/eio.c
index 7351d5dda..e433b1b3b 100644
--- a/third_party/libeio/eio.c
+++ b/third_party/libeio/eio.c
@@ -1741,7 +1741,24 @@ static int eio_threads;
static void ecb_cold
eio_prefork()
{
- eio_threads = etp_set_max_parallel(EIO_POOL, 0);
+ /*
+ * When fork() is called libeio shuts
+ * down all working threads.
+ * But it causes a deadlock if fork() was
+ * called from the libeio thread.
+ * To avoid this do not close the
+ * thread who called fork().
+ * This behaviour is acceptable for the
+ * case when fork() is immediately followed
+ * by exec().
+ * To clone a process call fork() from the
+ * main thread.
+ */
+
+ if (etp_is_in_pool_thread())
+ eio_threads = etp_get_max_parallel(EIO_POOL);
+ else
+ eio_threads = etp_set_max_parallel(EIO_POOL, 0);
}
static void ecb_cold
diff --git a/third_party/libeio/etp.c b/third_party/libeio/etp.c
index 42f7661eb..38c2d1f9b 100644
--- a/third_party/libeio/etp.c
+++ b/third_party/libeio/etp.c
@@ -164,6 +164,21 @@ struct etp_pool_user
xmutex_t lock;
};
+static __thread int is_eio_thread = 0;
+
+/**
+ * Check whether the current thread belongs to the
+ * libeio thread pool or is just a generic thread.
+ * The answer comes from the thread-local flag is_eio_thread.
+ *
+ * @return 0 for a generic thread
+ * @return 1 for a libeio pool thread
+ **/
+ETP_API_DECL int ecb_cold
+etp_is_in_pool_thread (void)
+{
+  return is_eio_thread;
+}
+
/* worker threads management */
static void ecb_cold
@@ -322,6 +337,9 @@ X_THREAD_PROC (etp_proc)
self.pool = pool;
etp_pool_user user; /* per request */
+/* Distinguish libeio threads from the generic threads */
+ is_eio_thread = 1;
+
etp_proc_init ();
/* try to distribute timeouts somewhat evenly (nanosecond part) */
@@ -616,3 +634,13 @@ etp_set_max_parallel (etp_pool pool, unsigned int threads)
X_UNLOCK (pool->lock);
return retval;
}
+
+/**
+ * Return the pool's `wanted` value, read under the pool lock.
+ */
+ETP_API_DECL int ecb_cold
+etp_get_max_parallel (etp_pool pool)
+{
+  int wanted;
+
+  X_LOCK (pool->lock);
+  wanted = pool->wanted;
+  X_UNLOCK (pool->lock);
+
+  return wanted;
+}
diff --git a/third_party/libev/ev.c b/third_party/libev/ev.c
index 6a2648591..5fa8293a1 100644
--- a/third_party/libev/ev.c
+++ b/third_party/libev/ev.c
@@ -4214,6 +4214,8 @@ noinline
void
ev_signal_stop (EV_P_ ev_signal *w) EV_THROW
{
+ if (!loop)
+ return;
clear_pending (EV_A_ (W)w);
if (expect_false (!ev_is_active (w)))
return;
--
2.17.1
^ permalink raw reply [flat|nested] 3+ messages in thread
* Re: [tarantool-patches] [PATCH] core: Non-blocking io.popen
2019-04-24 17:00 [tarantool-patches] [PATCH] core: Non-blocking io.popen Stanislav Zudin
@ 2019-05-07 8:49 ` Vladimir Davydov
2019-05-07 10:31 ` Vladimir Davydov
0 siblings, 1 reply; 3+ messages in thread
From: Vladimir Davydov @ 2019-05-07 8:49 UTC (permalink / raw)
To: Stanislav Zudin; +Cc: tarantool-patches, Alexander Turenko
[Cc += Alexander re popen API]
On Wed, Apr 24, 2019 at 08:00:32PM +0300, Stanislav Zudin wrote:
> Adds nonblocking implementation of popen.
> The method is available in namespace fio.
> fio.popen() returns an object with three methods:
> read(), write() and close().
> Method read() reads both STDOUT and STDERR output.
>
> Closes #4031
Please write a DocBot request and be more thorough when describing the
new API: it should be clear what all the functions do and what arguments
they expect; there should be some examples. Currently it's unclear how
to use the new module judging by the description.
Anyway, the API doesn't look good enough to me:
1. Reading both stdout and stderr with the same method doesn't make any
sense. Those are two separate streams; each of them should have a
separate file handle.
2. There must be a way to ignore any of stdin/stdout/stderr. E.g. the
caller might not be interested in stdout. However, if he doesn't
read it, the program will just block once the pipe buffer has been
filled. This is very inconvenient. I guess, we should use parent's
stdout/stderr by default, but there also must be a way to completely
silence the child's output (/dev/null).
3. Is it really necessary to introduce separate read/write methods for
popen? Can't we reuse existing fio.read/write? After all, a pipe can
be used (almost) just like a normal file so why not simply wrap it
in fio file handle?
4. There must be a way to forcefully terminate a child program (kill).
5. The caller should be able to obtain the exit code of a terminated
child - without it it'd be impossible to figure out whether the
program succeeded or failed.
I guess we have to go back to the drawing board and try to devise a good
API before proceeding to implementation. It may be worth taking a look
at other languages' versions of popen, e.g. Python's. Please try to come
up with a good API and send out an RFC so that others can look at it and
comment.
Also, the test is insufficient: you should test all the functions
you introduce, all possible use cases. E.g. you don't check that
write(stderr) or read(stdin) works AFAICS.
^ permalink raw reply [flat|nested] 3+ messages in thread
* Re: [tarantool-patches] [PATCH] core: Non-blocking io.popen
2019-05-07 8:49 ` Vladimir Davydov
@ 2019-05-07 10:31 ` Vladimir Davydov
0 siblings, 0 replies; 3+ messages in thread
From: Vladimir Davydov @ 2019-05-07 10:31 UTC (permalink / raw)
To: Stanislav Zudin; +Cc: tarantool-patches, Alexander Turenko, Georgy Kirichenko
[Cc += Georgy re popen API]
On Tue, May 07, 2019 at 11:49:49AM +0300, Vladimir Davydov wrote:
> [Cc += Alexander re popen API]
>
> On Wed, Apr 24, 2019 at 08:00:32PM +0300, Stanislav Zudin wrote:
> > Adds nonblocking implementation of popen.
> > The method is available in namespace fio.
> > fio.popen() returns an object with three methods:
> > read(), write() and close().
> > Method read() reads both STDOUT and STDERR output.
> >
> > Closes #4031
>
> Please write a DocBot request and be more thorough when describing the
> new API: it should be clear what all the functions do and what arguments
> they expect; there should be some examples. Currently it's unclear how
> to use the new module judging by the description.
>
> Anyway, the API doesn't look good enough to me:
We've discussed the proposed API with Georgy and Alexander. Georgy has
convinced us that it'd be best if read() returned the output of both
stdout/stderr with the second value indicating where the output came
from, because this is the most flexible variant: one can read both
streams together or separately thanks to multireturn. So points 1 and 3
below aren't valid.
However, we still want to redirect any of stdin/stderr/stdout to
/dev/null or any arbitrary file descriptor, forcefully terminate the
process, and wait for it to exit. We now see that popen() should
return a special kind of object, which has read(), write(), close(),
kill(), and wait() methods. I guess all blocking methods should have
optional timeouts. Probably, we need to be able to close stdin, stdout,
stderr separately (close should have an argument?), and wait() should
return the exit code on success. Also, it'd be great if one could
redirect the output of one popen to another popen, as demonstrated
here:
https://www.python.org/dev/peps/pep-0324/#replacing-shell-pipe-line
Please prepare a mock-up of the API taking into account these
comments and send it for review to server-dev@: we need to agree on it
with the Solutions team.
I only skimmed through the code, haven't looked through it thoroughly
yet, and so only have a few minor comments:
- Please don't use -2 as return code. Looks ugly. Use errno or pass
this info in an extra argument.
- I'm not quite sure it's worth moving fork() out of the tx thread.
So let's factor it out into a separate patch: first implement popen
with fork in tx, then move fork out of tx. Also, please consider
getting rid of atfork handlers - they look legacy.
- We need to close all file descriptors in a child process after fork.
Probably, we should use O_CLOEXEC for all file descriptors. Please
do it in a separate patch.
>
> 1. Reading both stdout and stderr with the same method doesn't make any
> sense. Those are two separate streams; each of them should have a
> separate file handle.
> 2. There must be a way to ignore any of stdin/stdout/stderr. E.g. the
> caller might not be interested in stdout. However, if he doesn't
> read it, the program will just block once the pipe buffer has been
> filled. This is very inconvenient. I guess, we should use parent's
> stdout/stderr by default, but there also must be a way to completely
> silence the child's output (/dev/null).
> 3. Is it really necessary to introduce separate read/write methods for
> popen? Can't we reuse existing fio.read/write? After all, a pipe can
> be used (almost) just like a normal file so why not simply wrap it
> in fio file handle?
> 4. There must be a way to forcefully terminate a child program (kill).
> 5. The caller should be able to obtain the exit code of a terminated
> child - without it it'd be impossible to figure out whether the
> program succeeded or failed.
>
> I guess we have to go back to the drawing board and try to devise a good
> API before proceeding to implementation. It may be worth to take a look
> at other languages' versions of popen, e.g. Python's. Please try to come
> up a good API and send out an RFC so that others can look at it and
> comment.
>
> Also, the test is insufficient: you should test all the functions
> you introduce, all possible use cases. E.g. you don't check that
> write(stderr) or read(stdin) works AFAICS.
^ permalink raw reply [flat|nested] 3+ messages in thread
end of thread, other threads:[~2019-05-07 10:31 UTC | newest]
Thread overview: 3+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2019-04-24 17:00 [tarantool-patches] [PATCH] core: Non-blocking io.popen Stanislav Zudin
2019-05-07 8:49 ` Vladimir Davydov
2019-05-07 10:31 ` Vladimir Davydov
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox