[tarantool-patches] [PATCH] core: Non-blocking io.popen
Stanislav Zudin
szudin at tarantool.org
Wed Apr 24 20:00:32 MSK 2019
Adds a non-blocking implementation of popen.
The function is available in the fio namespace.
fio.popen() returns an object with three methods:
read(), write() and close().
The read() method reads both the STDOUT and the STDERR output of the child.
Closes #4031
---
Branch: https://github.com/tarantool/tarantool/tree/stanztt/gh-4031-nonblocking-popen
Issue: https://github.com/tarantool/tarantool/issues/4031
src/lib/core/CMakeLists.txt | 1 +
src/lib/core/coio_file.c | 189 +++++++++++++++++
src/lib/core/coio_file.h | 13 ++
src/lib/core/coio_popen.c | 366 ++++++++++++++++++++++++++++++++
src/lib/core/coio_popen.h | 123 +++++++++++
src/lib/core/fiber.h | 4 +-
src/lua/fio.c | 86 ++++++++
src/lua/fio.lua | 80 +++++++
test/box-tap/fio_popen.test.lua | 79 +++++++
test/box-tap/fio_popen_test1.sh | 7 +
third_party/libeio/eio.c | 19 +-
third_party/libeio/etp.c | 28 +++
third_party/libev/ev.c | 2 +
13 files changed, 994 insertions(+), 3 deletions(-)
create mode 100644 src/lib/core/coio_popen.c
create mode 100644 src/lib/core/coio_popen.h
create mode 100755 test/box-tap/fio_popen.test.lua
create mode 100755 test/box-tap/fio_popen_test1.sh
diff --git a/src/lib/core/CMakeLists.txt b/src/lib/core/CMakeLists.txt
index eb10b11c3..8b1f8d32e 100644
--- a/src/lib/core/CMakeLists.txt
+++ b/src/lib/core/CMakeLists.txt
@@ -26,6 +26,7 @@ set(core_sources
trigger.cc
mpstream.c
port.c
+ coio_popen.c
)
if (TARGET_OS_NETBSD)
diff --git a/src/lib/core/coio_file.c b/src/lib/core/coio_file.c
index c5b2db781..ffe9d634c 100644
--- a/src/lib/core/coio_file.c
+++ b/src/lib/core/coio_file.c
@@ -33,6 +33,7 @@
#include "fiber.h"
#include "say.h"
#include "fio.h"
+#include "coio_popen.h"
#include <stdio.h>
#include <stdlib.h>
#include <dirent.h>
@@ -102,6 +103,31 @@ struct coio_file_task {
const char *source;
const char *dest;
} copyfile;
+
+ struct {
+ const char *command;
+ const char *type;
+ void *handle;
+ } popen_params;
+
+ struct {
+ void *handle;
+ } pclose_params;
+
+ struct {
+ void *handle;
+ const void* buf;
+ size_t count;
+ size_t *written;
+ } popen_write;
+
+ struct {
+ void *handle;
+ void *buf;
+ size_t count;
+ size_t *read_bytes;
+ int *output_number;
+ } popen_read;
};
};
@@ -628,3 +654,166 @@ coio_copyfile(const char *source, const char *dest)
eio_req *req = eio_custom(coio_do_copyfile, 0, coio_complete, &eio);
return coio_wait_done(req, &eio);
}
+
+/**
+ * eio worker callback: spawn the child process described by
+ * popen_params and store the resulting handle in the task.
+ *
+ * The outcome is published through req->result / req->errorno,
+ * exactly like coio_do_pclose(), coio_do_popen_read() and
+ * coio_do_popen_write() do, so that a failure to spawn the child
+ * is reported together with its errno.
+ *
+ * @param req eio request; req->data points to the coio_file_task.
+ */
+static void
+coio_do_popen(eio_req *req)
+{
+	struct coio_file_task *eio = (struct coio_file_task *)req->data;
+	void *handle = coio_popen_impl(eio->popen_params.command,
+				       eio->popen_params.type);
+	/* Capture errno immediately, before anything can overwrite it. */
+	int saved_errno = errno;
+
+	eio->popen_params.handle = handle;
+	req->result = (handle != NULL) ? 0 : -1;
+	req->errorno = saved_errno;
+}
+
+/**
+ * Start a shell command through an eio custom request.
+ *
+ * The request runs coio_do_popen(); this function waits for it
+ * with coio_wait_done() and hands back the handle the request
+ * stored in the task.
+ *
+ * @param command shell command line
+ * @param type    "r" or "w"
+ *
+ * @return the handle produced by coio_popen_impl()
+ *         (NULL when it failed).
+ */
+void *
+coio_popen(const char *command, const char *type)
+{
+	INIT_COEIO_FILE(eio);
+	eio.popen_params.type = type;
+	eio.popen_params.command = command;
+
+	eio_req *request = eio_custom(coio_do_popen, 0, coio_complete, &eio);
+	/* The status of the wait is not inspected, only the handle is. */
+	coio_wait_done(request, &eio);
+
+	return eio.popen_params.handle;
+}
+
+/**
+ * eio worker callback: make one attempt to close the popen handle
+ * stored in pclose_params and publish the attempt's return code
+ * and errno through the request.
+ */
+static void
+coio_do_pclose(eio_req *req)
+{
+	struct coio_file_task *task = (struct coio_file_task *)req->data;
+	void *handle = task->pclose_params.handle;
+
+	req->result = coio_try_pclose_impl(handle);
+	req->errorno = errno;
+}
+
+/**
+ * Submit coio_do_pclose() for @a fh as an eio custom request and
+ * return whatever coio_wait_done() reports for it.
+ */
+static int
+coio_do_nonblock_pclose(void *fh)
+{
+	INIT_COEIO_FILE(eio);
+	eio.pclose_params.handle = fh;
+
+	eio_req *request = eio_custom(coio_do_pclose, 0, coio_complete, &eio);
+	int status = coio_wait_done(request, &eio);
+	return status;
+}
+
+/**
+ * Close a handle returned by coio_popen().
+ *
+ * One attempt is made inline. If it reports -2 (the child is
+ * still running) the attempt is repeated through eio requests
+ * until a result other than -2 comes back.
+ *
+ * @return 0 when the child is gone, -1 on failure.
+ */
+int
+coio_pclose(void *fh)
+{
+	int rc = coio_try_pclose_impl(fh);
+	/* 0: the child process is dead, -1: the attempt failed. */
+	if (rc == 0 || rc == -1)
+		return rc;
+
+	/* Anything else must be "would block". */
+	assert(rc == -2);
+
+	for (;;) {
+		rc = coio_do_nonblock_pclose(fh);
+		if (rc != -2)
+			break;
+	}
+	return rc;
+}
+
+/**
+ * eio worker callback: make one read attempt with the arguments
+ * stored in popen_read and publish the attempt's return code and
+ * errno through the request.
+ */
+static void
+coio_do_popen_read(eio_req *req)
+{
+	struct coio_file_task *task = (struct coio_file_task *)req->data;
+
+	req->result = coio_popen_try_to_read(task->popen_read.handle,
+					     task->popen_read.buf,
+					     task->popen_read.count,
+					     task->popen_read.read_bytes,
+					     task->popen_read.output_number);
+	req->errorno = errno;
+}
+
+/**
+ * Submit coio_do_popen_read() as an eio custom request and return
+ * whatever coio_wait_done() reports for it.
+ *
+ * @param fh         popen handle
+ * @param buf        destination buffer
+ * @param count      size of @a buf in bytes
+ * @param read_bytes receives the number of bytes read
+ * @param source_id  receives the id of the stream that was read
+ */
+static int
+coio_do_nonblock_popen_read(void *fh, void *buf, size_t count,
+			    size_t *read_bytes, int *source_id)
+{
+	INIT_COEIO_FILE(eio);
+	eio.popen_read.handle = fh;
+	eio.popen_read.buf = buf;
+	eio.popen_read.count = count;
+	eio.popen_read.read_bytes = read_bytes;
+	eio.popen_read.output_number = source_id;
+
+	eio_req *request = eio_custom(coio_do_popen_read, 0,
+				      coio_complete, &eio);
+	int status = coio_wait_done(request, &eio);
+	return status;
+}
+
+/**
+ * Read up to @a count bytes from the child's output.
+ *
+ * One attempt is made inline. While the attempt reports -2
+ * (nothing to read yet) it is repeated through eio requests.
+ *
+ * @param fh            popen handle
+ * @param buf           destination buffer
+ * @param count         size of @a buf in bytes
+ * @param output_number receives the id of the stream that was read
+ *
+ * @return number of bytes read, or -1 on failure.
+ */
+ssize_t
+coio_popen_read(void *fh, void *buf, size_t count, int *output_number)
+{
+	size_t received = 0;
+	int rc = coio_popen_try_to_read(fh, buf, count,
+					&received, output_number);
+	if (rc == -1)
+		return -1;		/* Failed */
+
+	if (rc != 0) {
+		/* A blocking operation is expected */
+		assert(rc == -2);
+
+		for (;;) {
+			rc = coio_do_nonblock_popen_read(fh, buf, count,
+							 &received,
+							 output_number);
+			if (rc != -2)
+				break;
+		}
+		if (rc != 0)
+			return -1;
+	}
+
+	/* The reading has succeeded */
+	return (ssize_t)received;
+}
+
+/**
+ * eio worker callback: make one write attempt with the arguments
+ * stored in popen_write and publish the attempt's return code and
+ * errno through the request.
+ */
+static void
+coio_do_popen_write(eio_req *req)
+{
+	struct coio_file_task *task = (struct coio_file_task *)req->data;
+
+	req->result = coio_popen_try_to_write(task->popen_write.handle,
+					      task->popen_write.buf,
+					      task->popen_write.count,
+					      task->popen_write.written);
+	req->errorno = errno;
+}
+
+/**
+ * Submit coio_do_popen_write() as an eio custom request and
+ * return whatever coio_wait_done() reports for it.
+ *
+ * @param fh      popen handle
+ * @param buf     data to write
+ * @param count   number of bytes in @a buf
+ * @param written receives the number of bytes written
+ */
+static int
+coio_do_nonblock_popen_write(void *fh, const void *buf, size_t count,
+			     size_t *written)
+{
+	INIT_COEIO_FILE(eio);
+	eio.popen_write.handle = fh;
+	eio.popen_write.buf = buf;
+	eio.popen_write.count = count;
+	eio.popen_write.written = written;
+
+	eio_req *request = eio_custom(coio_do_popen_write, 0,
+				      coio_complete, &eio);
+	int status = coio_wait_done(request, &eio);
+	return status;
+}
+
+/**
+ * Write @a count bytes to the stdin of the child process.
+ *
+ * One attempt is made inline. While an attempt reports -2
+ * (not ready, or only part of the data was accepted) the rest of
+ * the buffer is retried through eio requests.
+ *
+ * @param fh    popen handle
+ * @param buf   data to write
+ * @param count number of bytes in @a buf
+ *
+ * @return total number of bytes written, or -1 on failure.
+ */
+ssize_t
+coio_popen_write(void *fh, const void *buf, size_t count)
+{
+	/* Byte pointer: arithmetic on void * is not standard C. */
+	const char *pos = (const char *)buf;
+	size_t total = 0;
+	size_t written = 0;
+
+	int rc = coio_popen_try_to_write(fh, pos, count, &written);
+	if (rc == -1)
+		return -1;		/* Failed */
+	/* Otherwise: 0 - all written, -2 - a retry is required. */
+	assert(rc == 0 || rc == -2);
+
+	while (rc == -2) {
+		/* Account for what the previous attempt managed to write. */
+		pos += written;
+		count -= written;
+		total += written;
+		/*
+		 * Reset the counter: an attempt that returns -2
+		 * without touching it must not make us skip the
+		 * same number of bytes once more.
+		 */
+		written = 0;
+		rc = coio_do_nonblock_popen_write(fh, pos, count,
+						  &written);
+	}
+
+	if (rc != 0)
+		return -1;
+	return (ssize_t)(total + written);
+}
diff --git a/src/lib/core/coio_file.h b/src/lib/core/coio_file.h
index f2112ceed..4089c1a05 100644
--- a/src/lib/core/coio_file.h
+++ b/src/lib/core/coio_file.h
@@ -84,6 +84,19 @@ int coio_tempdir(char *path, size_t path_len);
int coio_readdir(const char *path, char **buf);
int coio_copyfile(const char *source, const char *dest);
+
+/**
+ * Start a shell command from a fiber.
+ * @param command shell command line
+ * @param type    "r" or "w"
+ * @return an opaque handle for coio_pclose(), coio_popen_read()
+ *         and coio_popen_write(), or NULL on failure.
+ */
+void *
+coio_popen(const char *command, const char *type);
+
+/**
+ * Close a handle returned by coio_popen(); keeps retrying while
+ * the child process is reported as still running.
+ * @return 0 when the child has terminated, -1 on error.
+ */
+int
+coio_pclose(void *fd);
+
+/**
+ * Read up to count bytes of the child's output into buf.
+ * @param output_number receives the id of the stream the data
+ *        came from (STDOUT_FILENO or STDERR_FILENO).
+ * @return number of bytes read, -1 on error.
+ */
+ssize_t
+coio_popen_read(void *fh, void *buf, size_t count, int *output_number);
+
+/**
+ * Write count bytes from buf to the child's stdin.
+ * @return a non-negative byte count on success, -1 on error.
+ */
+ssize_t
+coio_popen_write(void *fh, const void *buf, size_t count);
+
#if defined(__cplusplus)
} /* extern "C" */
#endif /* defined(__cplusplus) */
diff --git a/src/lib/core/coio_popen.c b/src/lib/core/coio_popen.c
new file mode 100644
index 000000000..57b51b6c5
--- /dev/null
+++ b/src/lib/core/coio_popen.c
@@ -0,0 +1,366 @@
+/*
+ * Copyright 2010-2019, Tarantool AUTHORS, please see AUTHORS file.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ * copyright notice, this list of conditions and the
+ * following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials
+ * provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+ * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+ * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
+
+#include <stddef.h>
+#include "coio_popen.h"
+#include "coio_task.h"
+#include "fiber.h"
+#include "say.h"
+#include "fio.h"
+#include <stdio.h>
+#include <stdlib.h>
+#include <dirent.h>
+#include <wait.h>
+#include <paths.h>
+#include <unistd.h>
+#include <sys/types.h> /* See NOTES */
+#include <sys/socket.h>
+
+/** State of one child process started by coio_popen_impl(). */
+struct popen_data {
+ /* Process id of the child. */
+ pid_t pid;
+ /*
+ * Three handles, indexed by the standard descriptor numbers
+ * (-1 when the handle is not open):
+ * [0] write to stdin of the child process
+ * [1] read from stdout of the child process
+ * [2] read from stderr of the child process
+ */
+ int fh[3];
+
+ /*
+ * The id of the stream that was read most recently
+ * (STDERR_FILENO or STDOUT_FILENO).
+ */
+ int prev_source;
+};
+
+/*
+ * Pick the descriptor to read next when both STDOUT and STDERR
+ * of the child are ready: the stream that was NOT read last time
+ * is preferred, so the two outputs alternate.
+ *
+ * data->prev_source is updated to the chosen stream id.
+ * Returns the chosen descriptor; it is -1 when neither reading
+ * handle is valid.
+ */
+static inline int
+get_handle_in_order(struct popen_data *data)
+{
+	/* XOR with (1 | 2) turns STDOUT_FILENO into STDERR_FILENO and back. */
+	const int flip = STDOUT_FILENO | STDERR_FILENO;
+	int next = data->prev_source ^ flip;
+
+	/* The other stream is not open: stay with the previous one. */
+	if (data->fh[next] < 0)
+		next ^= flip;
+
+	data->prev_source = next;
+	return data->fh[next];
+}
+
+/*
+ * Allocate a popen_data with all three handles marked as closed.
+ *
+ * Returns NULL when the allocation fails (errno is left as set
+ * by calloc()); the caller owns the result and releases it with
+ * free().
+ */
+static struct popen_data *
+popen_data_new(void)
+{
+	struct popen_data *data = calloc(1, sizeof(*data));
+	if (data == NULL)
+		return NULL;
+
+	data->fh[STDIN_FILENO] = -1;
+	data->fh[STDOUT_FILENO] = -1;
+	data->fh[STDERR_FILENO] = -1;
+	/*
+	 * Pretend STDERR was read last: if both streams are ready
+	 * then reading starts from STDOUT.
+	 */
+	data->prev_source = STDERR_FILENO;
+	return data;
+}
+
+/*
+ * Start "sh -c <command>" in a child process.
+ *
+ * Two non-blocking socket pairs are created: one is attached to
+ * the child's stdout (type "r") or stdin (type "w"), the other
+ * one to the child's stderr. The parent's ends are stored in the
+ * returned popen_data.
+ *
+ * Returns an opaque handle, or NULL on error with errno set
+ * (EINVAL for bad arguments, otherwise the errno of the call that
+ * failed).
+ */
+void *
+coio_popen_impl(const char *command, const char *type)
+{
+	pid_t pid;
+	int socket_rw[2] = {-1, -1};
+	int socket_err[2] = {-1, -1};
+	int saved_errno;
+	errno = 0;
+
+	if (command == NULL || type == NULL) {
+		errno = EINVAL;
+		return NULL;
+	}
+
+	char *argv[] = {"sh", "-c", NULL, NULL};
+	argv[2] = (char *)command;
+
+	if ((*type != 'r' && *type != 'w') || type[1] != '\0') {
+		errno = EINVAL;
+		return NULL;
+	}
+
+	struct popen_data *data = popen_data_new();
+	if (data == NULL)
+		return NULL;
+
+	const int sock_type = SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC;
+	if (socketpair(AF_UNIX, sock_type, 0, socket_rw) < 0)
+		goto on_error;
+	if (socketpair(AF_UNIX, sock_type, 0, socket_err) < 0)
+		goto on_error;
+
+	pid = fork();
+	if (pid < 0)
+		goto on_error;
+
+	if (pid == 0) {
+		/*
+		 * Child. Only async-signal-safe calls are made here.
+		 * Any failure ends in _exit(127), the same code that
+		 * is used when the shell cannot be executed.
+		 */
+
+		/* Setup stdin/stdout */
+		close(socket_rw[0]);
+		int fno = (*type == 'r') ? STDOUT_FILENO
+					 : STDIN_FILENO;
+		if (socket_rw[1] != fno) {
+			if (dup2(socket_rw[1], fno) < 0)
+				_exit(127);
+			close(socket_rw[1]);
+		}
+
+		/* Setup stderr */
+		close(socket_err[0]);
+		if (socket_err[1] != STDERR_FILENO) {
+			if (dup2(socket_err[1], STDERR_FILENO) < 0)
+				_exit(127);
+			close(socket_err[1]);
+		}
+
+		execve(_PATH_BSHELL, argv, environ);
+		_exit(127);
+		unreachable();
+	}
+
+	/* Parent: the child's ends are not needed here. */
+	close(socket_rw[1]);
+	close(socket_err[1]);
+
+	if (*type == 'r')
+		data->fh[STDOUT_FILENO] = socket_rw[0];
+	else
+		data->fh[STDIN_FILENO] = socket_rw[0];
+
+	data->fh[STDERR_FILENO] = socket_err[0];
+	data->pid = pid;
+
+	return data;
+
+on_error:
+	/* free() and close() may overwrite errno: keep the real cause. */
+	saved_errno = errno;
+	free(data);
+	if (socket_rw[0] >= 0) {
+		close(socket_rw[0]);
+		close(socket_rw[1]);
+	}
+	if (socket_err[0] >= 0) {
+		close(socket_err[0]);
+		close(socket_err[1]);
+	}
+	errno = saved_errno;
+	return NULL;
+}
+
+/*
+ * One non-blocking attempt to finish a popen handle.
+ *
+ * All open descriptors of the handle are closed first, then the
+ * child's status is polled with waitpid(WNOHANG).
+ *
+ * Returns 0 when the child is gone (the handle is freed),
+ * -1 on error (the handle is freed, except for a NULL handle),
+ * -2 when the child is still running or the wait was interrupted
+ * (the handle stays valid, repeat the call).
+ */
+int
+coio_try_pclose_impl(void *fh)
+{
+	struct popen_data *proc = (struct popen_data *)fh;
+
+	if (proc == NULL) {
+		errno = EBADF;
+		return -1;
+	}
+
+	/* Close all handles */
+	for (int idx = 0; idx < 3; ++idx) {
+		if (proc->fh[idx] < 0)
+			continue;
+		close(proc->fh[idx]);
+		proc->fh[idx] = -1;
+	}
+
+	int status;
+	pid_t waited = waitpid(proc->pid, &status, WNOHANG);
+	if (waited == 0)
+		return -2;	/* Process is still running */
+
+	int rc = 0;
+	if (waited < 0) {
+		switch (errno) {
+		case EINTR:
+			return -2;	/* Retry */
+		case ECHILD:
+			/*
+			 * Child process is not found
+			 * (may be is already dead)
+			 */
+			rc = 0;
+			break;
+		default:
+			rc = -1;	/* An error occurred */
+			break;
+		}
+	}
+
+	free(proc);
+	return rc;
+}
+
+/*
+ * One non-blocking attempt to read the child's output.
+ *
+ * The open reading handles (stdout and/or stderr) are polled with
+ * a zero-timeout select(). If exactly one is ready it is read; if
+ * both are ready they are read in turn (see get_handle_in_order()).
+ *
+ * On success *read_bytes receives the number of bytes read and
+ * *source_id the stream id (STDOUT_FILENO or STDERR_FILENO).
+ *
+ * Returns 0 when data were read, -1 on error (see errno),
+ * -2 when nothing can be read right now (repeat the call).
+ */
+int
+coio_popen_try_to_read(void *fh, void *buf, size_t count,
+		       size_t *read_bytes, int *source_id)
+{
+	struct popen_data *data = (struct popen_data *)fh;
+
+	if (data == NULL) {
+		errno = EBADF;
+		return -1;
+	}
+
+	const int out_fd = data->fh[STDOUT_FILENO];
+	const int err_fd = data->fh[STDERR_FILENO];
+
+	if (out_fd < 0 && err_fd < 0) {
+		/*
+		 * There are no open handles for reading
+		 */
+		errno = EBADF;
+		return -1;
+	}
+	if (out_fd >= FD_SETSIZE || err_fd >= FD_SETSIZE) {
+		/* Such a descriptor does not fit into an fd_set. */
+		errno = EINVAL;
+		return -1;
+	}
+
+	fd_set rfds;
+	FD_ZERO(&rfds);
+	if (out_fd >= 0)
+		FD_SET(out_fd, &rfds);
+	if (err_fd >= 0)
+		FD_SET(err_fd, &rfds);
+
+	struct timeval tv = {0, 0};
+	int max_h = MAX(out_fd, err_fd);
+
+	errno = 0;
+	int retv = select(max_h + 1, &rfds, NULL, NULL, &tv);
+	if (retv < 0)
+		return (errno == EINTR) ? -2 : -1;
+	if (retv == 0)
+		return -2;	/* Not ready yet */
+
+	/*
+	 * A closed handle (-1) must never be passed to FD_ISSET():
+	 * in "w" mode only stderr is open for reading.
+	 */
+	int out_ready = out_fd >= 0 && FD_ISSET(out_fd, &rfds);
+	int err_ready = err_fd >= 0 && FD_ISSET(err_fd, &rfds);
+
+	int fno;
+	if (out_ready && err_ready) {
+		/* Both sockets are ready: alternate between them. */
+		(void)get_handle_in_order(data);
+		fno = data->prev_source;
+	} else if (out_ready) {
+		fno = STDOUT_FILENO;
+	} else if (err_ready) {
+		fno = STDERR_FILENO;
+	} else {
+		unreachable();
+		return -1;
+	}
+
+	ssize_t received = read(data->fh[fno], buf, count);
+	if (received < 0) {
+		if (errno == EINTR || errno == EAGAIN ||
+		    errno == EWOULDBLOCK) {
+			*read_bytes = 0;
+			return -2;	/* Repeat */
+		}
+		return -1;		/* Error */
+	}
+
+	data->prev_source = fno;
+	*read_bytes = (size_t)received;
+	*source_id = fno;
+	return 0;
+}
+
+/*
+ * One non-blocking attempt to write to the child's stdin.
+ *
+ * The writing handle is polled with a zero-timeout select() and,
+ * when it is ready, a single write() is issued.
+ *
+ * *written always receives the number of bytes accepted by this
+ * very call (0 when nothing was written), so the caller can
+ * safely advance its buffer by that amount before retrying.
+ *
+ * Returns 0 when all count bytes were written, -1 on error
+ * (see errno), -2 when the call has to be repeated for the
+ * remaining data.
+ */
+int
+coio_popen_try_to_write(void *fh, const void *buf, size_t count,
+			size_t *written)
+{
+	*written = 0;
+	if (count == 0)
+		return 0;
+
+	struct popen_data *data = (struct popen_data *)fh;
+
+	if (data == NULL) {
+		errno = EBADF;
+		return -1;
+	}
+
+	int wh = data->fh[STDIN_FILENO];
+	if (wh < 0) {
+		/*
+		 * There are no open handles for writing
+		 */
+		errno = EBADF;
+		return -1;
+	}
+	if (wh >= FD_SETSIZE) {
+		/* Such a descriptor does not fit into an fd_set. */
+		errno = EINVAL;
+		return -1;
+	}
+
+	fd_set wfds;
+	FD_ZERO(&wfds);
+	FD_SET(wh, &wfds);
+
+	struct timeval tv = {0, 0};
+
+	int retv = select(wh + 1, NULL, &wfds, NULL, &tv);
+	if (retv < 0)
+		return (errno == EINTR) ? -2 : -1;
+	if (retv == 0)
+		return -2;	/* Not ready yet */
+
+	assert(retv == 1);	/* The socket is ready */
+	if (!FD_ISSET(wh, &wfds)) {
+		unreachable();
+		return -1;
+	}
+
+	ssize_t rc = write(wh, buf, count);
+	if (rc < 0) {
+		if (errno == EINTR || errno == EAGAIN ||
+		    errno == EWOULDBLOCK)
+			return -2;	/* Repeat */
+		return -1;		/* Error */
+	}
+
+	*written = (size_t)rc;
+	/* A short write is not an error: the rest needs another call. */
+	return (*written == count) ? 0 : -2;
+}
\ No newline at end of file
diff --git a/src/lib/core/coio_popen.h b/src/lib/core/coio_popen.h
new file mode 100644
index 000000000..732c2b1f4
--- /dev/null
+++ b/src/lib/core/coio_popen.h
@@ -0,0 +1,123 @@
+#ifndef TARANTOOL_LIB_CORE_COIO_POPEN_H_INCLUDED
+#define TARANTOOL_LIB_CORE_COIO_POPEN_H_INCLUDED
+/*
+ * Copyright 2010-2019, Tarantool AUTHORS, please see AUTHORS file.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ * copyright notice, this list of conditions and the
+ * following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials
+ * provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+ * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+ * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
+
+#if defined(__cplusplus)
+extern "C" {
+#endif /* defined(__cplusplus) */
+
+/**
+ * Implementation of fio.popen.
+ * The function starts a process by creating socket pairs
+ * for its standard streams, forking, and invoking the shell.
+ *
+ * @param command pointer to a null-terminated string
+ * containing a shell command line.
+ * This command is passed to the shell using the -c flag;
+ *
+ * @param type pointer to a null-terminated string
+ * which must contain either the letter 'r' for reading
+ * or the letter 'w' for writing.
+ *
+ * @return an opaque handle describing the child process; pass it
+ * to coio_popen_try_to_read(), coio_popen_try_to_write() and
+ * coio_try_pclose_impl().
+ * In case of an error returns NULL (errno is EINVAL
+ * when type is invalid).
+ */
+void *
+coio_popen_impl(const char *command, const char *type);
+
+/**
+ * Implementation of fio.pclose.
+ * The function closes the handles connected to the child
+ * process and tries to retrieve the status of the process
+ * without blocking.
+ * If the process has terminated, the resources allocated
+ * for the handle are released.
+ * If the process is still running the function
+ * returns immediately. In this case repeat the call.
+ *
+ * @param fh handle returned by coio_popen_impl().
+ *
+ * @return 0 if the process is terminated
+ * @return -1 for an error (errno is EBADF for a NULL handle)
+ * @return -2 if the process is still running or the wait
+ * was interrupted
+ */
+int
+coio_try_pclose_impl(void *fh);
+
+/**
+ * The function reads up to count bytes from the output
+ * (stdout or stderr) of the child process.
+ * Returns immediately.
+ *
+ * @param fh handle returned by coio_popen_impl().
+ * @param buf a buffer to be read into
+ * @param count size of buffer in bytes
+ * @param read_bytes A pointer to the
+ * variable that receives the number of bytes read.
+ * @param source_id A pointer to the variable that receives a
+ * source stream id, 1 - for STDOUT, 2 - for STDERR.
+ *
+ * @return 0 data were successfully read
+ * @return -1 an error occurred, see errno for error code
+ * @return -2 there is nothing to read yet (or the read
+ * was interrupted); repeat the call
+ */
+int
+coio_popen_try_to_read(void *fh, void *buf, size_t count,
+ size_t *read_bytes, int *source_id);
+
+/**
+ * The function writes up to count bytes to the stdin of
+ * the child process.
+ * Tries to write as much as possible without blocking
+ * and immediately returns.
+ *
+ * @param fh handle returned by coio_popen_impl().
+ * @param buf a buffer to be written from
+ * @param count size of buffer in bytes
+ * @param written A pointer to the
+ * variable that receives the number of bytes actually written.
+ * If the function fails the number of written bytes is undefined.
+ *
+ * @return 0 all data were successfully written
+ * @return -1 an error occurred, see errno for error code
+ * @return -2 the writing can block, or only a part of the
+ * data was written; repeat the call for the rest
+ */
+int
+coio_popen_try_to_write(void *fh, const void *buf, size_t count,
+ size_t *written);
+
+
+#if defined(__cplusplus)
+} /* extern "C" */
+#endif /* defined(__cplusplus) */
+
+#endif /* TARANTOOL_LIB_CORE_COIO_POPEN_H_INCLUDED */
diff --git a/src/lib/core/fiber.h b/src/lib/core/fiber.h
index 89fb04284..79f830555 100644
--- a/src/lib/core/fiber.h
+++ b/src/lib/core/fiber.h
@@ -488,8 +488,8 @@ struct cord {
extern __thread struct cord *cord_ptr;
#define cord() cord_ptr
-#define fiber() cord()->fiber
-#define loop() (cord()->loop)
+#define fiber() (cord() ? cord()->fiber : NULL)
+#define loop() (cord() ? cord()->loop : NULL)
void
cord_create(struct cord *cord, const char *name);
diff --git a/src/lua/fio.c b/src/lua/fio.c
index 806f4256b..65481edc0 100644
--- a/src/lua/fio.c
+++ b/src/lua/fio.c
@@ -703,7 +703,89 @@ lbox_fio_copyfile(struct lua_State *L)
return lbox_fio_pushbool(L, coio_copyfile(source, dest) == 0);
}
+/**
+ * Lua binding: internal.popen(command [, type]).
+ *
+ * Raises a usage error when the command is missing or is not
+ * convertible to a string. The type defaults to "r".
+ * Pushes the handle as a light userdata, or nil and a system
+ * error when the process could not be started.
+ */
+static int
+lbox_fio_popen(struct lua_State *L)
+{
+	/*
+	 * FILE *popen(const char *command, const char *type);
+	 */
+	const char *command = NULL;
+	if (lua_gettop(L) >= 1)
+		command = lua_tostring(L, 1);
+	if (command == NULL)
+		return luaL_error(L, "Usage: fio.popen(command, type)");
+
+	const char *type = lua_tostring(L, 2);
+	if (type == NULL)
+		type = "r";
+
+	void *handle = coio_popen(command, type);
+	if (handle != NULL) {
+		lua_pushlightuserdata(L, handle);
+		return 1;
+	}
+
+	lua_pushnil(L);
+	lbox_fio_pushsyserror(L);
+	return 2;
+}
+/**
+ * Lua binding: internal.pclose(handle).
+ * Pushes, via lbox_fio_pushbool(), whether coio_pclose()
+ * returned 0.
+ */
+static int
+lbox_fio_pclose(struct lua_State *L)
+{
+	void *handle = lua_touserdata(L, 1);
+	int rc = coio_pclose(handle);
+	return lbox_fio_pushbool(L, rc == 0);
+}
+
+/**
+ * Lua binding: internal.popen_read(handle, buf, size).
+ *
+ * buf is a cdata pointer to the destination buffer.
+ * Pushes the number of bytes read and the id of the source
+ * stream; on error pushes nil, 0 and a system error.
+ * A size below 1 (including a missing or non-numeric one)
+ * reads nothing and pushes 0.
+ */
+static int
+lbox_fio_popen_read(struct lua_State *L)
+{
+	void *fh = lua_touserdata(L, 1);
+	uint32_t ctypeid;
+	char *buf = *(char **)luaL_checkcdata(L, 2, &ctypeid);
+	lua_Number requested = lua_tonumber(L, 3);
+
+	/*
+	 * Written as a negated ">=" so that NaN and negative
+	 * values are rejected before the conversion to size_t.
+	 */
+	if (!(requested >= 1)) {
+		lua_pushinteger(L, 0);
+		return 1;
+	}
+	size_t len = (size_t)requested;
+
+	int output_number = 0;
+	/* Keep the full ssize_t: the result must not be cut to int. */
+	ssize_t res = coio_popen_read(fh, buf, len, &output_number);
+
+	if (res < 0) {
+		lua_pushnil(L);
+		lua_pushinteger(L, 0);
+		lbox_fio_pushsyserror(L);
+		return 3;
+	}
+
+	lua_pushinteger(L, res);
+	lua_pushinteger(L, output_number);
+	return 2;
+}
+
+/**
+ * Lua binding: internal.popen_write(handle, data, len).
+ *
+ * data is either a Lua string or a cdata pointer.
+ * Pushes the number of bytes written; on error pushes nil and
+ * a system error.
+ * A len below 1 (including a missing or non-numeric one)
+ * is treated as 0.
+ */
+static int
+lbox_fio_popen_write(struct lua_State *L)
+{
+	void *fh = lua_touserdata(L, 1);
+	const char *buf = lua_tostring(L, 2);
+	uint32_t ctypeid = 0;
+	if (buf == NULL)
+		buf = *(const char **)luaL_checkcdata(L, 2, &ctypeid);
+
+	/* Reject NaN and negative values before converting to size_t. */
+	lua_Number requested = lua_tonumber(L, 3);
+	size_t len = (requested >= 1) ? (size_t)requested : 0;
+
+	/* Keep the full ssize_t: the result must not be cut to int. */
+	ssize_t res = coio_popen_write(fh, buf, len);
+	if (res < 0) {
+		lua_pushnil(L);
+		lbox_fio_pushsyserror(L);
+		return 2;
+	}
+	lua_pushinteger(L, res);
+	return 1;
+}
void
tarantool_lua_fio_init(struct lua_State *L)
@@ -747,6 +829,10 @@ tarantool_lua_fio_init(struct lua_State *L)
{ "listdir", lbox_fio_listdir },
{ "fstat", lbox_fio_fstat },
{ "copyfile", lbox_fio_copyfile, },
+ { "popen", lbox_fio_popen },
+ { "pclose", lbox_fio_pclose },
+ { "popen_read", lbox_fio_popen_read },
+ { "popen_write", lbox_fio_popen_write },
{ NULL, NULL }
};
luaL_register(L, NULL, internal_methods);
diff --git a/src/lua/fio.lua b/src/lua/fio.lua
index 38664a556..ed3a6c781 100644
--- a/src/lua/fio.lua
+++ b/src/lua/fio.lua
@@ -206,6 +206,86 @@ fio.open = function(path, flags, mode)
return fh
end
+local popen_methods = {}
+
+-- Read the stdout and stderr of a process started by fio.popen.
+-- Call forms:
+--   read()          -> str, source
+--   read(buf)       -> len, source
+--   read(size)      -> str, source
+--   read(buf, size) -> len, source
+-- source is the id of the stream the data came from:
+-- 1 - stdout, 2 - stderr, 0 - in case of an error.
+-- The size defaults to 512 bytes.
+popen_methods.read = function(self, buf, size)
+    local scratch
+    if ffi.istype(const_char_ptr_t, buf) then
+        -- The caller supplied the buffer.
+        if size == nil then
+            size = 512
+        end
+    else
+        if buf == nil then
+            size = 512
+        end
+        -- A first argument that is not a buffer is the size.
+        size = buf or size
+        scratch = buffer.ibuf()
+        buf = scratch:reserve(size)
+    end
+
+    local res, output_no, err = internal.popen_read(self.fh, buf, size)
+    if res == nil then
+        if scratch ~= nil then
+            scratch:recycle()
+        end
+        return nil, output_no, err
+    end
+
+    if scratch == nil then
+        return res, output_no
+    end
+
+    -- Turn the bytes read into the temporary buffer into a string.
+    scratch:alloc(res)
+    local str = ffi.string(scratch.rpos, scratch:size())
+    scratch:recycle()
+    return str, output_no
+end
+
+-- Write to the stdin of a process started by fio.popen.
+--   write(str)
+--   write(buf, len)
+-- Anything that is not a const char pointer is converted with
+-- tostring() and written in full.
+-- Returns true on success, or false and an error.
+popen_methods.write = function(self, data, len)
+    local is_ptr = ffi.istype(const_char_ptr_t, data)
+    if not is_ptr then
+        data = tostring(data)
+        len = #data
+    end
+    local res, err = internal.popen_write(self.fh, data, len)
+    if err == nil then
+        return res >= 0
+    end
+    return false, err
+end
+
+-- Close the handle. The stored handle is replaced with -1
+-- whether or not closing succeeded.
+-- Returns the result of internal.pclose, or false and an error.
+popen_methods.close = function(self)
+    local ok, err = internal.pclose(self.fh)
+    self.fh = -1
+    if err == nil then
+        return ok
+    end
+    return false, err
+end
+
+
+-- Objects returned by fio.popen get read(), write() and close().
+local popen_mt = { __index = popen_methods }
+
+-- Start a shell command.
+-- mode is 'r' (read the output of the command, the default)
+-- or 'w' (write to its input).
+-- Returns a popen object, or nil and an error.
+fio.popen = function(command, mode)
+    if type(command) ~= 'string' then
+        error("Usage: fio.popen(command, mode)")
+    end
+
+    if type(mode) ~= 'string' then
+        -- use the default read-mode
+        mode = 'r'
+    end
+
+    local handle, err = internal.popen(command, mode)
+    if err ~= nil then
+        return nil, err
+    end
+
+    return setmetatable({fh = handle}, popen_mt)
+end
+
fio.pathjoin = function(...)
local i, path = 1, nil
diff --git a/test/box-tap/fio_popen.test.lua b/test/box-tap/fio_popen.test.lua
new file mode 100755
index 000000000..6f984b51b
--- /dev/null
+++ b/test/box-tap/fio_popen.test.lua
@@ -0,0 +1,79 @@
+#!/usr/bin/env tarantool
+
+local tap = require('tap')
+local fio = require('fio')
+local test = tap.test()
+
+test:plan(2 * 3)
+
+box.cfg{}
+
+-- Start script_path with fio.popen in read mode.
+-- Returns the popen object; raises an error that carries the
+-- reason reported by fio.popen when the process cannot be started.
+local function start_app(script_path)
+    local fh, err = fio.popen(script_path, "r")
+
+    if fh == nil then
+        -- Use the error returned by fio.popen: the errno module
+        -- is not loaded in this script.
+        error(string.format("Failed to run app: %s, error: %s",
+                            script_path, tostring(err)))
+    end
+
+    return fh
+end
+
+-- Drain the output of a popen object.
+-- Every chunk is appended as "[<source id>] <chunk>"; reading
+-- stops at the first nil or empty chunk.
+function read_stdout(fh)
+    local collected = ""
+
+    while true do
+        local chunk, src = fh:read(64)
+        if chunk == nil or chunk == "" then
+            break
+        end
+
+        if src ~= nil then
+            collected = collected .. string.format("[%d] %s", src, chunk)
+        else
+            print("the source is undefined")
+        end
+    end
+
+    return collected
+end
+
+local build_path = os.getenv("BUILDDIR")
+local app_path = fio.pathjoin(build_path, 'test/box-tap/fio_popen_test1.sh')
+
+-- Case 1: a script that prints the numbers 1..10 to stdout.
+local the_app = start_app(app_path)
+test:ok(the_app ~= nil, "starting a child process 1")
+
+local app_output = read_stdout(the_app)
+test:ok(app_output ~= nil, "response from child process 1")
+
+local expected_output = "[1] 1\n[1] 2\n[1] 3\n[1] 4\n[1] 5\n[1] 6\n[1] 7\n[1] 8\n[1] 9\n[1] 10\n"
+
+print("Expected:\n" .. expected_output)
+print("\nReceived:\n" .. app_output)
+
+test:ok(app_output == expected_output, "compare response 1")
+
+the_app:close()
+
+-- Case 2: try to get STDERR output.
+-- The shell itself starts fine, the missing script is reported
+-- by the shell on stderr.
+the_app = start_app("/fufel/fufel.sh")
+test:ok(the_app ~= nil, "starting a not existing process")
+
+app_output = read_stdout(the_app)
+test:ok(app_output ~= nil, "response from child process 2")
+
+-- NOTE(review): the exact wording of this message depends on
+-- the shell behind /bin/sh - confirm on the target platforms.
+expected_output = "[2] sh: 1: /fufel/fufel.sh: not found\n"
+
+print("Expected:\n" .. expected_output)
+print("\nReceived:\n" .. app_output)
+
+test:ok(app_output == expected_output, "compare response 2")
+
+the_app:close()
+
+
+-- Propagate the verdict of the test plan through the exit status,
+-- otherwise a failed check would still exit with 0.
+os.exit(test:check() and 0 or 1)
+
diff --git a/test/box-tap/fio_popen_test1.sh b/test/box-tap/fio_popen_test1.sh
new file mode 100755
index 000000000..becf83e6d
--- /dev/null
+++ b/test/box-tap/fio_popen_test1.sh
@@ -0,0 +1,7 @@
+#!/bin/bash
+for i in {1..10}
+do
+ echo $i
+ sleep 1
+done
+
diff --git a/third_party/libeio/eio.c b/third_party/libeio/eio.c
index 7351d5dda..e433b1b3b 100644
--- a/third_party/libeio/eio.c
+++ b/third_party/libeio/eio.c
@@ -1741,7 +1741,24 @@ static int eio_threads;
static void ecb_cold
eio_prefork()
{
- eio_threads = etp_set_max_parallel(EIO_POOL, 0);
+ /*
+ * Before fork() libeio shuts down all of its
+ * worker threads.
+ * That causes a deadlock when fork() is called
+ * from one of those worker threads.
+ * To avoid it, the pool is left untouched when
+ * the calling thread belongs to the pool; only
+ * the current pool size is remembered.
+ * This behaviour is acceptable for the
+ * case when fork() is immediately followed
+ * by exec().
+ * To clone a process call fork() from the
+ * main thread.
+ */
+
+ if (etp_is_in_pool_thread())
+ eio_threads = etp_get_max_parallel(EIO_POOL);
+ else
+ eio_threads = etp_set_max_parallel(EIO_POOL, 0);
}
static void ecb_cold
diff --git a/third_party/libeio/etp.c b/third_party/libeio/etp.c
index 42f7661eb..38c2d1f9b 100644
--- a/third_party/libeio/etp.c
+++ b/third_party/libeio/etp.c
@@ -164,6 +164,21 @@ struct etp_pool_user
xmutex_t lock;
};
+/* Per-thread flag: set to 1 by etp_proc in pool threads, 0 elsewhere. */
+static __thread int is_eio_thread = 0;
+
+/**
+ * Check whether the current thread is a worker of the
+ * libeio thread pool or some other (generic) thread.
+ *
+ * @return 0 for a generic thread
+ * @return 1 for a libeio pool thread
+ **/
+ETP_API_DECL int ecb_cold
+etp_is_in_pool_thread()
+{
+ return is_eio_thread;
+}
+
/* worker threads management */
static void ecb_cold
@@ -322,6 +337,9 @@ X_THREAD_PROC (etp_proc)
self.pool = pool;
etp_pool_user user; /* per request */
+/* Distinguish libeio threads from the generic threads */
+ is_eio_thread = 1;
+
etp_proc_init ();
/* try to distribute timeouts somewhat evenly (nanosecond part) */
@@ -616,3 +634,13 @@ etp_set_max_parallel (etp_pool pool, unsigned int threads)
X_UNLOCK (pool->lock);
return retval;
}
+
+/**
+ * Read the pool's wanted thread count under the pool lock.
+ *
+ * @return the current value of pool->wanted
+ **/
+ETP_API_DECL int ecb_cold
+etp_get_max_parallel (etp_pool pool)
+{
+	int wanted;
+
+	X_LOCK (pool->lock);
+	wanted = pool->wanted;
+	X_UNLOCK (pool->lock);
+
+	return wanted;
+}
diff --git a/third_party/libev/ev.c b/third_party/libev/ev.c
index 6a2648591..5fa8293a1 100644
--- a/third_party/libev/ev.c
+++ b/third_party/libev/ev.c
@@ -4214,6 +4214,8 @@ noinline
void
ev_signal_stop (EV_P_ ev_signal *w) EV_THROW
{
+ if (!loop)
+ return;
clear_pending (EV_A_ (W)w);
if (expect_false (!ev_is_active (w)))
return;
--
2.17.1
More information about the Tarantool-patches
mailing list