AsyncStreamConsumer.java
package org.itsallcode.process;
import java.io.*;
import java.nio.charset.StandardCharsets;
import java.util.logging.Level;
import java.util.logging.Logger;
class AsyncStreamConsumer implements Runnable {
private static final Logger LOG = Logger.getLogger(AsyncStreamConsumer.class.getName());
private final String name;
private final long pid;
private final ProcessStreamConsumer consumer;
private final InputStream stream;
AsyncStreamConsumer(final String name, final long pid, final InputStream stream,
final ProcessStreamConsumer consumer) {
this.name = name;
this.pid = pid;
this.stream = stream;
this.consumer = consumer;
}
@Override
public void run() {
LOG.finest(() -> "Start reading from '%s' stream of process %d...".formatted(name, pid));
try (final BufferedReader reader = new BufferedReader(new InputStreamReader(stream, StandardCharsets.UTF_8))) {
String line = null;
while ((line = reader.readLine()) != null) {
consumer.accept(line);
}
LOG.finest(() -> "Stream '%s' of process %d finished".formatted(name, pid));
consumer.streamFinished();
} catch (final IOException exception) {
final Level logLevel = "Stream closed".equals(exception.getMessage()) ? Level.FINEST : Level.WARNING;
LOG.log(logLevel,
"Reading stream '%s' of process %d failed: %s".formatted(name, pid,
exception.getMessage()),
exception);
consumer.streamReadingFailed(exception);
}
}
}