trace.install()
trace.default_filter()
+ -- e2lib.callcmd_poll internals
+ trace.filter_function('e2lib', 'fd_linebuffer')
+ trace.filter_function('e2lib', 'fd_linebuffer_final')
+ trace.filter_function('e2lib', 'fd_find_writefunc_by_readfd')
+
e2lib.signal_reset()
e2lib.closefrom(3)
end
end
+--- If an error occurs and we have a valid PID, retrieve the exit status of the
+-- child and append it to error message, then attempt to kill(INT) the child.
+-- This function may hang until the child exits.
+-- @param pid Process ID
+-- @param e Err object for appending status message.
+local function _retrieve_status_kill(pid, e)
+ if pid then
+ assertIsNumber(pid)
+
+ e2lib.logf(4, "waiting for exit status of pid %d", pid)
+
+ local rc, re, sig = e2lib.wait_pid_delete(pid, true)
+ if not rc then
+ re:cat(e)
+ return
+ end
+
+ if re == 0 then
+ e2lib.logf(4, "pid %d still running, sending SIGINT", pid)
+ children_send_sigint(pid)
+ rc, re, sig = e2lib.wait_pid_delete(pid, false)
+ if not rc then
+ re:cat(e)
+ return
+ end
+ end
+
+ err.new("process %d returned code %d, signal %d",
+ re, rc, sig or 0):cat(e) -- tricky!
+ end
+end
+
+--- Poll all set up file descriptors in one or more fdctv tables. Used in
+-- conjunction with callcmd() nowait='nopoll' for pipes and multiprocess calls.
+-- Does IO and calls callbacks until there are no more file descriptors
+-- requiring service.
+-- @param ... One or more fdctv tables
+-- @return True on success, false on error.
+-- @return Error object on failure.
+function e2lib.callcmd_poll(...)
+
+ -- trace filtered
+ local function fd_linebuffer(fdct, data)
+ local linepos
+
+ fdct._p.buffer = fdct._p.buffer..data
+ repeat
+ linepos = string.find(fdct._p.buffer, "\n")
+ if linepos then
+ fdct.callfn(string.sub(fdct._p.buffer, 1, linepos))
+ fdct._p.buffer = string.sub(fdct._p.buffer, linepos + 1)
+ end
+ until not linepos
+ end
+
+ -- trace filtered
+ local function fd_linebuffer_final(fdct, data)
+ if fdct.linebuffer and fdct._p.buffer ~= "" then
+ fdct.callfn(fdct._p.buffer)
+ fdct._p.buffer = ""
+ end
+ end
+
+ -- trace filtered
+ local function fd_find_writefunc_by_readfd(fdctv, fd)
+ for _,fdct in ipairs(fdctv) do
+ if fdct.istype == "writefunc" and fdct._p.rfd == fd then
+ return fdct
+ end
+ end
+
+ return false
+ end
+
+ local rc, re, fdvec, pollvec, fdvec, fdct, fdctv
+
+ fdctv = {}
+ for _, t in ipairs({...}) do
+ assertIsTable(t)
+ for _, fdct in ipairs(t) do
+ assertIsTable(fdct)
+ -- all fdct must have a _p table if initialized by callcmd()
+ assertIsTable(fdct._p)
+ table.insert(fdctv, fdct)
+ end
+ end
+
+ fdvec = {}
+ for _,fdct in ipairs(fdctv) do
+ if fdct.istype == "writefunc" then
+ table.insert(fdvec, fdct._p.rfd)
+ end
+ end
+
+ while #fdvec > 0 do
+ pollvec, re = e2lib.poll(-1, fdvec)
+ if not pollvec then
+ return false, re
+ elseif #pollvec == 0 then
+ return false, err.new("poll timeout")
+ end
+
+ for _,ptab in ipairs(pollvec) do
+ if ptab.POLLIN then
+ fdct = fd_find_writefunc_by_readfd(fdctv, ptab.fd)
+ if fdct then
+ local data, eno
+
+ data, re, eno = eio.read(fdct._p.rfd, 4096)
+ if not data then
+ if eno ~= errno.def2errnum("EINTR") then
+ return false, _retrieve_status_kill(fdct._p.pid, re)
+ end
+ e2lib.logf(4, "poll loop read: fd=%d -> EINTR",
+ fdct._p.rfd)
+ elseif data ~= "" then
+ if fdct.linebuffer then
+ fd_linebuffer(fdct, data)
+ else
+ fdct.callfn(data)
+ end
+ end
+ end
+ elseif ptab.POLLOUT then
+ return false, err.new("poll unexpectedly returned POLLOUT")
+ else
+ -- Nothing to read, nothing to write, file descriptor
+ -- was closed.
+ --
+ -- Flush remaining buffers if linebuffer is enabled
+ -- and the last fread did not end with \n.
+ fdct = fd_find_writefunc_by_readfd(fdctv, ptab.fd)
+ if fdct then
+ fd_linebuffer_final(fdct)
+ end
+ table.remove(fdvec, ptab.fdvecpos)
+ end
+ end
+ end
+
+ return true
+end
+
+--- Cleanup after callcmd_poll(), close file descriptors opened by callcmd()
+-- @param fdctv table
+-- @return true on success, false on error.
+-- @return Error object on failure.
+function e2lib.callcmd_cleanup(fdctv)
+ local rc, re
+ for _,fdct in ipairs(fdctv) do
+ if fdct.istype == "writefunc" then
+ rc, re = eio.close(fdct._p.rfd)
+ if not rc then
+ return false, _retrieve_status_kill(fdct._p.pid, re)
+ end
+ end
+ fdct._p = nil
+ end
+
+ return true
+end
+
--- File descriptor config table vector. This vector simply holds file
-- descriptor config tables.
-- @table fdctv
-- overwritten.
-- @param nowait Optional - if true, return the PID instead of calling wait() to
-- get the return code.
+-- If 'nopoll', return PID instead of poll()ing the set up FDs and
+-- wait(). This is useful for pipes/multiprocess commands.
-- @param pty Allocate a pseudo tty
-- @return Return code of the child is returned. It's the callers responsibility
-- to make sense of the value. If the return code is false, an error
-- To keep this large mess somewhat grokable, split into multiple functions.
- local function fd_parent_setup(fdctv)
+ local function fd_parent_setup(fdctv, argv)
local rc, re
for _,fdct in ipairs(fdctv) do
+
+ -- each fdct gets a private table to mark it as initialized
+ fdct._p = {}
+ fdct._p.argv = argv -- for diagnostics
+
if fdct.istype == "writefunc" then
rc, re = eio.pipe()
if not rc then
return false, re
end
- fdct._p = {}
fdct._p.rfd = rc
fdct._p.wfd = re
fdct._p.buffer = ""
end
end
- local function fd_parent_after_fork(fdctv)
+ local function fd_parent_after_fork(fdctv, pid)
local rc, re
for _,fdct in ipairs(fdctv) do
if fdct.istype == "writefunc" then
return false, re
end
end
- end
-
- return true
- end
-
- local function fd_find_writefunc_by_readfd(fdctv, fd)
- for _,fdct in ipairs(fdctv) do
- if fdct.istype == "writefunc" and fdct._p.rfd == fd then
- return fdct
- end
- end
-
- return false
- end
-
- local function fd_parent_poll(fdctv)
-
- local function fd_linebuffer(fdct, data)
- local linepos
-
- fdct._p.buffer = fdct._p.buffer..data
- repeat
- linepos = string.find(fdct._p.buffer, "\n")
- if linepos then
- fdct.callfn(string.sub(fdct._p.buffer, 1, linepos))
- fdct._p.buffer = string.sub(fdct._p.buffer, linepos + 1)
- end
- until not linepos
- end
-
- local function fd_linebuffer_final(fdct, data)
- if fdct.linebuffer and fdct._p.buffer ~= "" then
- fdct.callfn(fdct._p.buffer)
- fdct._p.buffer = ""
- end
- end
-
- local rc, re, fdvec, pollvec, fdvec, fdct
-
- fdvec = {}
- for _,fdct in ipairs(fdctv) do
- if fdct.istype == "writefunc" then
- table.insert(fdvec, fdct._p.rfd)
- end
- end
-
- while #fdvec > 0 do
- pollvec, re = e2lib.poll(-1, fdvec)
- if not pollvec then
- return false, re
- elseif #pollvec == 0 then
- return false, err.new("poll timeout")
- end
-
- for _,ptab in ipairs(pollvec) do
- if ptab.POLLIN then
- fdct = fd_find_writefunc_by_readfd(fdctv, ptab.fd)
- if fdct then
- local data, eno
-
- data, re, eno = eio.read(fdct._p.rfd, 4096)
- if not data then
- if eno ~= errno.def2errnum("EINTR") then
- return false, re
- end
- e2lib.logf(4, "poll loop read: fd=%d -> EINTR",
- fdct._p.rfd)
- elseif data ~= "" then
- if fdct.linebuffer then
- fd_linebuffer(fdct, data)
- else
- fdct.callfn(data)
- end
- end
- end
- elseif ptab.POLLOUT then
- return false, err.new("poll unexpectedly returned POLLOUT")
- else
- -- Nothing to read, nothing to write, file descriptor
- -- was closed.
- --
- -- Flush remaining buffers if linebuffer is enabled
- -- and the last fread did not end with \n.
- fdct = fd_find_writefunc_by_readfd(fdctv, ptab.fd)
- if fdct then
- fd_linebuffer_final(fdct)
- end
- table.remove(fdvec, ptab.fdvecpos)
- end
- end
- end
-
- return true
- end
-
- local function fd_parent_cleanup(fdctv)
- local rc, re
- for _,fdct in ipairs(fdctv) do
- if fdct.istype == "writefunc" then
- rc, re = eio.close(fdct._p.rfd)
- if not rc then
- return false, re
- end
- end
+ -- carry the pid for each fd, to differentiate in callcmd_poll when
+ -- setting up pipes and to ease diagnostics.
+ fdct._p.pid = pid
end
return true
return false, err.new("signal received, shutting down e2factory")
end
- rc, re = fd_parent_setup(fdctv)
+ rc, re = fd_parent_setup(fdctv, argv)
if not rc then
return false, re
end
-- needs to be returned to the caller from here on out.
children_insert(pid, fdm)
- rc, re = fd_parent_after_fork(fdctv)
+ rc, re = fd_parent_after_fork(fdctv, pid)
if not rc then
return false, re, pid
end
return true, nil, pid
end
- -- If an error occurs and we have a valid PID, retrieve the exit
- -- status of the child and add it to error message.
- local function extract_err_status(pid, e)
- if pid then
- e2lib.logf(4, "waiting for exit status of pid %d", pid)
-
- local rc, re, sig = e2lib.wait_pid_delete(pid, true)
- if not rc then
- re:cat(e)
- return
- end
-
- if re == 0 then
- e2lib.logf(4, "pid %d still running, sending SIGINT", pid)
- children_send_sigint(pid)
- rc, re, sig = e2lib.wait_pid_delete(pid, false)
- if not rc then
- re:cat(e)
- return
- end
- end
+ local rc, re, pid, sig
- err.new("process %d returned code %d, signal %d",
- re, rc, sig or 0):cat(e) -- tricky!
- end
- end
+ assert(nowait == nil or nowait == false
+ or nowait == true or nowait == 'nopoll')
- local rc, re, pid, sig
+ e2lib.logf(4, 'e2lib.callcmd argv=%q', table.concat(argv, '", "'))
-- fork dance, enter critical section, block all signals
le2lib.signal_block()
rc, re, pid = do_fork(argv, fdctv, workdir, envdict, nowait, pty)
le2lib.signal_unblock()
if not rc then
- extract_err_status(pid, re)
+ _retrieve_status_kill(pid, re)
return false, re
end
+ if nowait == 'nopoll' then
+ return pid
+ end
+
-- poll loop
- rc, re = fd_parent_poll(fdctv)
+ rc, re = e2lib.callcmd_poll(fdctv)
if not rc then
- extract_err_status(pid, re)
return false, re
end
- rc, re = fd_parent_cleanup(fdctv)
+ rc, re = e2lib.callcmd_cleanup(fdctv)
if not rc then
- extract_err_status(pid, re)
return false, re
end