aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--run-command.c82
-rw-r--r--run-command.h22
-rw-r--r--t/helper/test-run-command.c52
-rwxr-xr-xt/t0061-run-command.sh31
4 files changed, 178 insertions, 9 deletions
diff --git a/run-command.c b/run-command.c
index ed9575bd6a..5bc6db5bb1 100644
--- a/run-command.c
+++ b/run-command.c
@@ -1652,6 +1652,44 @@ static int pp_start_one(struct parallel_processes *pp,
return 0;
}
+static void pp_buffer_stdin(struct parallel_processes *pp,
+ const struct run_process_parallel_opts *opts)
+{
+ /* Buffer stdin for each pipe. */
+ for (ssize_t i = 0; i < opts->processes; i++) {
+ struct child_process *proc = &pp->children[i].process;
+ int ret;
+
+ if (pp->children[i].state != GIT_CP_WORKING || proc->in <= 0)
+ continue;
+
+ /*
+ * child input is provided via path_to_stdin when the feed_pipe cb is
+ * missing, so we just signal an EOF.
+ */
+ if (!opts->feed_pipe) {
+ close(proc->in);
+ proc->in = 0;
+ continue;
+ }
+
+ /**
+ * Feed the pipe:
+ * ret < 0 means error
+ * ret == 0 means there is more data to be fed
+ * ret > 0 means feeding finished
+ */
+ ret = opts->feed_pipe(proc->in, opts->data, pp->children[i].data);
+ if (ret < 0)
+ die_errno("feed_pipe");
+
+ if (ret) {
+ close(proc->in);
+ proc->in = 0;
+ }
+ }
+}
+
static void pp_buffer_stderr(struct parallel_processes *pp,
const struct run_process_parallel_opts *opts,
int output_timeout)
@@ -1722,6 +1760,7 @@ static int pp_collect_finished(struct parallel_processes *pp,
pp->children[i].state = GIT_CP_FREE;
if (pp->pfd)
pp->pfd[i].fd = -1;
+ pp->children[i].process.in = 0;
child_process_init(&pp->children[i].process);
if (opts->ungroup) {
@@ -1756,6 +1795,32 @@ static int pp_collect_finished(struct parallel_processes *pp,
return result;
}
+static void pp_handle_child_IO(struct parallel_processes *pp,
+ const struct run_process_parallel_opts *opts,
+ int output_timeout)
+{
+ /*
+ * First push input, if any (it might no-op), to child tasks to avoid them blocking
+ * after input. This also prevents deadlocks when ungrouping below, if a child blocks
+ * while the parent also waits for them to finish.
+ */
+ pp_buffer_stdin(pp, opts);
+
+ if (opts->ungroup) {
+ for (size_t i = 0; i < opts->processes; i++) {
+ int child_ready_for_cleanup =
+ pp->children[i].state == GIT_CP_WORKING &&
+ pp->children[i].process.in == 0;
+
+ if (child_ready_for_cleanup)
+ pp->children[i].state = GIT_CP_WAIT_CLEANUP;
+ }
+ } else {
+ pp_buffer_stderr(pp, opts, output_timeout);
+ pp_output(pp);
+ }
+}
+
void run_processes_parallel(const struct run_process_parallel_opts *opts)
{
int i, code;
@@ -1775,6 +1840,13 @@ void run_processes_parallel(const struct run_process_parallel_opts *opts)
"max:%"PRIuMAX,
(uintmax_t)opts->processes);
+ /*
+ * Child tasks might receive input via stdin, terminating early (or not), so
+ * ignore the default SIGPIPE which gets handled by each feed_pipe_fn which
+ * actually writes the data to children stdin fds.
+ */
+ sigchain_push(SIGPIPE, SIG_IGN);
+
pp_init(&pp, opts, &pp_sig);
while (1) {
for (i = 0;
@@ -1792,13 +1864,7 @@ void run_processes_parallel(const struct run_process_parallel_opts *opts)
}
if (!pp.nr_processes)
break;
- if (opts->ungroup) {
- for (size_t i = 0; i < opts->processes; i++)
- pp.children[i].state = GIT_CP_WAIT_CLEANUP;
- } else {
- pp_buffer_stderr(&pp, opts, output_timeout);
- pp_output(&pp);
- }
+ pp_handle_child_IO(&pp, opts, output_timeout);
code = pp_collect_finished(&pp, opts);
if (code) {
pp.shutdown = 1;
@@ -1809,6 +1875,8 @@ void run_processes_parallel(const struct run_process_parallel_opts *opts)
pp_cleanup(&pp, opts);
+ sigchain_pop(SIGPIPE);
+
if (do_trace2)
trace2_region_leave(tr2_category, tr2_label, NULL);
}
diff --git a/run-command.h b/run-command.h
index 0df25e445f..e536ed7544 100644
--- a/run-command.h
+++ b/run-command.h
@@ -421,6 +421,22 @@ typedef int (*start_failure_fn)(struct strbuf *out,
void *pp_task_cb);
/**
+ * This callback is repeatedly called on every child process who requests
+ * start_command() to create a pipe by setting child_process.in < 0.
+ *
+ * pp_cb is the callback cookie as passed into run_processes_parallel, and
+ * pp_task_cb is the callback cookie as passed into get_next_task_fn.
+ * The contents of 'send' will be read into the pipe and passed to the pipe.
+ *
+ * Returns < 0 for error
+ * Returns == 0 when there is more data to be fed (will be called again)
+ * Returns > 0 when finished (child closed fd or no more data to be fed)
+ */
+typedef int (*feed_pipe_fn)(int child_in,
+ void *pp_cb,
+ void *pp_task_cb);
+
+/**
* This callback is called on every child process that finished processing.
*
* See run_processes_parallel() below for a discussion of the "struct
@@ -473,6 +489,12 @@ struct run_process_parallel_opts
*/
start_failure_fn start_failure;
+ /*
+ * feed_pipe: see feed_pipe_fn() above. This can be NULL to omit any
+ * special handling.
+ */
+ feed_pipe_fn feed_pipe;
+
/**
* task_finished: See task_finished_fn() above. This can be
* NULL to omit any special handling.
diff --git a/t/helper/test-run-command.c b/t/helper/test-run-command.c
index 3719f23cc2..dfdb03b3ab 100644
--- a/t/helper/test-run-command.c
+++ b/t/helper/test-run-command.c
@@ -23,19 +23,26 @@ static int number_callbacks;
static int parallel_next(struct child_process *cp,
struct strbuf *err,
void *cb,
- void **task_cb UNUSED)
+ void **task_cb)
{
struct child_process *d = cb;
if (number_callbacks >= 4)
return 0;
strvec_pushv(&cp->args, d->args.v);
+ cp->in = d->in;
+ cp->no_stdin = d->no_stdin;
if (err)
strbuf_addstr(err, "preloaded output of a child\n");
else
fprintf(stderr, "preloaded output of a child\n");
number_callbacks++;
+
+ /* test_stdin callback will use this to count remaining lines */
+ *task_cb = xmalloc(sizeof(int));
+ *(int*)(*task_cb) = 2;
+
return 1;
}
@@ -54,15 +61,48 @@ static int no_job(struct child_process *cp UNUSED,
static int task_finished(int result UNUSED,
struct strbuf *err,
void *pp_cb UNUSED,
- void *pp_task_cb UNUSED)
+ void *pp_task_cb)
{
if (err)
strbuf_addstr(err, "asking for a quick stop\n");
else
fprintf(stderr, "asking for a quick stop\n");
+ if (pp_task_cb)
+ FREE_AND_NULL(pp_task_cb);
return 1;
}
+static int task_finished_quiet(int result UNUSED,
+ struct strbuf *err UNUSED,
+ void *pp_cb UNUSED,
+ void *pp_task_cb)
+{
+ if (pp_task_cb)
+ FREE_AND_NULL(pp_task_cb);
+ return 0;
+}
+
+static int test_stdin_pipe_feed(int hook_stdin_fd, void *cb UNUSED, void *task_cb)
+{
+ int *lines_remaining = task_cb;
+
+ if (*lines_remaining) {
+ struct strbuf buf = STRBUF_INIT;
+ strbuf_addf(&buf, "sample stdin %d\n", --(*lines_remaining));
+ if (write_in_full(hook_stdin_fd, buf.buf, buf.len) < 0) {
+ if (errno == EPIPE) {
+ /* child closed stdin, nothing more to do */
+ strbuf_release(&buf);
+ return 1;
+ }
+ die_errno("write");
+ }
+ strbuf_release(&buf);
+ }
+
+ return !(*lines_remaining);
+}
+
struct testsuite {
struct string_list tests, failed;
int next;
@@ -157,6 +197,7 @@ static int testsuite(int argc, const char **argv)
struct run_process_parallel_opts opts = {
.get_next_task = next_test,
.start_failure = test_failed,
+ .feed_pipe = test_stdin_pipe_feed,
.task_finished = test_finished,
.data = &suite,
};
@@ -460,12 +501,19 @@ int cmd__run_command(int argc, const char **argv)
if (!strcmp(argv[1], "run-command-parallel")) {
opts.get_next_task = parallel_next;
+ opts.task_finished = task_finished_quiet;
} else if (!strcmp(argv[1], "run-command-abort")) {
opts.get_next_task = parallel_next;
opts.task_finished = task_finished;
} else if (!strcmp(argv[1], "run-command-no-jobs")) {
opts.get_next_task = no_job;
opts.task_finished = task_finished;
+ } else if (!strcmp(argv[1], "run-command-stdin")) {
+ proc.in = -1;
+ proc.no_stdin = 0;
+ opts.get_next_task = parallel_next;
+ opts.task_finished = task_finished_quiet;
+ opts.feed_pipe = test_stdin_pipe_feed;
} else {
ret = 1;
fprintf(stderr, "check usage\n");
diff --git a/t/t0061-run-command.sh b/t/t0061-run-command.sh
index 76d4936a87..2f77fde0d9 100755
--- a/t/t0061-run-command.sh
+++ b/t/t0061-run-command.sh
@@ -164,6 +164,37 @@ test_expect_success 'run_command runs ungrouped in parallel with more tasks than
test_line_count = 4 err
'
+test_expect_success 'run_command listens to stdin' '
+ cat >expect <<-\EOF &&
+ preloaded output of a child
+ listening for stdin:
+ sample stdin 1
+ sample stdin 0
+ preloaded output of a child
+ listening for stdin:
+ sample stdin 1
+ sample stdin 0
+ preloaded output of a child
+ listening for stdin:
+ sample stdin 1
+ sample stdin 0
+ preloaded output of a child
+ listening for stdin:
+ sample stdin 1
+ sample stdin 0
+ EOF
+
+ write_script stdin-script <<-\EOF &&
+ echo "listening for stdin:"
+ while read line
+ do
+ echo "$line"
+ done
+ EOF
+ test-tool run-command run-command-stdin 2 ./stdin-script 2>actual &&
+ test_cmp expect actual
+'
+
cat >expect <<-EOF
preloaded output of a child
asking for a quick stop