Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[pulseaudio] Make the process method asynchronous #15179

Merged
merged 2 commits into from
Jul 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import org.eclipse.jdt.annotation.Nullable;
import org.openhab.core.audio.AudioFormat;
import org.openhab.core.audio.AudioStream;
import org.openhab.core.audio.FixedLengthAudioStream;
import org.openhab.core.audio.SizeableAudioStream;
import org.openhab.core.audio.UnsupportedAudioFormatException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -58,8 +58,8 @@ public ConvertedInputStream(AudioStream innerInputStream)
throws UnsupportedAudioFormatException, UnsupportedAudioFileException, IOException {
this.audioFormat = innerInputStream.getFormat();

if (innerInputStream instanceof FixedLengthAudioStream) {
length = ((FixedLengthAudioStream) innerInputStream).length();
if (innerInputStream instanceof SizeableAudioStream sizeableAudioStream) {
length = sizeableAudioStream.length();
}

pcmNormalizedInputStream = getPCMStreamNormalized(getPCMStream(new BufferedInputStream(innerInputStream)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@
import java.net.Socket;
import java.time.Duration;
import java.time.Instant;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import javax.sound.sampled.UnsupportedAudioFileException;

Expand All @@ -28,9 +29,11 @@
import org.openhab.core.audio.AudioFormat;
import org.openhab.core.audio.AudioSink;
import org.openhab.core.audio.AudioStream;
import org.openhab.core.audio.FixedLengthAudioStream;
import org.openhab.core.audio.FileAudioStream;
import org.openhab.core.audio.UnsupportedAudioFormatException;
import org.openhab.core.audio.UnsupportedAudioStreamException;
import org.openhab.core.audio.utils.AudioSinkUtils;
import org.openhab.core.common.Disposable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -47,24 +50,29 @@ public class PulseAudioAudioSink extends PulseaudioSimpleProtocolStream implemen

private final Logger logger = LoggerFactory.getLogger(PulseAudioAudioSink.class);

private static final HashSet<AudioFormat> SUPPORTED_FORMATS = new HashSet<>();
private static final HashSet<Class<? extends AudioStream>> SUPPORTED_STREAMS = new HashSet<>();
private AudioSinkUtils audioSinkUtils;

static {
SUPPORTED_FORMATS.add(AudioFormat.WAV);
SUPPORTED_FORMATS.add(AudioFormat.MP3);
SUPPORTED_STREAMS.add(FixedLengthAudioStream.class);
}
private static final Set<AudioFormat> SUPPORTED_FORMATS = Set.of(AudioFormat.WAV, AudioFormat.MP3);
private static final Set<Class<? extends AudioStream>> SUPPORTED_STREAMS = Set.of(AudioStream.class);
private static final AudioFormat TARGET_FORMAT = new AudioFormat(AudioFormat.CONTAINER_WAVE,
AudioFormat.CODEC_PCM_SIGNED, false, 16, 4 * 44100, 44100L, 2);

public PulseAudioAudioSink(PulseaudioHandler pulseaudioHandler, ScheduledExecutorService scheduler) {
public PulseAudioAudioSink(PulseaudioHandler pulseaudioHandler, ScheduledExecutorService scheduler,
AudioSinkUtils audioSinkUtils) {
super(pulseaudioHandler, scheduler);
this.audioSinkUtils = audioSinkUtils;
}

@Override
public void process(@Nullable AudioStream audioStream)
throws UnsupportedAudioFormatException, UnsupportedAudioStreamException {
processAndComplete(audioStream);
}

@Override
public CompletableFuture<@Nullable Void> processAndComplete(@Nullable AudioStream audioStream) {
if (audioStream == null) {
return;
return CompletableFuture.completedFuture(null);
}
addClientCount();
try (ConvertedInputStream normalizedPCMStream = new ConvertedInputStream(audioStream)) {
Expand All @@ -75,18 +83,38 @@ public void process(@Nullable AudioStream audioStream)
if (clientSocketLocal != null) {
// send raw audio to the socket and to pulse audio
Instant start = Instant.now();
normalizedPCMStream.transferTo(clientSocketLocal.getOutputStream());
if (normalizedPCMStream.getDuration() != -1) { // ensure, if the sound has a duration
if (normalizedPCMStream.getDuration() != -1) {
// ensure, if the sound has a duration
// that we let at least this time for the system to play
normalizedPCMStream.transferTo(clientSocketLocal.getOutputStream());
Instant end = Instant.now();
long millisSecondTimedToSendAudioData = Duration.between(start, end).toMillis();
if (millisSecondTimedToSendAudioData < normalizedPCMStream.getDuration()) {
long timeToSleep = normalizedPCMStream.getDuration() - millisSecondTimedToSendAudioData;
logger.debug("Sleep time to let the system play sound : {}", timeToSleep);
Thread.sleep(timeToSleep);
CompletableFuture<@Nullable Void> soundPlayed = new CompletableFuture<>();
long timeToWait = normalizedPCMStream.getDuration() - millisSecondTimedToSendAudioData;
logger.debug("Some time to let the system play sound : {}", timeToWait);
scheduler.schedule(() -> soundPlayed.complete(null), timeToWait, TimeUnit.MILLISECONDS);
return soundPlayed;
} else {
return CompletableFuture.completedFuture(null);
}
} else {
// We have a second method available to guess the duration, and it is during transfer
Long timeStampEnd = audioSinkUtils.transferAndAnalyzeLength(normalizedPCMStream,
clientSocketLocal.getOutputStream(), TARGET_FORMAT);
CompletableFuture<@Nullable Void> soundPlayed = new CompletableFuture<>();
if (timeStampEnd != null) {
long now = System.nanoTime();
long timeToWait = timeStampEnd - now;
if (timeToWait > 0) {
scheduler.schedule(() -> soundPlayed.complete(null), timeToWait,
TimeUnit.NANOSECONDS);
}
return soundPlayed;
} else {
return CompletableFuture.completedFuture(null);
}
}
break;
}
} catch (IOException e) {
disconnect(); // disconnect force to clear connection in case of socket not cleanly shutdown
Expand All @@ -97,19 +125,34 @@ public void process(@Nullable AudioStream audioStream)
logger.warn(
"Error while trying to send audio to pulseaudio audio sink. Cannot connect to {}:{}, error: {}",
pulseaudioHandler.getHost(), port, e.getMessage());
break;
return CompletableFuture.completedFuture(null);
}
} catch (InterruptedException ie) {
logger.info("Interrupted during sink audio connection: {}", ie.getMessage());
break;
return CompletableFuture.completedFuture(null);
}
}
} catch (UnsupportedAudioFileException | IOException e) {
throw new UnsupportedAudioFormatException("Cannot send sound to the pulseaudio sink",
audioStream.getFormat(), e);
} catch (UnsupportedAudioFileException | UnsupportedAudioFormatException | IOException e) {
return CompletableFuture.failedFuture(new UnsupportedAudioFormatException(
"Cannot send sound to the pulseaudio sink", audioStream.getFormat(), e));
} finally {
minusClientCount();
// if the stream is not needed anymore, then we should call back the AudioStream to let it a chance
// to auto dispose.
if (audioStream instanceof Disposable disposableAudioStream) {
try {
disposableAudioStream.dispose();
} catch (IOException e) {
String fileName = audioStream instanceof FileAudioStream file ? file.toString() : "unknown";
if (logger.isDebugEnabled()) {
logger.debug("Cannot dispose of stream {}", fileName, e);
} else {
logger.warn("Cannot dispose of stream {}, reason {}", fileName, e.getMessage());
}
}
}
}
return CompletableFuture.completedFuture(null);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.openhab.binding.pulseaudio.internal.discovery.PulseaudioDeviceDiscoveryService;
import org.openhab.binding.pulseaudio.internal.handler.PulseaudioBridgeHandler;
import org.openhab.binding.pulseaudio.internal.handler.PulseaudioHandler;
import org.openhab.core.audio.utils.AudioSinkUtils;
import org.openhab.core.config.core.Configuration;
import org.openhab.core.config.discovery.DiscoveryService;
import org.openhab.core.thing.Bridge;
Expand All @@ -39,6 +40,7 @@
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Modified;
import org.osgi.service.component.annotations.Reference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -61,6 +63,13 @@ public class PulseaudioHandlerFactory extends BaseThingHandlerFactory {

private PulseAudioBindingConfiguration configuration = new PulseAudioBindingConfiguration();

private AudioSinkUtils audioSinkUtils;

@Activate
public PulseaudioHandlerFactory(@Reference AudioSinkUtils audioSinkUtils) {
this.audioSinkUtils = audioSinkUtils;
}

@Override
public boolean supportsThingType(ThingTypeUID thingTypeUID) {
return SUPPORTED_THING_TYPES_UIDS.contains(thingTypeUID);
Expand Down Expand Up @@ -119,7 +128,7 @@ protected void removeHandler(ThingHandler thingHandler) {
registerDeviceDiscoveryService(handler);
return handler;
} else if (PulseaudioHandler.SUPPORTED_THING_TYPES_UIDS.contains(thingTypeUID)) {
return new PulseaudioHandler(thing, bundleContext);
return new PulseaudioHandler(thing, bundleContext, audioSinkUtils);
}

return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.openhab.core.audio.AudioFormat;
import org.openhab.core.audio.AudioSink;
import org.openhab.core.audio.AudioSource;
import org.openhab.core.audio.utils.AudioSinkUtils;
import org.openhab.core.config.core.Configuration;
import org.openhab.core.library.types.DecimalType;
import org.openhab.core.library.types.IncreaseDecreaseType;
Expand Down Expand Up @@ -89,9 +90,12 @@ public class PulseaudioHandler extends BaseThingHandler {

private final BundleContext bundleContext;

public PulseaudioHandler(Thing thing, BundleContext bundleContext) {
private AudioSinkUtils audioSinkUtils;

public PulseaudioHandler(Thing thing, BundleContext bundleContext, AudioSinkUtils audioSinkUtils) {
super(thing);
this.bundleContext = bundleContext;
this.audioSinkUtils = audioSinkUtils;
}

@Override
Expand Down Expand Up @@ -127,7 +131,7 @@ private void audioSinkSetup() {
return;
}
final PulseaudioHandler thisHandler = this;
PulseAudioAudioSink audioSink = new PulseAudioAudioSink(thisHandler, scheduler);
PulseAudioAudioSink audioSink = new PulseAudioAudioSink(thisHandler, scheduler, audioSinkUtils);
scheduler.submit(new Runnable() {
@Override
public void run() {
Expand Down