]> git.e2factory.org Git - e2factory.git/commitdiff
e2lib: split callcmd() further to allow for external poll loop
authorTobias Ulmer <tu@emlix.com>
Thu, 9 May 2019 14:29:12 +0000 (16:29 +0200)
committerTobias Ulmer <tu@emlix.com>
Thu, 9 May 2019 14:29:12 +0000 (16:29 +0200)
Signed-off-by: Tobias Ulmer <tu@emlix.com>
generic/e2lib.lua

index f6e1ea40fda5f6004ffcf235a6b903781d0dc523..560523b3ce45db87405c29ae1ac6f04074608544 100644 (file)
@@ -568,6 +568,11 @@ function e2lib.init()
     trace.install()
     trace.default_filter()
 
+    -- e2lib.callcmd_poll internals
+    trace.filter_function('e2lib', 'fd_linebuffer')
+    trace.filter_function('e2lib', 'fd_linebuffer_final')
+    trace.filter_function('e2lib', 'fd_find_writefunc_by_readfd')
+
     e2lib.signal_reset()
 
     e2lib.closefrom(3)
@@ -1445,6 +1450,168 @@ function e2lib.directory(path, dotfiles, noerror)
     end
 end
 
+--- If an error occurs and we have a valid PID, retrieve the exit status of the
+-- child and append it to error message, then attempt to kill(INT) the child.
+-- This function may hang until the child exits.
+-- @param pid Process ID
+-- @param e Err object for appending status message.
+local function _retrieve_status_kill(pid, e)
+    if pid then
+        assertIsNumber(pid)
+
+        e2lib.logf(4, "waiting for exit status of pid %d", pid)
+
+        local rc, re, sig = e2lib.wait_pid_delete(pid, true)
+        if not rc then
+            re:cat(e)
+            return
+        end
+
+        if re == 0 then
+            e2lib.logf(4, "pid %d still running, sending SIGINT", pid)
+            children_send_sigint(pid)
+            rc, re, sig = e2lib.wait_pid_delete(pid, false)
+            if not rc then
+                re:cat(e)
+                return
+            end
+        end
+
+        err.new("process %d returned code %d, signal %d",
+            re, rc, sig or 0):cat(e) -- tricky!
+    end
+end
+
+--- Poll all set up file descriptors in one or more fdctv tables. Used in
+-- conjunction with callcmd() nowait='nopoll' for pipes and multiprocess calls.
+-- Does IO and calls callbacks until there are no more file descriptors
+-- requiring service.
+-- @param ... One or more fdctv tables
+-- @return True on success, false on error.
+-- @return Error object on failure.
+function e2lib.callcmd_poll(...)
+
+    -- trace filtered
+    local function fd_linebuffer(fdct, data)
+        local linepos
+
+        fdct._p.buffer = fdct._p.buffer..data
+        repeat
+            linepos = string.find(fdct._p.buffer, "\n")
+            if linepos then
+                fdct.callfn(string.sub(fdct._p.buffer, 1, linepos))
+                fdct._p.buffer = string.sub(fdct._p.buffer, linepos + 1)
+            end
+        until not linepos
+    end
+
+    -- trace filtered
+    local function fd_linebuffer_final(fdct, data)
+        if fdct.linebuffer and fdct._p.buffer ~= "" then
+            fdct.callfn(fdct._p.buffer)
+            fdct._p.buffer = ""
+        end
+    end
+
+    -- trace filtered
+    local function fd_find_writefunc_by_readfd(fdctv, fd)
+        for _,fdct in ipairs(fdctv) do
+            if fdct.istype == "writefunc" and fdct._p.rfd == fd then
+                return fdct
+            end
+        end
+
+        return false
+    end
+
+    local rc, re, fdvec, pollvec, fdvec, fdct, fdctv
+
+    fdctv = {}
+    for _, t in ipairs({...}) do
+        assertIsTable(t)
+        for _, fdct in ipairs(t) do
+            assertIsTable(fdct)
+            -- all fdct must have a _p table if initialized by callcmd()
+            assertIsTable(fdct._p)
+            table.insert(fdctv, fdct)
+        end
+    end
+
+    fdvec = {}
+    for _,fdct in ipairs(fdctv) do
+        if fdct.istype == "writefunc" then
+            table.insert(fdvec, fdct._p.rfd)
+        end
+    end
+
+    while #fdvec > 0 do
+        pollvec, re = e2lib.poll(-1, fdvec)
+        if not pollvec then
+            return false, re
+        elseif #pollvec == 0 then
+            return false, err.new("poll timeout")
+        end
+
+        for _,ptab in ipairs(pollvec) do
+            if ptab.POLLIN then
+                fdct = fd_find_writefunc_by_readfd(fdctv, ptab.fd)
+                if fdct then
+                    local data, eno
+
+                    data, re, eno = eio.read(fdct._p.rfd, 4096)
+                    if not data then
+                        if eno ~= errno.def2errnum("EINTR") then
+                            return false, _retrieve_status_kill(fdct._p.pid, re)
+                        end
+                        e2lib.logf(4, "poll loop read: fd=%d -> EINTR",
+                            fdct._p.rfd)
+                    elseif data ~= "" then
+                        if fdct.linebuffer then
+                            fd_linebuffer(fdct, data)
+                        else
+                            fdct.callfn(data)
+                        end
+                    end
+                end
+            elseif ptab.POLLOUT then
+                return false, err.new("poll unexpectedly returned POLLOUT")
+            else
+                -- Nothing to read, nothing to write, file descriptor
+                -- was closed.
+                --
+                -- Flush remaining buffers if linebuffer is enabled
+                -- and the last fread did not end with \n.
+                fdct = fd_find_writefunc_by_readfd(fdctv, ptab.fd)
+                if fdct then
+                    fd_linebuffer_final(fdct)
+                end
+                table.remove(fdvec, ptab.fdvecpos)
+            end
+        end
+    end
+
+    return true
+end
+
+--- Cleanup after callcmd_poll(), close file descriptors opened by callcmd()
+-- @param fdctv table
+-- @return true on success, false on error.
+-- @return Error object on failure.
+function e2lib.callcmd_cleanup(fdctv)
+    local rc, re
+    for _,fdct in ipairs(fdctv) do
+        if fdct.istype == "writefunc" then
+            rc, re = eio.close(fdct._p.rfd)
+            if not rc then
+                return false, _retrieve_status_kill(fdct._p.pid, re)
+            end
+        end
+        fdct._p = nil
+    end
+
+    return true
+end
+
 --- File descriptor config table vector. This vector simply holds file
 -- descriptor config tables.
 -- @table fdctv
