ProcessOutputConsumer.java
package org.itsallcode.process;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
* Consumes stdout and stderr of a process asynchronously.
*/
final class ProcessOutputConsumer<T> {
private static final String STD_ERR_NAME = "stdErr";
private static final String STD_OUT_NAME = "stdOut";
private static final Logger LOG = Logger.getLogger(ProcessOutputConsumer.class.getName());
private final Executor executor;
private final Process process;
private final List<Runnable> consumers;
private final List<StreamCloseWaiter> streamCloseWaiter;
private final StreamCollector<T> stdOutCollector;
private final StreamCollector<T> stdErrCollector;
private ProcessOutputConsumer(final Executor executor, final Process process,
final List<Runnable> consumers, final List<StreamCloseWaiter> streamCloseWaiter,
final StreamCollector<T> stdOutCollector, final StreamCollector<T> stdErrCollector) {
this.executor = executor;
this.process = process;
this.consumers = Collections.unmodifiableList(consumers);
this.streamCloseWaiter = Collections.unmodifiableList(streamCloseWaiter);
this.stdOutCollector = stdOutCollector;
this.stdErrCollector = stdErrCollector;
}
static <T> ProcessOutputConsumer<T> create(final Executor executor, final Process process,
final Duration streamCloseTimeout, Level logLevel, final StreamCollector<T> stdOutCollector,
final StreamCollector<T> stdErrCollector) {
final long pid = process.pid();
final StreamCloseWaiter stdOutCloseWaiter = new StreamCloseWaiter(STD_OUT_NAME, pid, streamCloseTimeout);
final StreamCloseWaiter stdErrCloseWaiter = new StreamCloseWaiter(STD_ERR_NAME, pid, streamCloseTimeout);
final AsyncStreamConsumer stdOutConsumer = new AsyncStreamConsumer(STD_OUT_NAME, pid, process.getInputStream(),
new DelegatingConsumer(
List.of(stdOutCloseWaiter, stdOutCollector, new StreamLogger(pid, STD_OUT_NAME, logLevel))));
final AsyncStreamConsumer stdErrConsumer = new AsyncStreamConsumer(STD_ERR_NAME, pid, process.getErrorStream(),
new DelegatingConsumer(
List.of(stdErrCloseWaiter, stdErrCollector, new StreamLogger(pid, STD_ERR_NAME, logLevel))));
return new ProcessOutputConsumer<>(executor, process, List.of(stdOutConsumer, stdErrConsumer),
List.of(stdOutCloseWaiter, stdErrCloseWaiter), stdOutCollector, stdErrCollector);
}
void start() {
LOG.finest(() -> "Start reading stdout and stderr streams of process %d in background..."
.formatted(process.pid()));
consumers.forEach(executor::execute);
}
T getStdOut() {
return stdOutCollector.getResult();
}
T getStdErr() {
return stdErrCollector.getResult();
}
void waitForStreamsClosed() {
streamCloseWaiter.forEach(StreamCloseWaiter::waitUntilStreamClosed);
}
}