From: Ralph Ronnquist Date: Fri, 10 Jun 2022 07:27:17 +0000 (+1000) Subject: added X-Git-Url: https://git.rrq.au/?a=commitdiff_plain;h=d0898f3ed152897f45f7f29a15ff1046531195c0;p=rrq%2Fnewlisp-ftw.git added --- diff --git a/coproc.lsp b/coproc.lsp new file mode 100644 index 0000000..016b1b0 --- /dev/null +++ b/coproc.lsp @@ -0,0 +1,132 @@ +;; Utility module for coprocesses in newlisp +;; +;; This is a convenience implementation for running a sub process with +;; its stdio via pipes. It is somewhat similar to the standard +;; function but the utility combines the process +;; PID together with its stdin/stdout into a FOOP object for onwards +;; interactions. The command is passed as a command line to +;; /bin/sh and allows passing in environment. Further, the startup +;; includes closing the unused file descriptors. +;; +;; @syntax: (coproc cmd [ env ] ) +;; Start a coprocess and returns its representation which is a FOOP +;; object: ([coproc] PID outfd infd) +;; +;; The given command is passed to "sh -c 'cmd'" sub process +;; through a fork+execve scheme. The argument should be a +;; pointer to a null-terminated array of pointers to environment +;; NUL-terminated strings of format "NAME=VALUE". (The caller must +;; take care of providing existing pointers. +;; +;; @syntax: (:put0 coproc str ) +;; Write to the given . Returns the number of bytes +;; written. +;; +;; @syntax: (:puts coproc str ) +;; Write followed by newline to the given . Returns the +;; number of bytes written. +;; +;; @syntax: (:gets coproc [ limit [ waitms ]] ) +;; Read one line from the given or up to bytes +;; (default 100000) with up to millisceonds delay (default +;; 100) before reading. +;; +;; @syntax: (:pug coproc str [ limit [ waitms ]] ) +;; Write with :puts, then read once with :gets and return that. +;; +;; @syntax: (:pugs coproc str [ limit [ waitms ]] ) +;; Write with :puts, then read repeatedly with :gets collating +;; its returned lines into a list until :gets retuns nil. Returns the +;; list of lines. +;; +;; @syntax: (:running? coproc [ timeout ]) +;; Check if coproc is running, i.e., that its stdin is still open. + +(context 'coproc) + +(constant 'LIBC6 "/lib/x86_64-linux-gnu/libc.so.6") +(import LIBC6 "execve" "int" + "void*" ; pathname + "void*" ; argv[] + "void*" ; env[] + ) +(import LIBC6 "dup2" "int" + "int" ; oldfd + "int" ; newfd + ) +(import LIBC6 "poll" "int" + "void*" ; struct pollfd *fds + "int" ; nfds_t nfds + "int" ; timeout + ) + +(constant 'F_GETFD 1) + +; is a 64-bit char** variable, rather than a function +(import LIBC6 "environ") + +; Return the 64-bit value of environ +(define (environ@) ((unpack "Lu" (address environ)) 0)) + +; Pack a list of void* pointers into an array with a final null +; pointer added. +(define (pack-ptrs PTRS) + (pack (dup "Lu" (+ 1 (length PTRS))) (append PTRS (list 0)))) + +; Prepare the binary data setup for calling execve to execute +; via "sh -c 'exec '" and the given environment . If +; is , then the current process environment is used, otherwise +; it should be a list of name-value pairs (strings). +(define (wrapper cmd IO ENV) + (letn ((ARGS (list "/bin/sh" "-c" (string "exec " cmd))) + (ARGV (pack-ptrs (find-all nil ARGS (address $it) !=))) + (EV (if (list? ENV) (map (curry format "%s=%s") ENV))) + (ENVP (if (= true ENV) (environ@) + EV (pack-ptrs (find-all nil EV (address $it) !=)))) + ) + (map dup2 (list (IO 0 0) (IO 1 1) (IO 1 1)) '(0 1 2)) + (map close (flat IO)) + (execve "/bin/sh" ARGV (or ENVP 0)) + (exit 1))) + +; Create a coproc FOOP object which holds PID and pipe ends. +(define (coproc:coproc cmd ENV) + (let ((IO (list (pipe) (pipe)))) + (list (context) + (fork (wrapper cmd IO ENV)) + (begin (close (IO 0 0)) (close (IO 1 1)) (IO 0 1)) + (IO 1 0)))) + +; Wait up to for input from the coproc, then read one line of +; up to bytes and return. The final newline, if any, is +; chopped off. +(define (gets limit waitms) + (let ((buffer "") + (delay (if (number? waitms) (* waitms 1000) 100000)) + (maxsize (if (number? limit) limit 1000000)) + ) + (and (net-select (self 3) "r" delay) + (read (self 3) buffer maxsize "\n") + (if (empty? buffer) buffer + (!= "\n" (buffer -1)) buffer + (chop buffer)) + ))) + +; Write to the coproc. +(define (put0 str) + (write (self 2) str (length str))) + +; Write to the coproc plus a newline. +(define (puts str) + (write-line (self 2) str (length str))) + +; First :puts, then :gets +(define (pug str limit waitms) (puts str) (gets limit waitms)) + +; First :puts, then collect :gets until nil +(define (pugs str limit waitms) (puts str) (collect (gets limit waitms))) + +; Poll the stdin pipe +(define (running? (timeout 1)) + (let ((FDS (pack "Lu" (self 2)))) + (>= 0 (poll FDS 1 timeout))))