From: Stanislav Zudin <szudin@tarantool.org> To: tarantool-patches@freelists.org Cc: Stanislav Zudin <szudin@tarantool.org> Subject: [tarantool-patches] [PATCH] core: Non-blocking io.popen Date: Wed, 24 Apr 2019 20:00:32 +0300 [thread overview] Message-ID: <20190424170032.10726-1-szudin@tarantool.org> (raw) Adds nonblocking implementation of popen. The method is available in namespace fio. fio.popen() returns an object with three methods: read(), write() and close(). Method read() reads both STDOUT and STDERR output. Closes #4031 --- Branch: https://github.com/tarantool/tarantool/tree/stanztt/gh-4031-nonblocking-popen Issue: https://github.com/tarantool/tarantool/issues/4031 src/lib/core/CMakeLists.txt | 1 + src/lib/core/coio_file.c | 189 +++++++++++++++++ src/lib/core/coio_file.h | 13 ++ src/lib/core/coio_popen.c | 366 ++++++++++++++++++++++++++++++++ src/lib/core/coio_popen.h | 123 +++++++++++ src/lib/core/fiber.h | 4 +- src/lua/fio.c | 86 ++++++++ src/lua/fio.lua | 80 +++++++ test/box-tap/fio_popen.test.lua | 79 +++++++ test/box-tap/fio_popen_test1.sh | 7 + third_party/libeio/eio.c | 19 +- third_party/libeio/etp.c | 28 +++ third_party/libev/ev.c | 2 + 13 files changed, 994 insertions(+), 3 deletions(-) create mode 100644 src/lib/core/coio_popen.c create mode 100644 src/lib/core/coio_popen.h create mode 100755 test/box-tap/fio_popen.test.lua create mode 100755 test/box-tap/fio_popen_test1.sh diff --git a/src/lib/core/CMakeLists.txt b/src/lib/core/CMakeLists.txt index eb10b11c3..8b1f8d32e 100644 --- a/src/lib/core/CMakeLists.txt +++ b/src/lib/core/CMakeLists.txt @@ -26,6 +26,7 @@ set(core_sources trigger.cc mpstream.c port.c + coio_popen.c ) if (TARGET_OS_NETBSD) diff --git a/src/lib/core/coio_file.c b/src/lib/core/coio_file.c index c5b2db781..ffe9d634c 100644 --- a/src/lib/core/coio_file.c +++ b/src/lib/core/coio_file.c @@ -33,6 +33,7 @@ #include "fiber.h" #include "say.h" #include "fio.h" +#include "coio_popen.h" #include <stdio.h> #include <stdlib.h> #include 
<dirent.h> @@ -102,6 +103,31 @@ struct coio_file_task { const char *source; const char *dest; } copyfile; + + struct { + const char *command; + const char *type; + void *handle; + } popen_params; + + struct { + void *handle; + } pclose_params; + + struct { + void *handle; + const void* buf; + size_t count; + size_t *written; + } popen_write; + + struct { + void *handle; + void *buf; + size_t count; + size_t *read_bytes; + int *output_number; + } popen_read; }; }; @@ -628,3 +654,166 @@ coio_copyfile(const char *source, const char *dest) eio_req *req = eio_custom(coio_do_copyfile, 0, coio_complete, &eio); return coio_wait_done(req, &eio); } + +static void +coio_do_popen(eio_req *req) +{ + struct coio_file_task *eio = (struct coio_file_task *)req->data; + eio->popen_params.handle = coio_popen_impl(eio->popen_params.command, + eio->popen_params.type); + eio->result = 0; + eio->errorno = errno; +} + +void * +coio_popen(const char *command, const char *type) +{ + INIT_COEIO_FILE(eio); + eio.popen_params.command = command; + eio.popen_params.type = type; + + eio_req *req = eio_custom(coio_do_popen, 0, + coio_complete, &eio); + coio_wait_done(req, &eio); + return eio.popen_params.handle; +} + +static void +coio_do_pclose(eio_req *req) +{ + struct coio_file_task *eio = (struct coio_file_task *)req->data; + req->result = coio_try_pclose_impl(eio->pclose_params.handle); + req->errorno = errno; +} + +static int +coio_do_nonblock_pclose(void *fh) +{ + INIT_COEIO_FILE(eio); + eio.pclose_params.handle = fh; + eio_req *req = eio_custom(coio_do_pclose, 0, + coio_complete, &eio); + return coio_wait_done(req, &eio); +} + +int +coio_pclose(void *fh) +{ + int rc = coio_try_pclose_impl(fh); + if (rc == 0) /* The child process is dead */ + return 0; + else if (rc == -1) /* Failed */ + return -1; + + assert(rc == -2); /* A blocking operation is expected */ + + do { + rc = coio_do_nonblock_pclose(fh); + } while (rc == -2); + + return rc; +} + +static void +coio_do_popen_read(eio_req *req) 
+{ + struct coio_file_task *eio = (struct coio_file_task *)req->data; + + int rc = coio_popen_try_to_read(eio->popen_read.handle, + eio->popen_read.buf, + eio->popen_read.count, + eio->popen_read.read_bytes, + eio->popen_read.output_number); + + req->result = rc; + req->errorno = errno; +} + +static int +coio_do_nonblock_popen_read(void *fh, void *buf, size_t count, + size_t *read_bytes, int *source_id) +{ + INIT_COEIO_FILE(eio); + eio.popen_read.buf = buf; + eio.popen_read.count = count; + eio.popen_read.handle = fh; + eio.popen_read.read_bytes = read_bytes; + eio.popen_read.output_number = source_id; + eio_req *req = eio_custom(coio_do_popen_read, 0, + coio_complete, &eio); + return coio_wait_done(req, &eio); +} + +ssize_t +coio_popen_read(void *fh, void *buf, size_t count, int *output_number) +{ + size_t received = 0; + int rc = coio_popen_try_to_read(fh, buf, count, + &received, output_number); + if (rc == 0) /* The reading's succeeded */ + return (ssize_t)received; + else if (rc == -1) /* Failed */ + return -1; + + assert(rc == -2); /* A blocking operation is expected */ + + do { + rc = coio_do_nonblock_popen_read(fh, buf, count, + &received, output_number); + } while (rc == -2); + + return (rc == 0) ? 
(ssize_t)received + : -1; +} + +static void +coio_do_popen_write(eio_req *req) +{ + struct coio_file_task *eio = (struct coio_file_task *)req->data; + + int rc = coio_popen_try_to_write(eio->popen_write.handle, + eio->popen_write.buf, + eio->popen_write.count, + eio->popen_write.written); + + req->result = rc; + req->errorno = errno; +} + +static int +coio_do_nonblock_popen_write(void *fh, const void *buf, size_t count, + size_t *written) +{ + INIT_COEIO_FILE(eio); + eio.popen_write.buf = buf; + eio.popen_write.count = count; + eio.popen_write.handle = fh; + eio.popen_write.written = written; + eio_req *req = eio_custom(coio_do_popen_write, 0, + coio_complete, &eio); + return coio_wait_done(req, &eio); +} + +ssize_t +coio_popen_write(void *fh, const void *buf, size_t count) +{ + size_t written = 0; + int rc = coio_popen_try_to_write(fh, buf, count, + &written); + if (rc == 0) /* The writing's succeeded */ + return (ssize_t)written; + else if (rc == -1) /* Failed */ + return -1; + + assert(rc == -2); /* A blocking operation is expected */ + + do { + buf += written; /* advance writing position */ + count -= written; + rc = coio_do_nonblock_popen_write(fh, buf, count, + &written); + } while (rc == -2); + + return (rc == 0) ? 
(ssize_t)written + : -1; +} diff --git a/src/lib/core/coio_file.h b/src/lib/core/coio_file.h index f2112ceed..4089c1a05 100644 --- a/src/lib/core/coio_file.h +++ b/src/lib/core/coio_file.h @@ -84,6 +84,19 @@ int coio_tempdir(char *path, size_t path_len); int coio_readdir(const char *path, char **buf); int coio_copyfile(const char *source, const char *dest); + +void * +coio_popen(const char *command, const char *type); + +int +coio_pclose(void *fd); + +ssize_t +coio_popen_read(void *fh, void *buf, size_t count, int *output_number); + +ssize_t +coio_popen_write(void *fh, const void *buf, size_t count); + #if defined(__cplusplus) } /* extern "C" */ #endif /* defined(__cplusplus) */ diff --git a/src/lib/core/coio_popen.c b/src/lib/core/coio_popen.c new file mode 100644 index 000000000..57b51b6c5 --- /dev/null +++ b/src/lib/core/coio_popen.c @@ -0,0 +1,366 @@ +/* + * Copyright 2010-2019, Tarantool AUTHORS, please see AUTHORS file. + * + * Redistribution and use in source and binary forms, with or + * without modification, are permitted provided that the following + * conditions are met: + * + * 1. Redistributions of source code must retain the above + * copyright notice, this list of conditions and the + * following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following + * disclaimer in the documentation and/or other materials + * provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED + * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. 
IN NO EVENT SHALL + * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR + * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF + * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF + * SUCH DAMAGE. + */ + +#include <stddef.h> +#include "coio_popen.h" +#include "coio_task.h" +#include "fiber.h" +#include "say.h" +#include "fio.h" +#include <stdio.h> +#include <stdlib.h> +#include <dirent.h> +#include <wait.h> +#include <paths.h> +#include <unistd.h> +#include <sys/types.h> /* See NOTES */ +#include <sys/socket.h> + +struct popen_data { + /* process id */ + pid_t pid; + int fh[3]; + /* + * Three handles: + * [0] write to stdin of the child process + * [1] read from stdout of the child process + * [2] read from stderr of the child process + */ + + /* The ID of socket was read recently + * (STDERR_FILENO or STDOUT_FILENO */ + int prev_source; +}; + +/* + * Returns next socket to read. + * Use this function when both STDOUT and STDERR outputs + * are ready for reading. 
+ * */ +static inline int +get_handle_in_order(struct popen_data *data) +{ + /* + * Invert the order of handles to be read + */ + const int mask = STDERR_FILENO | STDOUT_FILENO; + data->prev_source ^= mask; + + /* + * If handle is not available, invert it back + */ + if (data->fh[data->prev_source] < 0) + data->prev_source ^= mask; + /* + * if both reading handles are invalid return -1 + */ + return data->fh[data->prev_source]; +} + +static struct popen_data * +popen_data_new() +{ + struct popen_data *data = + (struct popen_data *)calloc(1, sizeof(*data)); + data->fh[0] = -1; + data->fh[1] = -1; + data->fh[2] = -1; + data->prev_source = STDERR_FILENO; + /* + * if both streams are ready then + * start reading from STDOUT + */ + return data; +} + +void * +coio_popen_impl(const char *command, const char *type) +{ + pid_t pid; + int socket_rw[2] = {-1,-1}; + int socket_err[2] = {-1,-1}; + errno = 0; + + char *argv[] = {"sh", "-c", NULL, NULL}; + argv[2] = (char *)command; + + if ((*type != 'r' && *type != 'w') || type[1] != '\0') { + errno = EINVAL; + return NULL; + } + + struct popen_data *data = popen_data_new(); + if (data == NULL) + return NULL; + + const int sock_type = SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC; + if (socketpair(AF_UNIX, sock_type, 0, socket_rw) < 0) { + goto on_error; + } + if (socketpair(AF_UNIX, sock_type, 0, socket_err) < 0) { + goto on_error; + } + + pid = fork(); + + if (pid < 0) + goto on_error; + else if (pid == 0) /* child */ { + /* Setup stdin/stdout */ + close(socket_rw[0]); + int fno = (*type == 'r') ? 
STDOUT_FILENO + : STDIN_FILENO; + if (socket_rw[1] != fno) { + dup2(socket_rw[1], fno); + close(socket_rw[1]); + } + + /* setup stderr */ + close(socket_err[0]); + if (socket_err[1] != STDERR_FILENO) { + dup2(socket_err[1], STDERR_FILENO); + close(socket_err[1]); + } + + execve(_PATH_BSHELL, argv, environ); + _exit(127); + unreachable(); + } + + /* parent process */ + close(socket_rw[1]); + close(socket_err[1]); + + if (*type == 'r') + data->fh[STDOUT_FILENO] = socket_rw[0]; + else + data->fh[STDIN_FILENO] = socket_rw[0]; + + data->fh[STDERR_FILENO] = socket_err[0]; + data->pid = pid; + + return data; + +on_error: + if (data) + free(data); + if (socket_rw[0] >= 0) { + close(socket_rw[0]); + close(socket_rw[1]); + } + if (socket_err[0] >= 0) { + close(socket_err[0]); + close(socket_err[1]); + } + return NULL; +} + +int +coio_try_pclose_impl(void *fh) +{ + struct popen_data *data = (struct popen_data *)fh; + + if (data == NULL){ + errno = EBADF; + return -1; + } + + /* Close all handles */ + for(int i = 0; i < 3; ++i) { + if (data->fh[i] >= 0) { + close(data->fh[i]); + data->fh[i] = -1; + } + } + + int pstat; + pid_t pid = waitpid(data->pid, &pstat, WNOHANG); + + int rc = 0; + + if (pid == 0) + return -2; /* Process is still running */ + else if (pid < 0) { + if (errno == ECHILD) + rc = 0; /* Child process is not found + * (may be is already dead) + */ + else if (errno == EINTR) + return -2; /* Retry */ + else + rc = -1; /* An error occurred */ + } + + free(data); + return rc; +} + +int +coio_popen_try_to_read(void *fh, void *buf, size_t count, + size_t *read_bytes, int *source_id) +{ + struct popen_data *data = (struct popen_data *)fh; + + if (data == NULL){ + errno = EBADF; + return -1; + } + + fd_set rfds; + FD_ZERO(&rfds); + ssize_t received = 0; + int num = 0; + + if (data->fh[STDOUT_FILENO] >= 0) { + FD_SET(data->fh[STDOUT_FILENO], &rfds); + ++num; + } + if (data->fh[STDERR_FILENO] >= 0) { + FD_SET(data->fh[STDERR_FILENO], &rfds); + ++num; + } + + if (num == 0) 
{ + /* + * There are no open handles for reading + */ + errno = EBADF; + return -1; + } + + struct timeval tv = {0,0}; + int max_h = MAX(data->fh[STDOUT_FILENO], + data->fh[STDERR_FILENO]); + + errno = 0; + int retv = select(max_h + 1, &rfds, NULL, NULL, &tv); + switch (retv) { + case -1: /* Error */ + return -1; + case 0: /* Not ready yet */ + return -2; + case 1: { /* One socket is ready */ + + /* Choose the socket */ + int fno = STDOUT_FILENO; + if (!FD_ISSET(data->fh[fno], &rfds)) + fno = STDERR_FILENO; + if (!FD_ISSET(data->fh[fno], &rfds)) { + unreachable(); + return -1; + } + + received = read(data->fh[fno], buf, count); + if (received < 0) + goto on_error; + data->prev_source = fno; + *read_bytes = received; + *source_id = fno; + return 0; + } + case 2: { /* Both sockets are ready */ + received = read(get_handle_in_order(data), buf, count); + if (received < 0) + goto on_error; + *read_bytes = received; + *source_id = (int)data->prev_source; + return 0; + } + } + + unreachable(); + return -1; + +on_error: + if (errno == EINTR) { + *read_bytes = 0; + return -2; /* Repeat */ + } else + return -1; /* Error */ +} + +int +coio_popen_try_to_write(void *fh, const void *buf, size_t count, + size_t *written) +{ + if (count == 0) + return 0; + + struct popen_data *data = (struct popen_data *)fh; + + if (data == NULL){ + errno = EBADF; + return -1; + } + + if (data->fh[STDIN_FILENO] < 0) { + /* + * There are no open handles for writing + */ + errno = EBADF; + return -1; + } + + fd_set wfds; + FD_ZERO(&wfds); + + int wh = data->fh[STDIN_FILENO]; + FD_SET(wh, &wfds); + + struct timeval tv = {0,0}; + + int retv = select(wh + 1, NULL, &wfds, NULL, &tv); + if (retv < 0) + goto on_error; + else if (retv == 0) + return -2; /* Not ready yet */ + + assert(retv == 1); /* The socket is ready */ + + if (FD_ISSET(wh, &wfds)) { + ssize_t rc = write(wh, buf, count); + if (rc < 0) + goto on_error; + *written = rc; + return (*written == count) ? 
0 + : -2; + } + + unreachable(); + return -1; + +on_error: + if (errno == EINTR) { + *written = 0; + return -2; /* Repeat */ + } else + return -1; /* Error */ +} \ No newline at end of file diff --git a/src/lib/core/coio_popen.h b/src/lib/core/coio_popen.h new file mode 100644 index 000000000..732c2b1f4 --- /dev/null +++ b/src/lib/core/coio_popen.h @@ -0,0 +1,123 @@ +#ifndef TARANTOOL_LIB_CORE_COIO_POPEN_H_INCLUDED +#define TARANTOOL_LIB_CORE_COIO_POPEN_H_INCLUDED +/* + * Copyright 2010-2019, Tarantool AUTHORS, please see AUTHORS file. + * + * Redistribution and use in source and binary forms, with or + * without modification, are permitted provided that the following + * conditions are met: + * + * 1. Redistributions of source code must retain the above + * copyright notice, this list of conditions and the + * following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following + * disclaimer in the documentation and/or other materials + * provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED + * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL + * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR + * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF + * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF + * SUCH DAMAGE. + */ + +#if defined(__cplusplus) +extern "C" { +#endif /* defined(__cplusplus) */ + +/** + * Implementation of fio.popen. 
+ * The function opens a process by creating a pipe, + * forking, and invoking the shell. + * + * @param command pointer to a null-terminated string + * containing a shell command line. + * This command is passed to /bin/sh using the -c flag; + * + * @param type pointer to a null-terminated string + * which must contain either the letter 'r' for reading + * or the letter 'w' for writing. + * + * @return handle of the pipe for reading or writing + * (depends on value of type). + * In a case of error returns NULL. + */ +void * +coio_popen_impl(const char *command, const char *type); + +/** + * Implementation of fio.pclose. + * The function tries to retrieve status of + * the associated process. + * If the associated process is terminated then releases + * allocated resources. + * If the associated process is still running the function + * returns immediately. In this case repeat the call. + * + * @param fh handle returned by fio.popen. + * + * @return 0 if the process is terminated + * @return -1 for an error + * @return -2 if the process is still running + */ +int +coio_try_pclose_impl(void *fh); + +/** + * The function reads up to count bytes from the handle + * associated with the child process. + * Returns immediately + * + * @param fd handle returned by fio.popen. + * @param buf a buffer to be read into + * @param count size of buffer in bytes + * @param read_bytes A pointer to the + * variable that receives the number of bytes read. + * @param source_id A pointer to the variable that receives a + * source stream id, 1 - for STDOUT, 2 - for STDERR. + * + * @return 0 data were successfully read + * @return -1 an error occurred, see errno for error code + * @return -2 there is nothing to read yet + */ +int +coio_popen_try_to_read(void *fh, void *buf, size_t count, + size_t *read_bytes, int *source_id); + +/** + * The function writes up to count bytes to the handle + * associated with the child process. 
+ * Tries to write as much as possible without blocking + * and immediately returns. + * + * @param fd handle returned by fio.popen. + * @param buf a buffer to be written from + * @param count size of buffer in bytes + * @param written A pointer to the + * variable that receives the number of bytes actually written. + * If function fails the number of written bytes is undefined. + * + * @return 0 all data were successfully written + * @return -1 an error occurred, see errno for error code + * @return -2 the writing can block + */ +int +coio_popen_try_to_write(void *fh, const void *buf, size_t count, + size_t *written); + + +#if defined(__cplusplus) +} /* extern "C" */ +#endif /* defined(__cplusplus) */ + +#endif /* TARANTOOL_LIB_CORE_COIO_POPEN_H_INCLUDED */ diff --git a/src/lib/core/fiber.h b/src/lib/core/fiber.h index 89fb04284..79f830555 100644 --- a/src/lib/core/fiber.h +++ b/src/lib/core/fiber.h @@ -488,8 +488,8 @@ struct cord { extern __thread struct cord *cord_ptr; #define cord() cord_ptr -#define fiber() cord()->fiber -#define loop() (cord()->loop) +#define fiber() (cord() ? cord()->fiber : NULL) +#define loop() (cord() ? 
cord()->loop : NULL) void cord_create(struct cord *cord, const char *name); diff --git a/src/lua/fio.c b/src/lua/fio.c index 806f4256b..65481edc0 100644 --- a/src/lua/fio.c +++ b/src/lua/fio.c @@ -703,7 +703,89 @@ lbox_fio_copyfile(struct lua_State *L) return lbox_fio_pushbool(L, coio_copyfile(source, dest) == 0); } +static int +lbox_fio_popen(struct lua_State *L) +{ + if (lua_gettop(L) < 1) { + usage: + luaL_error(L, "Usage: fio.popen(command, type)"); + } + + /* + * FILE *popen(const char *command, const char *type); + */ + const char *command = lua_tostring(L, 1); + if (command == NULL) + goto usage; + + const char *type = lua_tostring(L, 2); + if (type == NULL) + type = "r"; + + void* fh = coio_popen(command, type); + if (fh == NULL) { + lua_pushnil(L); + lbox_fio_pushsyserror(L); + return 2; + } + lua_pushlightuserdata(L, fh); + return 1; +} +static int +lbox_fio_pclose(struct lua_State *L) +{ + void* fd = lua_touserdata(L, 1); + return lbox_fio_pushbool(L, coio_pclose(fd) == 0); +} + +static int +lbox_fio_popen_read(struct lua_State *L) +{ + void* fh = lua_touserdata(L, 1); + uint32_t ctypeid; + char *buf = *(char **)luaL_checkcdata(L, 2, &ctypeid); + size_t len = lua_tonumber(L, 3); + + if (!len) { + lua_pushinteger(L, 0); + return 1; + } + + int output_number = 0; + int res = coio_popen_read(fh, buf, len, &output_number); + + if (res < 0) { + lua_pushnil(L); + lua_pushinteger(L, 0); + lbox_fio_pushsyserror(L); + return 3; + } + + lua_pushinteger(L, res); + lua_pushinteger(L, output_number); + return 2; +} + +static int +lbox_fio_popen_write(struct lua_State *L) +{ + void* fh = lua_touserdata(L, 1); + const char *buf = lua_tostring(L, 2); + uint32_t ctypeid = 0; + if (buf == NULL) + buf = *(const char **)luaL_checkcdata(L, 2, &ctypeid); + size_t len = lua_tonumber(L, 3); + + int res = coio_popen_write(fh, buf, len); + if (res < 0) { + lua_pushnil(L); + lbox_fio_pushsyserror(L); + return 2; + } + lua_pushinteger(L, res); + return 1; +} void 
tarantool_lua_fio_init(struct lua_State *L) @@ -747,6 +829,10 @@ tarantool_lua_fio_init(struct lua_State *L) { "listdir", lbox_fio_listdir }, { "fstat", lbox_fio_fstat }, { "copyfile", lbox_fio_copyfile, }, + { "popen", lbox_fio_popen }, + { "pclose", lbox_fio_pclose }, + { "popen_read", lbox_fio_popen_read }, + { "popen_write", lbox_fio_popen_write }, { NULL, NULL } }; luaL_register(L, NULL, internal_methods); diff --git a/src/lua/fio.lua b/src/lua/fio.lua index 38664a556..ed3a6c781 100644 --- a/src/lua/fio.lua +++ b/src/lua/fio.lua @@ -206,6 +206,86 @@ fio.open = function(path, flags, mode) return fh end +local popen_methods = {} + +-- read stdout & stderr of the process started by fio.popen +-- read() -> str, source +-- read(buf) -> len, source +-- read(size) -> str, source +-- read(buf, size) -> len, source +-- source contains id of the stream, +-- 1 - for stdout, 2 - for stderr, 0 - in a case of error +popen_methods.read = function(self, buf, size) + local tmpbuf + if (not ffi.istype(const_char_ptr_t, buf) and buf == nil) or + (ffi.istype(const_char_ptr_t, buf) and size == nil) then + size = 512 + end + if not ffi.istype(const_char_ptr_t, buf) then + size = buf or size + tmpbuf = buffer.ibuf() + buf = tmpbuf:reserve(size) + end + local res, output_no, err = internal.popen_read(self.fh, buf, size) + if res == nil then + if tmpbuf ~= nil then + tmpbuf:recycle() + end + return nil, output_no, err + end + if tmpbuf ~= nil then + tmpbuf:alloc(res) + res = ffi.string(tmpbuf.rpos, tmpbuf:size()) + tmpbuf:recycle() + end + return res, output_no +end + +-- write(str) +-- write(buf, len) +popen_methods.write = function(self, data, len) + if not ffi.istype(const_char_ptr_t, data) then + data = tostring(data) + len = #data + end + local res, err = internal.popen_write(self.fh, data, len) + if err ~= nil then + return false, err + end + return res >= 0 +end + +popen_methods.close = function(self) + local res, err = internal.pclose(self.fh) + self.fh = -1 + if err ~= nil 
then + return false, err + end + return res +end + + +local popen_mt = { __index = popen_methods } + +fio.popen = function(command, mode) + if type(command) ~= 'string' then + error("Usage: fio.popen(command, mode)") + end + + if type(mode) ~= 'string' then + mode = 'r' -- use default read-mode + end + + local fh,err = internal.popen(tostring(command), tostring(mode)) + if err ~= nil then + return nil, err + end + + fh = {fh = fh} + setmetatable(fh, popen_mt) + return fh +end + fio.pathjoin = function(...) local i, path = 1, nil diff --git a/test/box-tap/fio_popen.test.lua b/test/box-tap/fio_popen.test.lua new file mode 100755 index 000000000..6f984b51b --- /dev/null +++ b/test/box-tap/fio_popen.test.lua @@ -0,0 +1,79 @@ +#!/usr/bin/env tarantool + +local tap = require('tap') +local fio = require('fio') +local test = tap.test() + +test:plan(2 * 3) + +box.cfg{} + +local function start_app(script_path) + local fh = fio.popen(script_path, "r") + + if fh == nil then + local err = errno.strerror() + error(string.format("Failed to run app: %s, error: %s", script_path, err)) + return nil + else + return fh + end +end + +function read_stdout(fh) + local ss = "" + + local s,src = fh:read(64) + + while s ~= nil and s ~= "" do + if src == nil then + src = -1 + print("the source is undefined") + else + ss = ss .. string.format("[%d] %s", src, s) + end + + s,src = fh:read(64) + end + + return ss +end + +local build_path = os.getenv("BUILDDIR") +local app_path = fio.pathjoin(build_path, 'test/box-tap/fio_popen_test1.sh') + +local the_app = start_app(app_path) +test:ok(the_app ~= nil, "starting a child process 1") + +local app_output = read_stdout(the_app) +test:ok(app_output ~= nil, "response from child process 1") + +local expected_output = "[1] 1\n[1] 2\n[1] 3\n[1] 4\n[1] 5\n[1] 6\n[1] 7\n[1] 8\n[1] 9\n[1] 10\n" + +print("Expected:\n" .. expected_output) +print("\nReceived:\n" .. 
app_output) + +test:ok(app_output == expected_output, "compare response 1") + +the_app:close() + +-- Try to get STDERR output +the_app = start_app("/fufel/fufel.sh") +test:ok(the_app ~= nil, "starting a not existing process") + +app_output = read_stdout(the_app) +test:ok(app_output ~= nil, "response from child process 2") + +local expected_output = "[2] sh: 1: /fufel/fufel.sh: not found\n" + +print("Expected:\n" .. expected_output) +print("\nReceived:\n" .. app_output) + +test:ok(app_output == expected_output, "compare response 2") + +the_app:close() + + +test:check() +os.exit(0) + diff --git a/test/box-tap/fio_popen_test1.sh b/test/box-tap/fio_popen_test1.sh new file mode 100755 index 000000000..becf83e6d --- /dev/null +++ b/test/box-tap/fio_popen_test1.sh @@ -0,0 +1,7 @@ +#!/bin/bash +for i in {1..10} +do + echo $i + sleep 1 +done + diff --git a/third_party/libeio/eio.c b/third_party/libeio/eio.c index 7351d5dda..e433b1b3b 100644 --- a/third_party/libeio/eio.c +++ b/third_party/libeio/eio.c @@ -1741,7 +1741,24 @@ static int eio_threads; static void ecb_cold eio_prefork() { - eio_threads = etp_set_max_parallel(EIO_POOL, 0); + /* + * When fork() is called libeio shuts + * down all working threads. + * But it causes a deadlock if fork() was + * called from the libeio thread. + * To avoid this do not close the + * thread who called fork(). + * This behaviour is acceptable for the + * case when fork() is immediately followed + * by exec(). + * To clone a process call fork() from the + * main thread. 
+ */ + + if (etp_is_in_pool_thread()) + eio_threads = etp_get_max_parallel(EIO_POOL); + else + eio_threads = etp_set_max_parallel(EIO_POOL, 0); } static void ecb_cold diff --git a/third_party/libeio/etp.c b/third_party/libeio/etp.c index 42f7661eb..38c2d1f9b 100644 --- a/third_party/libeio/etp.c +++ b/third_party/libeio/etp.c @@ -164,6 +164,21 @@ struct etp_pool_user xmutex_t lock; }; +static __thread int is_eio_thread = 0; + +/** + * Check whether the current thread belongs to + * libeio thread pool or just a generic thread. + * + * @return 0 for generic thread + * @return 1 for libeio thread pool + **/ +ETP_API_DECL int ecb_cold +etp_is_in_pool_thread() +{ + return is_eio_thread; +} + /* worker threads management */ static void ecb_cold @@ -322,6 +337,9 @@ X_THREAD_PROC (etp_proc) self.pool = pool; etp_pool_user user; /* per request */ +/* Distinguish libeio threads from the generic threads */ + is_eio_thread = 1; + etp_proc_init (); /* try to distribute timeouts somewhat evenly (nanosecond part) */ @@ -616,3 +634,13 @@ etp_set_max_parallel (etp_pool pool, unsigned int threads) X_UNLOCK (pool->lock); return retval; } + +ETP_API_DECL int ecb_cold +etp_get_max_parallel (etp_pool pool) +{ + int retval; + X_LOCK (pool->lock); + retval = pool->wanted; + X_UNLOCK (pool->lock); + return retval; +} diff --git a/third_party/libev/ev.c b/third_party/libev/ev.c index 6a2648591..5fa8293a1 100644 --- a/third_party/libev/ev.c +++ b/third_party/libev/ev.c @@ -4214,6 +4214,8 @@ noinline void ev_signal_stop (EV_P_ ev_signal *w) EV_THROW { + if (!loop) + return; clear_pending (EV_A_ (W)w); if (expect_false (!ev_is_active (w))) return; -- 2.17.1
next reply other threads:[~2019-04-24 17:00 UTC|newest]

Thread overview: 3+ messages / expand[flat|nested] mbox.gz Atom feed top
2019-04-24 17:00 Stanislav Zudin [this message]
2019-05-07  8:49 ` Vladimir Davydov
2019-05-07 10:31 ` Vladimir Davydov
Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20190424170032.10726-1-szudin@tarantool.org \
    --to=szudin@tarantool.org \
    --cc=tarantool-patches@freelists.org \
    --subject='Re: [tarantool-patches] [PATCH] core: Non-blocking io.popen' \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
This is a public inbox; see mirroring instructions for how to clone and mirror all data and code used for this inbox.