@@ -1493,6 +1660,8 @@ end
 --                overwritten.
 -- @param nowait Optional - if true, return the PID instead of calling wait() to
 --               get the return code.
+--               If 'nopoll', return PID instead of poll()ing the set up FDs and
+--               wait(). This is useful for pipes/multiprocess commands.
 -- @param pty Allocate a pseudo tty
 -- @return Return code of the child is returned. It's the callers responsibility
 --         to make sense of the value. If the return code is false, an error
@@ -1504,16 +1673,20 @@ function e2lib.callcmd(argv, fdctv, workdir, envdict, nowait, pty)
 
     -- To keep this large mess somewhat grokable, split into multiple functions.
 
-    local function fd_parent_setup(fdctv)
+    local function fd_parent_setup(fdctv, argv)
         local rc, re
         for _,fdct in ipairs(fdctv) do
+
+            -- each fdct gets a private table to mark it as initialized
+            fdct._p = {}
+            fdct._p.argv = argv -- for diagnostics
+
             if fdct.istype == "writefunc" then
                 rc, re = eio.pipe()
                 if not rc then
                     return false, re
                 end
 
-                fdct._p = {}
                 fdct._p.rfd = rc
                 fdct._p.wfd = re
                 fdct._p.buffer = ""
@@ -1571,7 +1744,7 @@ function e2lib.callcmd(argv, fdctv, workdir, envdict, nowait, pty)
         end
     end
 
-    local function fd_parent_after_fork(fdctv)
+    local function fd_parent_after_fork(fdctv, pid)
         local rc, re
         for _,fdct in ipairs(fdctv) do
             if fdct.istype == "writefunc" then
@@ -1580,110 +1753,9 @@ function e2lib.callcmd(argv, fdctv, workdir, envdict, nowait, pty)
                     return false, re
                 end
             end
