editorial
[rrq/newlisp/coproc.git] / coproc.lsp
1 ;; Utility module for coprocesses in newlisp
2 ;;
3 ;; This is a convenience implementation for running a sub process with
4 ;; its stdio via pipes. It is somewhat similar to the standard
5 ;; <process> function but the <coproc> utility combines the process
6 ;; PID together with its stdin/stdout into a FOOP object for onwards
7 ;; interactions. The <coproc> command is passed as a command line to
8 ;; /bin/sh and allows passing in environment. Further, the startup
9 ;; includes closing the unused file descriptors.
10 ;;
11 ;; @syntax: (coproc cmd [ env ] )
12 ;; Start a coprocess and returns its representation which is a FOOP
13 ;; object: ([coproc] PID outfd infd)
14 ;;
15 ;; The given <cmd> command is passed to "sh -c 'cmd'" sub process
16 ;; through a fork+execve scheme. The <env> argument should be a
17 ;; pointer to a null-terminated array of pointers to environment
18 ;; NUL-terminated strings of format "NAME=VALUE". (The caller must
19 ;; take care of providing existing pointers.
20 ;;
21 ;; @syntax: (:put0 coproc str )
22 ;; Write <str> to the given <coproc>. Returns the number of bytes
23 ;; written.
24 ;;
25 ;; @syntax: (:puts coproc str )
26 ;; Write <str> followed by newline to the given <coproc>. Returns the
27 ;; number of bytes written.
28 ;;
29 ;; @syntax: (:gets coproc [ limit [ waitms ]] )
30 ;; Read one line from the given <coproc> or up to <limit> bytes
31 ;; (default 100000) with up to <waitms> millisceonds delay (default
32 ;; 100) before reading. 
33 ;;
34 ;; @syntax: (:pug coproc str [ limit [ waitms ]] )
35 ;; Write <str> with :puts, then read once with :gets and return that.
36 ;;
37 ;; @syntax: (:pugs coproc str [ limit [ waitms ]] )
38 ;; Write <str> with :puts, then read repeatedly with :gets collating
39 ;; its returned lines into a list until :gets retuns nil. Returns the
40 ;; list of lines.
41 ;;
42 ;; @syntax: (:running? coproc [ timeout ])
43 ;; Check if coproc is running, i.e., that its stdin is still open.
44
45 (context 'coproc)
46
47 (constant 'LIBC6 "/lib/x86_64-linux-gnu/libc.so.6")
48 (import LIBC6 "execve" "int"
49         "void*" ; pathname
50         "void*" ; argv[]
51         "void*" ; env[]
52         )
53 (import LIBC6 "dup2" "int"
54         "int" ; oldfd
55         "int" ; newfd
56         )
57 (import LIBC6 "poll" "int"
58         "void*" ; struct pollfd *fds
59         "int" ; nfds_t nfds
60         "int" ; timeout
61         )
62 (import coproc:LIBC6 "on_exit" "int"
63         "void*" ; void (*function)(int , void *)
64         "void*" ; arg
65         )
66
67 (constant 'F_GETFD 1)
68
69 ; <environ> is a 64-bit char** variable, rather than a function
70 (import LIBC6 "environ")
71
72 ; Return the 64-bit value of environ
73 (define (environ@) ((unpack "Lu" (address environ)) 0))
74
75 ; Pack a list of void* pointers into an array with a final null
76 ; pointer added.
77 (define (pack-ptrs PTRS)
78   (pack (dup "Lu" (+ 1 (length PTRS))) (append PTRS (list 0))))
79
80 ; Prepare the binary data setup for calling execve to execute <cmd>
81 ; via "sh -c 'exec <cmd>'" and the given environment <ENV>. If <ENV>
82 ; is <true>, then the current process environment is used, otherwise
83 ; it should be a list of name-value pairs (strings).
84 (define (wrapper cmd IO ENV)
85   (letn ((ARGS (list "/bin/sh" "-c" (string "exec " cmd)))
86          (ARGV (pack-ptrs (find-all nil ARGS (address $it) !=)))
87          (EV (if (list? ENV) (map (curry format "%s=%s") ENV)))
88          (ENVP (if (= true ENV) (environ@)
89                  EV (pack-ptrs (find-all nil EV (address $it) !=))))
90          )
91     :;(map dup2 (list (IO 0 0) (IO 1 1) (IO 1 1)) '(0 1 2))
92     (map dup2 (list (IO 0 0) (IO 1 1)) '(0 1 2)) ; share stderr
93     (map close (flat IO))
94     (execve "/bin/sh" ARGV (or ENVP 0))
95     (exit 1)))
96
97 (setf SUBS '())
98
99 ; Create a coproc FOOP object which holds PID and pipe ends.
100 (define (coproc:coproc cmd ENV)
101   (let ((IO (list (pipe) (pipe))))
102     (last
103      (push (list (context)
104                  (fork (wrapper cmd IO ENV))
105                  (begin (close (IO 0 0)) (close (IO 1 1)) (IO 0 1))
106                  (IO 1 0))
107            SUBS -1))))
108
109 (define (kill-all-coproc x y)
110   (dolist (S SUBS)
111     (when (:running? S)
112       (! (format "kill %d" (S 1)))
113       )))
114 (on_exit (callback 0 'kill-all-coproc "void" "int" "void*") 0)
115
116 ; Wait up to <waitms> for input from the coproc, then read one line of
117 ; up to <limit> bytes and return. The final newline, if any, is
118 ; chopped off.
119 (define (gets limit waitms)
120   (let ((buffer "")
121         (delay (if (number? waitms) (* waitms 1000) 100000))
122         (maxsize (if (number? limit) limit 1000000))
123         )
124     (and (net-select (self 3) "r" delay)
125          (read (self 3) buffer maxsize "\n")
126          (if (empty? buffer) buffer
127            (!= "\n" (buffer -1)) buffer
128            (chop buffer))
129          )))
130
131 ; Write <str> to the coproc.
132 (define (put0 str)
133   (write (self 2) str (length str)))
134
135 ; Write <str> to the coproc plus a newline.
136 (define (puts str)
137   (write-line (self 2) str (length str)))
138
139 ; First :puts, then :gets
140 (define (pug str limit waitms) (puts str) (gets limit waitms))
141
142 ; First :puts, then collect :gets until nil
143 (define (pugs str limit waitms) (puts str) (collect (gets limit waitms)))
144
145 ; Poll the stdin pipe
146 (define (running? (timeout 1))
147   (let ((FDS (pack "Lu" (self 2))))
148     (>= 0 (poll FDS 1 timeout))))