From: Tobias Ulmer Date: Thu, 9 May 2019 14:29:12 +0000 (+0200) Subject: e2lib: split callcmd() further to allow for external poll loop X-Git-Tag: e2factory-2.3.18rc2~3 X-Git-Url: https://git.e2factory.org/?a=commitdiff_plain;h=f1f71daea802555781d6e80a8f1e34a104f31e32;p=e2factory.git e2lib: split callcmd() further to allow for external poll loop Signed-off-by: Tobias Ulmer --- diff --git a/generic/e2lib.lua b/generic/e2lib.lua index f6e1ea4..560523b 100644 --- a/generic/e2lib.lua +++ b/generic/e2lib.lua @@ -568,6 +568,11 @@ function e2lib.init() trace.install() trace.default_filter() + -- e2lib.callcmd_poll internals + trace.filter_function('e2lib', 'fd_linebuffer') + trace.filter_function('e2lib', 'fd_linebuffer_final') + trace.filter_function('e2lib', 'fd_find_writefunc_by_readfd') + e2lib.signal_reset() e2lib.closefrom(3) @@ -1445,6 +1450,168 @@ function e2lib.directory(path, dotfiles, noerror) end end +--- If an error occurs and we have a valid PID, retrieve the exit status of the +-- child and append it to error message, then attempt to kill(INT) the child. +-- This function may hang until the child exits. +-- @param pid Process ID +-- @param e Err object for appending status message. +local function _retrieve_status_kill(pid, e) + if pid then + assertIsNumber(pid) + + e2lib.logf(4, "waiting for exit status of pid %d", pid) + + local rc, re, sig = e2lib.wait_pid_delete(pid, true) + if not rc then + re:cat(e) + return + end + + if re == 0 then + e2lib.logf(4, "pid %d still running, sending SIGINT", pid) + children_send_sigint(pid) + rc, re, sig = e2lib.wait_pid_delete(pid, false) + if not rc then + re:cat(e) + return + end + end + + err.new("process %d returned code %d, signal %d", + re, rc, sig or 0):cat(e) -- tricky! + end +end + +--- Poll all set up file descriptors in one or more fdctv tables. Used in +-- conjunction with callcmd() nowait='nopoll' for pipes and multiprocess calls. +-- Does IO and calls callbacks until there are no more file descriptors +-- requiring service. +-- @param ... One or more fdctv tables +-- @return True on success, false on error. +-- @return Error object on failure. +function e2lib.callcmd_poll(...) + + -- trace filtered + local function fd_linebuffer(fdct, data) + local linepos + + fdct._p.buffer = fdct._p.buffer..data + repeat + linepos = string.find(fdct._p.buffer, "\n") + if linepos then + fdct.callfn(string.sub(fdct._p.buffer, 1, linepos)) + fdct._p.buffer = string.sub(fdct._p.buffer, linepos + 1) + end + until not linepos + end + + -- trace filtered + local function fd_linebuffer_final(fdct, data) + if fdct.linebuffer and fdct._p.buffer ~= "" then + fdct.callfn(fdct._p.buffer) + fdct._p.buffer = "" + end + end + + -- trace filtered + local function fd_find_writefunc_by_readfd(fdctv, fd) + for _,fdct in ipairs(fdctv) do + if fdct.istype == "writefunc" and fdct._p.rfd == fd then + return fdct + end + end + + return false + end + + local rc, re, fdvec, pollvec, fdvec, fdct, fdctv + + fdctv = {} + for _, t in ipairs({...}) do + assertIsTable(t) + for _, fdct in ipairs(t) do + assertIsTable(fdct) + -- all fdct must have a _p table if initialized by callcmd() + assertIsTable(fdct._p) + table.insert(fdctv, fdct) + end + end + + fdvec = {} + for _,fdct in ipairs(fdctv) do + if fdct.istype == "writefunc" then + table.insert(fdvec, fdct._p.rfd) + end + end + + while #fdvec > 0 do + pollvec, re = e2lib.poll(-1, fdvec) + if not pollvec then + return false, re + elseif #pollvec == 0 then + return false, err.new("poll timeout") + end + + for _,ptab in ipairs(pollvec) do + if ptab.POLLIN then + fdct = fd_find_writefunc_by_readfd(fdctv, ptab.fd) + if fdct then + local data, eno + + data, re, eno = eio.read(fdct._p.rfd, 4096) + if not data then + if eno ~= errno.def2errnum("EINTR") then + return false, _retrieve_status_kill(fdct._p.pid, re) + end + e2lib.logf(4, "poll loop read: fd=%d -> EINTR", + fdct._p.rfd) + elseif data ~= "" then + if fdct.linebuffer then + fd_linebuffer(fdct, data) + else + fdct.callfn(data) + end + end + end + elseif ptab.POLLOUT then + return false, err.new("poll unexpectedly returned POLLOUT") + else + -- Nothing to read, nothing to write, file descriptor + -- was closed. + -- + -- Flush remaining buffers if linebuffer is enabled + -- and the last fread did not end with \n. + fdct = fd_find_writefunc_by_readfd(fdctv, ptab.fd) + if fdct then + fd_linebuffer_final(fdct) + end + table.remove(fdvec, ptab.fdvecpos) + end + end + end + + return true +end + +--- Cleanup after callcmd_poll(), close file descriptors opened by callcmd() +-- @param fdctv table +-- @return true on success, false on error. +-- @return Error object on failure. +function e2lib.callcmd_cleanup(fdctv) + local rc, re + for _,fdct in ipairs(fdctv) do + if fdct.istype == "writefunc" then + rc, re = eio.close(fdct._p.rfd) + if not rc then + return false, _retrieve_status_kill(fdct._p.pid, re) + end + end + fdct._p = nil + end + + return true +end + --- File descriptor config table vector. This vector simply holds file -- descriptor config tables. -- @table fdctv @@ -1493,6 +1660,8 @@ end -- overwritten. -- @param nowait Optional - if true, return the PID instead of calling wait() to -- get the return code. +-- If 'nopoll', return PID instead of poll()ing the set up FDs and +-- wait(). This is useful for pipes/multiprocess commands. -- @param pty Allocate a pseudo tty -- @return Return code of the child is returned. It's the callers responsibility -- to make sense of the value. If the return code is false, an error @@ -1504,16 +1673,20 @@ function e2lib.callcmd(argv, fdctv, workdir, envdict, nowait, pty) -- To keep this large mess somewhat grokable, split into multiple functions. - local function fd_parent_setup(fdctv) + local function fd_parent_setup(fdctv, argv) local rc, re for _,fdct in ipairs(fdctv) do + + -- each fdct gets a private table to mark it as initialized + fdct._p = {} + fdct._p.argv = argv -- for diagnostics + if fdct.istype == "writefunc" then rc, re = eio.pipe() if not rc then return false, re end - fdct._p = {} fdct._p.rfd = rc fdct._p.wfd = re fdct._p.buffer = "" @@ -1571,7 +1744,7 @@ function e2lib.callcmd(argv, fdctv, workdir, envdict, nowait, pty) end end - local function fd_parent_after_fork(fdctv) + local function fd_parent_after_fork(fdctv, pid) local rc, re for _,fdct in ipairs(fdctv) do if fdct.istype == "writefunc" then @@ -1580,110 +1753,9 @@ function e2lib.callcmd(argv, fdctv, workdir, envdict, nowait, pty) return false, re end end - end - - return true - end - - local function fd_find_writefunc_by_readfd(fdctv, fd) - for _,fdct in ipairs(fdctv) do - if fdct.istype == "writefunc" and fdct._p.rfd == fd then - return fdct - end - end - - return false - end - - local function fd_parent_poll(fdctv) - - local function fd_linebuffer(fdct, data) - local linepos - - fdct._p.buffer = fdct._p.buffer..data - repeat - linepos = string.find(fdct._p.buffer, "\n") - if linepos then - fdct.callfn(string.sub(fdct._p.buffer, 1, linepos)) - fdct._p.buffer = string.sub(fdct._p.buffer, linepos + 1) - end - until not linepos - end - - local function fd_linebuffer_final(fdct, data) - if fdct.linebuffer and fdct._p.buffer ~= "" then - fdct.callfn(fdct._p.buffer) - fdct._p.buffer = "" - end - end - - local rc, re, fdvec, pollvec, fdvec, fdct - - fdvec = {} - for _,fdct in ipairs(fdctv) do - if fdct.istype == "writefunc" then - table.insert(fdvec, fdct._p.rfd) - end - end - - while #fdvec > 0 do - pollvec, re = e2lib.poll(-1, fdvec) - if not pollvec then - return false, re - elseif #pollvec == 0 then - return false, err.new("poll timeout") - end - - for _,ptab in ipairs(pollvec) do - if ptab.POLLIN then - fdct = fd_find_writefunc_by_readfd(fdctv, ptab.fd) - if fdct then - local data, eno - - data, re, eno = eio.read(fdct._p.rfd, 4096) - if not data then - if eno ~= errno.def2errnum("EINTR") then - return false, re - end - e2lib.logf(4, "poll loop read: fd=%d -> EINTR", - fdct._p.rfd) - elseif data ~= "" then - if fdct.linebuffer then - fd_linebuffer(fdct, data) - else - fdct.callfn(data) - end - end - end - elseif ptab.POLLOUT then - return false, err.new("poll unexpectedly returned POLLOUT") - else - -- Nothing to read, nothing to write, file descriptor - -- was closed. - -- - -- Flush remaining buffers if linebuffer is enabled - -- and the last fread did not end with \n. - fdct = fd_find_writefunc_by_readfd(fdctv, ptab.fd) - if fdct then - fd_linebuffer_final(fdct) - end - table.remove(fdvec, ptab.fdvecpos) - end - end - end - - return true - end - - local function fd_parent_cleanup(fdctv) - local rc, re - for _,fdct in ipairs(fdctv) do - if fdct.istype == "writefunc" then - rc, re = eio.close(fdct._p.rfd) - if not rc then - return false, re - end - end + -- carry the pid for each fd, to differentiate in callcmd_poll when + -- setting up pipes and to ease diagnostics. + fdct._p.pid = pid end return true @@ -1799,7 +1871,7 @@ function e2lib.callcmd(argv, fdctv, workdir, envdict, nowait, pty) return false, err.new("signal received, shutting down e2factory") end - rc, re = fd_parent_setup(fdctv) + rc, re = fd_parent_setup(fdctv, argv) if not rc then return false, re end @@ -1864,7 +1936,7 @@ function e2lib.callcmd(argv, fdctv, workdir, envdict, nowait, pty) -- needs to be returned to the caller from here on out. children_insert(pid, fdm) - rc, re = fd_parent_after_fork(fdctv) + rc, re = fd_parent_after_fork(fdctv, pid) if not rc then return false, re, pid end @@ -1877,54 +1949,34 @@ function e2lib.callcmd(argv, fdctv, workdir, envdict, nowait, pty) return true, nil, pid end - -- If an error occurs and we have a valid PID, retrieve the exit - -- status of the child and add it to error message. - local function extract_err_status(pid, e) - if pid then - e2lib.logf(4, "waiting for exit status of pid %d", pid) - - local rc, re, sig = e2lib.wait_pid_delete(pid, true) - if not rc then - re:cat(e) - return - end - - if re == 0 then - e2lib.logf(4, "pid %d still running, sending SIGINT", pid) - children_send_sigint(pid) - rc, re, sig = e2lib.wait_pid_delete(pid, false) - if not rc then - re:cat(e) - return - end - end + local rc, re, pid, sig - err.new("process %d returned code %d, signal %d", - re, rc, sig or 0):cat(e) -- tricky! - end - end + assert(nowait == nil or nowait == false + or nowait == true or nowait == 'nopoll') - local rc, re, pid, sig + e2lib.logf(4, 'e2lib.callcmd argv=%q', table.concat(argv, '", "')) -- fork dance, enter critical section, block all signals le2lib.signal_block() rc, re, pid = do_fork(argv, fdctv, workdir, envdict, nowait, pty) le2lib.signal_unblock() if not rc then - extract_err_status(pid, re) + _retrieve_status_kill(pid, re) return false, re end + if nowait == 'nopoll' then + return pid + end + -- poll loop - rc, re = fd_parent_poll(fdctv) + rc, re = e2lib.callcmd_poll(fdctv) if not rc then - extract_err_status(pid, re) return false, re end - rc, re = fd_parent_cleanup(fdctv) + rc, re = e2lib.callcmd_cleanup(fdctv) if not rc then - extract_err_status(pid, re) return false, re end