-        end
-
-        return true
-    end
-
-    local function fd_find_writefunc_by_readfd(fdctv, fd)
-        for _,fdct in ipairs(fdctv) do
-            if fdct.istype == "writefunc" and fdct._p.rfd == fd then
-                return fdct
-            end
-        end
-
-        return false
-    end
-
-    local function fd_parent_poll(fdctv)
-
-        local function fd_linebuffer(fdct, data)
-            local linepos
-
-            fdct._p.buffer = fdct._p.buffer..data
-            repeat
-                linepos = string.find(fdct._p.buffer, "\n")
-                if linepos then
-                    fdct.callfn(string.sub(fdct._p.buffer, 1, linepos))
-                    fdct._p.buffer = string.sub(fdct._p.buffer, linepos + 1)
-                end
-            until not linepos
-        end
-
-        local function fd_linebuffer_final(fdct, data)
-            if fdct.linebuffer and fdct._p.buffer ~= "" then
-                fdct.callfn(fdct._p.buffer)
-                fdct._p.buffer = ""
-            end
-        end
-
-        local rc, re, fdvec, pollvec, fdvec, fdct
-
-        fdvec = {}
-        for _,fdct in ipairs(fdctv) do
-            if fdct.istype == "writefunc" then
-                table.insert(fdvec, fdct._p.rfd)
-            end
-        end
-
-        while #fdvec > 0 do
-            pollvec, re = e2lib.poll(-1, fdvec)
-            if not pollvec then
-                return false, re
-            elseif #pollvec == 0 then
-                return false, err.new("poll timeout")
-            end
-
-            for _,ptab in ipairs(pollvec) do
-                if ptab.POLLIN then
-                    fdct = fd_find_writefunc_by_readfd(fdctv, ptab.fd)
-                    if fdct then
-                        local data, eno
-
-                        data, re, eno = eio.read(fdct._p.rfd, 4096)
-                        if not data then
-                            if eno ~= errno.def2errnum("EINTR") then
-                                return false, re
-                            end
-                            e2lib.logf(4, "poll loop read: fd=%d -> EINTR",
-                                fdct._p.rfd)
-                        elseif data ~= "" then
-                            if fdct.linebuffer then
-                                fd_linebuffer(fdct, data)
-                            else
-                                fdct.callfn(data)
-                            end
-                        end
-                    end
-                elseif ptab.POLLOUT then
-                    return false, err.new("poll unexpectedly returned POLLOUT")
-                else
-                    -- Nothing to read, nothing to write, file descriptor
-                    -- was closed.
-                    --
-                    -- Flush remaining buffers if linebuffer is enabled
-                    -- and the last fread did not end with \n.
-                    fdct = fd_find_writefunc_by_readfd(fdctv, ptab.fd)
-                    if fdct then
-                        fd_linebuffer_final(fdct)
-                    end
-                    table.remove(fdvec, ptab.fdvecpos)
-                end
-            end
-        end
-
-        return true
-    end
-
-    local function fd_parent_cleanup(fdctv)
-        local rc, re
-        for _,fdct in ipairs(fdctv) do
-            if fdct.istype == "writefunc" then
-                rc, re = eio.close(fdct._p.rfd)
-                if not rc then
-                    return false, re
-                end
-            end
+            -- carry the pid for each fd, to differentiate in callcmd_poll when
+            -- setting up pipes and to ease diagnostics.
+            fdct._p.pid = pid
         end
 
         return true
@@ -1799,7 +1871,7 @@ function e2lib.callcmd(argv, fdctv, workdir, envdict, nowait, pty)
             return false, err.new("signal received, shutting down e2factory")
         end
 
-        rc, re = fd_parent_setup(fdctv)
+        rc, re = fd_parent_setup(fdctv, argv)
         if not rc then
             return false, re
         end
@@ -1864,7 +1936,7 @@ function e2lib.callcmd(argv, fdctv, workdir, envdict, nowait, pty)
         -- needs to be returned to the caller from here on out.
 
         children_insert(pid, fdm)
-        rc, re = fd_parent_after_fork(fdctv)
+        rc, re = fd_parent_after_fork(fdctv, pid)
         if not rc then
             return false, re, pid
         end
@@ -1877,54 +1949,34 @@ function e2lib.callcmd(argv, fdctv, workdir, envdict, nowait, pty)
         return true, nil, pid
     end
 
-    -- If an error occurs and we have a valid PID, retrieve the exit
-    -- status of the child and add it to error message.
-    local function extract_err_status(pid, e)
-        if pid then
-            e2lib.logf(4, "waiting for exit status of pid %d", pid)
-
-            local rc, re, sig = e2lib.wait_pid_delete(pid, true)
-            if not rc then
-                re:cat(e)
-                return
-            end
-
-            if re == 0 then
-                e2lib.logf(4, "pid %d still running, sending SIGINT", pid)
-                children_send_sigint(pid)
-                rc, re, sig = e2lib.wait_pid_delete(pid, false)
-                if not rc then
-                    re:cat(e)
-                    return
-                end
-            end
+    local rc, re, pid, sig
 
-            err.new("process %d returned code %d, signal %d",
-                re, rc, sig or 0):cat(e) -- tricky!
-        end
-    end
+    assert(nowait == nil or nowait == false
+        or nowait == true or nowait == 'nopoll')
 
-    local rc, re, pid, sig
+    e2lib.logf(4, 'e2lib.callcmd argv=%q', table.concat(argv, '", "'))
 
     -- fork dance, enter critical section, block all signals
     le2lib.signal_block()
     rc, re, pid = do_fork(argv, fdctv, workdir, envdict, nowait, pty)
     le2lib.signal_unblock()
     if not rc then
-        extract_err_status(pid, re)
+        _retrieve_status_kill(pid, re)
         return false, re
     end
 
+    if nowait == 'nopoll' then
+        return pid
+    end
+
     -- poll loop
-    rc, re = fd_parent_poll(fdctv)
+    rc, re = e2lib.callcmd_poll(fdctv)
     if not rc then
-        extract_err_status(pid, re)
         return false, re
     end
 
-    rc, re = fd_parent_cleanup(fdctv)
+    rc, re = e2lib.callcmd_cleanup(fdctv)
     if not rc then
-        extract_err_status(pid, re)
         return false, re
     end