4 * Author : Paul Marquess
5 * Date : 26th March 2000
13 #include "../Call/ppport.h"
19 #define MY_CXT_KEY "Filter::Util::Exec::_guts" XS_VERSION
31 #define fdebug (MY_CXT.x_fdebug)
33 #define write_started (MY_CXT.x_write_started)
34 #define pipe_pid (MY_CXT.x_pipe_pid)
37 #ifdef PERL_FILTER_EXISTS
38 # define CORE_FILTER_SCRIPT PL_parser->rsfp
40 # define CORE_FILTER_SCRIPT PL_rsfp
44 #define PIPE_IN(sv) IoLINES(sv)
45 #define PIPE_OUT(sv) IoPAGE(sv)
46 #define PIPE_PID(sv) IoLINES_LEFT(sv)
48 #define BUF_SV(sv) IoTOP_GV(sv)
49 #define BUF_START(sv) SvPVX((SV*) BUF_SV(sv))
50 #define BUF_SIZE(sv) SvCUR((SV*) BUF_SV(sv))
51 #define BUF_NEXT(sv) IoFMT_NAME(sv)
52 #define BUF_END(sv) (BUF_START(sv) + BUF_SIZE(sv))
53 #define BUF_OFFSET(sv) IoPAGE_LEN(sv)
55 #define SET_LEN(sv,len) \
56 do { SvPVX(sv)[len] = '\0'; SvCUR_set(sv, len); } while (0)
67 struct perl_thread * parent;
70 PerlInterpreter * parent;
75 pipe_write(void *args)
77 thrarg *targ = (thrarg *)args;
80 int pipe_in = PIPE_IN(sv) ;
81 int pipe_out = PIPE_OUT(sv) ;
85 /* use the parent's perl thread context */
86 SET_THR(targ->parent);
89 PERL_SET_THX(targ->parent);
97 /* get some raw data to stuff down the pipe */
98 /* But only when BUF_SV is empty */
99 if (!rawread_eof && BUF_NEXT(sv) >= BUF_END(sv)) {
101 SvCUR_set((SV*)BUF_SV(sv), 0) ;
102 if ((len = FILTER_READ(idx+1, (SV*) BUF_SV(sv), 0)) > 0) {
103 BUF_NEXT(sv) = BUF_START(sv);
105 warn ("*pipe_write(%d) Filt Rd returned %d %d [%*s]\n",
106 idx, len, BUF_SIZE(sv), BUF_SIZE(sv), BUF_START(sv)) ;
109 /* eof, close write end of pipe after writing to it */
114 /* write down the pipe */
115 if ((w = BUF_END(sv) - BUF_NEXT(sv)) > 0) {
117 if ((w = write(pipe_out, BUF_NEXT(sv), w)) > 0) {
120 warn ("*pipe_write(%d) wrote %d bytes to pipe\n", idx, w) ;
124 warn ("*pipe_write(%d) closing pipe_out errno = %d %s\n",
125 idx, errno, Strerror(errno)) ;
127 CloseHandle((HANDLE)pipe_pid);
132 else if (rawread_eof) {
134 warn ("*pipe_write(%d) closing pipe_out errno = %d %s\n",
135 idx, errno, Strerror(errno)) ;
137 CloseHandle((HANDLE)pipe_pid);
146 pipe_read(SV *sv, int idx, int maxlen)
149 int pipe_in = PIPE_IN(sv) ;
150 int pipe_out = PIPE_OUT(sv) ;
157 warn ("*pipe_read(sv=%d, SvCUR(sv)=%d, idx=%d, maxlen=%d\n",
158 sv, SvCUR(sv), idx, maxlen) ;
163 /* just make sure the SV is big enough */
164 SvGROW(sv, SvCUR(sv) + maxlen) ;
167 BUF_NEXT(sv) = BUF_START(sv);
169 if (!write_started) {
170 thrarg *targ = (thrarg*)malloc(sizeof(thrarg));
171 targ->sv = sv; targ->idx = idx;
178 /* thread handle is closed when pipe_write() returns */
179 _beginthread(pipe_write,0,(void *)targ);
183 /* try to get data from filter, if any */
186 if ((r = read(pipe_in, SvPVX(sv) + len, maxlen)) > 0)
189 warn ("*pipe_read(%d) from pipe returned %d [%*s]\n",
190 idx, r, r, SvPVX(sv) + len) ;
191 SvCUR_set(sv, r + len) ;
196 warn ("*pipe_read(%d) returned %d, errno = %d %s\n",
197 idx, r, errno, Strerror(errno)) ;
199 /* close the read pipe on error/eof */
201 warn("*pipe_read(%d) -- EOF <#########\n", idx) ;
210 pipe_read(SV *sv, int idx, int maxlen)
213 int pipe_in = PIPE_IN(sv) ;
214 int pipe_out = PIPE_OUT(sv) ;
215 int pipe_pid = PIPE_PID(sv) ;
222 warn ("*pipe_read(sv=%d, SvCUR(sv)=%d, idx=%d, maxlen=%d\n",
223 sv, SvCUR(sv), idx, maxlen) ;
228 /* just make sure the SV is big enough */
229 SvGROW(sv, SvCUR(sv) + maxlen) ;
234 BUF_NEXT(sv) = BUF_START(sv);
237 /* try to get data from filter, if any */
240 if ((r = read(pipe_in, SvPVX(sv) + len, maxlen)) > 0)
243 warn ("*pipe_read(%d) from pipe returned %d [%*s]\n",
244 idx, r, r, SvPVX(sv) + len) ;
245 SvCUR_set(sv, r + len) ;
250 warn ("*pipe_read(%d) returned %d, errno = %d %s\n",
251 idx, r, errno, Strerror(errno)) ;
253 if (errno != VAL_EAGAIN)
255 /* close the read pipe on error/eof */
257 warn("*pipe_read(%d) -- EOF <#########\n", idx) ;
260 waitpid(pipe_pid, NULL, 0) ;
268 /* get some raw data to stuff down the pipe */
269 /* But only when BUF_SV is empty */
270 if (BUF_NEXT(sv) >= BUF_END(sv))
273 SvCUR_set((SV*)BUF_SV(sv), 0) ;
274 if ((len = FILTER_READ(idx+1, (SV*) BUF_SV(sv), 0)) > 0) {
275 BUF_NEXT(sv) = BUF_START(sv);
277 warn ("*pipe_write(%d) Filt Rd returned %d %d [%*s]\n",
278 idx, len, BUF_SIZE(sv), BUF_SIZE(sv), BUF_START(sv)) ;
281 /* eof, close write end of pipe */
284 warn ("*pipe_read(%d) closing pipe_out errno = %d %s\n",
290 /* write down the pipe */
291 if ((w = BUF_END(sv) - BUF_NEXT(sv)) > 0)
294 if ((w = write(pipe_out, BUF_NEXT(sv), w)) > 0) {
297 warn ("*pipe_read(%d) wrote %d bytes to pipe\n", idx, w) ;
299 else if (errno != VAL_EAGAIN) {
301 warn ("*pipe_read(%d) closing pipe_out errno = %d %s\n",
302 idx, errno, Strerror(errno)) ;
303 /* close(pipe_out) ; */
306 else { /* pipe is full, sleep for a while, then continue */
308 warn ("*pipe_read(%d) - sleeping\n", idx ) ;
320 int mode = fcntl(f, F_GETFL);
323 croak("fcntl(f, F_GETFL) failed, RETVAL = %d, errno = %d",
326 if (!(mode & VAL_O_NONBLOCK))
327 RETVAL = fcntl(f, F_SETFL, mode | VAL_O_NONBLOCK);
330 croak("cannot create a non-blocking pipe, RETVAL = %d, errno = %d",
341 spawnCommand(PerlIO *fil, char *command, char *parameters[], int *p0, int *p1)
346 #if defined(PERL_OBJECT)
347 # define win32_pipe(p,n,f) _pipe(p,n,f)
352 int oldstdout, oldstdin;
354 /* create the pipes */
355 if (win32_pipe(p,512,O_TEXT|O_NOINHERIT) == -1
356 || win32_pipe(c,512,O_BINARY|O_NOINHERIT) == -1) {
358 croak("Can't get pipe for %s", command);
361 /* duplicate stdout and stdin */
362 oldstdout = dup(fileno(stdout));
363 if (oldstdout == -1) {
365 croak("Can't dup stdout for %s", command);
367 oldstdin = dup(fileno(stdin));
368 if (oldstdin == -1) {
370 croak("Can't dup stdin for %s", command);
373 /* duplicate inheritable ends as std handles for the child */
374 if (dup2(p[WRITER], fileno(stdout))) {
376 croak("Can't attach pipe to stdout for %s", command);
378 if (dup2(c[READER], fileno(stdin))) {
380 croak("Can't attach pipe to stdin for %s", command);
383 /* close original inheritable ends in parent */
387 /* spawn child process (which inherits the redirected std handles) */
388 pipe_pid = spawnvp(P_NOWAIT, command, parameters);
389 if (pipe_pid == -1) {
391 croak("Can't spawn %s", command);
394 /* restore std handles */
395 if (dup2(oldstdout, fileno(stdout))) {
397 croak("Can't restore stdout for %s", command);
399 if (dup2(oldstdin, fileno(stdin))) {
401 croak("Can't restore stdin for %s", command);
404 /* close saved handles */
417 /* Check that the file is seekable */
418 /* if (lseek(fileno(fil), ftell(fil), 0) == -1) { */
419 /* croak("lseek failed: %s", Strerror(errno)) ; */
422 if (pipe(p) < 0 || pipe(c)) {
424 croak("Can't get pipe for %s", command);
427 /* make sure that the child doesn't get anything extra */
431 while ((pipepid = fork()) < 0) {
432 if (errno != EAGAIN) {
438 croak("Can't fork for %s", command);
448 if (c[READER] != 0) {
452 if (p[WRITER] != 1) {
458 execvp(command, parameters) ;
459 croak("execvp failed for command '%s': %s", command, Strerror(errno)) ;
470 /* make the pipe non-blocking */
471 make_nonblock(p[READER]) ;
472 make_nonblock(c[WRITER]) ;
483 filter_exec(pTHX_ int idx, SV *buf_sv, int maxlen)
487 SV *buffer = FILTER_DATA(idx);
488 char * out_ptr = SvPVX(buffer) ;
494 warn ("filter_sh(idx=%d, SvCUR(buf_sv)=%d, maxlen=%d\n",
495 idx, SvCUR(buf_sv), maxlen) ;
499 /* If there was a partial line/block left from last time
502 if (n = SvCUR(buffer)) {
503 out_ptr = SvPVX(buffer) + BUF_OFFSET(buffer) ;
507 warn("filter_sh(%d) - wants a block\n", idx) ;
508 sv_catpvn(buf_sv, out_ptr, maxlen > n ? n : maxlen );
510 BUF_OFFSET(buffer) = 0 ;
514 BUF_OFFSET(buffer) += maxlen ;
515 SvCUR_set(buffer, n - maxlen) ;
517 return SvCUR(buf_sv);
522 warn("filter_sh(%d) - wants a line\n", idx) ;
523 if (p = ninstr(out_ptr, out_ptr + n, nl, nl + 1)) {
524 sv_catpvn(buf_sv, out_ptr, p - out_ptr + 1);
525 n = n - (p - out_ptr + 1);
526 BUF_OFFSET(buffer) += (p - out_ptr + 1);
527 SvCUR_set(buffer, n) ;
529 warn("recycle(%d) - leaving %d [%s], returning %d %d [%s]",
531 SvPVX(buffer), p - out_ptr + 1,
532 SvCUR(buf_sv), SvPVX(buf_sv)) ;
534 return SvCUR(buf_sv);
536 else /* partial buffer didn't have any newlines, so copy it all */
537 sv_catpvn(buf_sv, out_ptr, n) ;
543 /* the buffer has been consumed, so reset the length */
545 BUF_OFFSET(buffer) = 0 ;
547 /* read from the sub-process */
548 if ( (n=pipe_read(buffer, idx, maxlen)) <= 0) {
551 warn ("filter_sh(%d) - pipe_read returned %d , returning %d\n",
552 idx, n, (SvCUR(buf_sv)>0) ? SvCUR(buf_sv) : n);
554 SvCUR_set(buffer, 0);
555 BUF_NEXT(buffer) = Nullch; /* or perl will try to free() it */
556 /* filter_del(filter_sh); */
558 /* If error, return the code */
562 /* return what we have so far else signal eof */
563 return (SvCUR(buf_sv)>0) ? SvCUR(buf_sv) : n;
567 warn(" filter_sh(%d): pipe_read returned %d %d: '%s'",
568 idx, n, SvCUR(buffer), SvPV(buffer,n_a));
575 MODULE = Filter::Util::Exec PACKAGE = Filter::Util::Exec
584 /* temporary hack to control debugging in toke.c */
585 filter_add(NULL, (fdebug) ? (SV*)"1" : (SV*)"0");
590 filter_add(module, command, ...)
591 SV * module = NO_INIT
592 char ** command = (char**) safemalloc(items * sizeof(char*)) ;
597 int pipe_in, pipe_out ;
599 /* SV * sv = newSVpv("", 0) ; */
604 warn("Filter::exec::import\n") ;
605 for (i = 1 ; i < items ; ++i)
607 command[i-1] = SvPV(ST(i), n_a) ;
609 warn(" %s\n", command[i-1]) ;
611 command[i-1] = NULL ;
612 filter_add(filter_exec, sv);
613 pid = spawnCommand(CORE_FILTER_SCRIPT, command[0], command, &pipe_in, &pipe_out) ;
614 safefree((char*)command) ;
617 PIPE_IN(sv) = pipe_in ;
618 PIPE_OUT(sv) = pipe_out ;
619 /* BUF_SV(sv) = newSVpv("", 0) ; */
620 BUF_SV(sv) = (GV*) newSV(1) ;
621 (void)SvPOK_only(BUF_SV(sv)) ;
622 BUF_NEXT(sv) = NULL ;