From bbbae5fe23ec51b7e11ab31b19623d190d718fcf Mon Sep 17 00:00:00 2001 From: Andreas Fankhauser hiddenalpha.ch Date: Thu, 29 Dec 2022 22:56:08 +0100 Subject: Add CloseNotify IO streams --- .../xtra4j/octetstream/CloseNotifyInputStream.java | 52 ++++++++++++++++++++ .../octetstream/CloseNotifyOutputStream.java | 55 ++++++++++++++++++++++ 2 files changed, 107 insertions(+) create mode 100644 xtra4j-misc/src/main/java/ch/hiddenalpha/xtra4j/octetstream/CloseNotifyInputStream.java create mode 100644 xtra4j-misc/src/main/java/ch/hiddenalpha/xtra4j/octetstream/CloseNotifyOutputStream.java (limited to 'xtra4j-misc') diff --git a/xtra4j-misc/src/main/java/ch/hiddenalpha/xtra4j/octetstream/CloseNotifyInputStream.java b/xtra4j-misc/src/main/java/ch/hiddenalpha/xtra4j/octetstream/CloseNotifyInputStream.java new file mode 100644 index 0000000..6e75d61 --- /dev/null +++ b/xtra4j-misc/src/main/java/ch/hiddenalpha/xtra4j/octetstream/CloseNotifyInputStream.java @@ -0,0 +1,52 @@ +package ch.hiddenalpha.xtra4j.octetstream; + +import java.io.FilterInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.concurrent.atomic.AtomicBoolean; + + +/** Allows to get notified when stream gets closed. */ +public class CloseNotifyInputStream extends FilterInputStream { + + private final Runnable onClose; + private final AtomicBoolean isFired = new AtomicBoolean(false); + + /** + * @param onClose + * Called once, as soon stream has ended. + */ + public CloseNotifyInputStream( InputStream src, Runnable onClose ){ + super(src); + assert onClose != null : "Arg 'onClose' missing"; + this.onClose = onClose; + } + + @Override + public void close() throws IOException { + Exception byObserver = null, bySrc = null; + if( !isFired.getAndSet(true) ){ + try{ + in.close(); + }catch( IOException | RuntimeException ex ){ + bySrc = ex; + } + try{ + onClose.run(); + }catch( RuntimeException ex ){ + byObserver = ex; + } + if( byObserver != null && bySrc != null && (byObserver != bySrc) ){ + bySrc.addSuppressed(byObserver); + }else if( bySrc == null ){ + bySrc = byObserver; + } + if( bySrc instanceof IOException ){ + throw (IOException)bySrc; + }else if ( bySrc != null ){ + throw (RuntimeException)bySrc; + } + } + } + +} diff --git a/xtra4j-misc/src/main/java/ch/hiddenalpha/xtra4j/octetstream/CloseNotifyOutputStream.java b/xtra4j-misc/src/main/java/ch/hiddenalpha/xtra4j/octetstream/CloseNotifyOutputStream.java new file mode 100644 index 0000000..3907442 --- /dev/null +++ b/xtra4j-misc/src/main/java/ch/hiddenalpha/xtra4j/octetstream/CloseNotifyOutputStream.java @@ -0,0 +1,55 @@ +package ch.hiddenalpha.xtra4j.octetstream; + +import java.io.FilterOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.util.concurrent.atomic.AtomicBoolean; + + +/** + * Gives the chance to place a hook to get notified as soon the stream gets + * closed. + */ +public class CloseNotifyOutputStream extends FilterOutputStream { + + private final Runnable onClose; + private final AtomicBoolean isFired = new AtomicBoolean(false); + + /** + * @param onClose + * Called once, as soon stream has ended. + */ + public CloseNotifyOutputStream( OutputStream out, Runnable onClose ){ + super(out); + assert onClose != null: "Arg 'onClose' missing"; + this.onClose = onClose; + } + + @Override + public void close() throws IOException { + Exception byObserver = null, bySrc = null; + if( !isFired.getAndSet(true) ){ + try{ + out.close(); + }catch( IOException | RuntimeException ex ){ + bySrc = ex; + } + try{ + onClose.run(); + }catch( RuntimeException ex ){ + byObserver = ex; + } + if( byObserver != null && bySrc != null && (byObserver != bySrc) ){ + bySrc.addSuppressed(byObserver); + }else if( bySrc == null ){ + bySrc = byObserver; + } + if( bySrc instanceof IOException ){ + throw (IOException)bySrc; + }else if ( bySrc != null ){ + throw (RuntimeException)bySrc; + } + } + } + +} -- cgit v1.1