From d3f3c8a0472481d94154b6788eb99d1b549ad86b Mon Sep 17 00:00:00 2001 From: Andreas Fankhauser hiddenalpha.ch Date: Thu, 29 Dec 2022 22:40:04 +0100 Subject: Add CollectionUtils and ByteChunkOStream --- .../xtra4j/collection/CollectionUtils.java | 27 ++++ .../xtra4j/octetstream/ByteChunkOStream.java | 136 +++++++++++++++++++++ 2 files changed, 163 insertions(+) create mode 100644 xtra4j-misc/src/main/java/ch/hiddenalpha/xtra4j/collection/CollectionUtils.java create mode 100644 xtra4j-misc/src/main/java/ch/hiddenalpha/xtra4j/octetstream/ByteChunkOStream.java diff --git a/xtra4j-misc/src/main/java/ch/hiddenalpha/xtra4j/collection/CollectionUtils.java b/xtra4j-misc/src/main/java/ch/hiddenalpha/xtra4j/collection/CollectionUtils.java new file mode 100644 index 0000000..09b1ef9 --- /dev/null +++ b/xtra4j-misc/src/main/java/ch/hiddenalpha/xtra4j/collection/CollectionUtils.java @@ -0,0 +1,27 @@ +package ch.hiddenalpha.xtra4j.collection; + +import java.util.function.BiPredicate; + + +public class CollectionUtils { + + /** + * @param haystack + * Array to search in. + * @param needle + * Element to search for. + * @param equals + * Predicate which decides if two elements are equal. + * @param + * Type of elements we are working with. + */ + public static boolean contains( T[] haystack, T needle, BiPredicate equals ){ + for( T t : haystack ){ + if( equals.test(needle, t) ){ + return true; + } + } + return false; + } + +} diff --git a/xtra4j-misc/src/main/java/ch/hiddenalpha/xtra4j/octetstream/ByteChunkOStream.java b/xtra4j-misc/src/main/java/ch/hiddenalpha/xtra4j/octetstream/ByteChunkOStream.java new file mode 100644 index 0000000..85c2984 --- /dev/null +++ b/xtra4j-misc/src/main/java/ch/hiddenalpha/xtra4j/octetstream/ByteChunkOStream.java @@ -0,0 +1,136 @@ +package ch.hiddenalpha.xtra4j.octetstream; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.Arrays; + +import static java.lang.Math.min; +import static java.lang.System.arraycopy; + + +/** Converts an octet-stream to a push-source of byte-arrays. */ +public class ByteChunkOStream extends OutputStream { + + private final int chunkSize; + private byte[] buf; + private int bufUsedBytes; // How many bytes actually are in-use. + private final ChunkHandler onChunk; + private EndHandler onEnd; + + /** + * @param chunkSize + * Hint of how large our produced chunks should be. + * @param onChunk + * Receiver of resulting chunks. + * @param onEnd + * Called as soon stream has ended. + */ + public ByteChunkOStream( int chunkSize, ChunkHandler onChunk, EndHandler onEnd ){ + assert onChunk != null : "ChunkHandler cannot be null"; + assert chunkSize >= 1 : "chunk size too small: "+ chunkSize; + this.chunkSize = chunkSize; + this.buf = new byte[chunkSize]; + this.onChunk = onChunk; + this.onEnd = onEnd; + } + + @Override + public void write( byte[] b, int off, int len ) throws IOException { + int remainingBytes = len; + while( true ){ + int appendedBytes = appendToBuffer(b, off, len); + remainingBytes -= appendedBytes; + if( remainingBytes > 0 ){ + publishBuffer(); + // Adjust pointers then loop and continue writing remainder. + off += appendedBytes; + len -= appendedBytes; + }else if( remainingBytes == 0 ){ + break; // Done :) + }else{ + assert false : "Huh?!? why is remainingBytes " + remainingBytes + "?"; + } + } + } + + @Override + public void write( int b ) throws IOException { + if( appendToBuffer(b) == 1 ){ + return; } + // Did not work. Can happen if there's no more space in buffer. + publishBuffer(); + // Try once more. + int err = appendToBuffer(b); + assert err == 1 : "Why did appendToBuffer() return " + err + "?"; + } + + @Override + public void flush() throws IOException { + publishBuffer(); + } + + @Override + public void close() throws IOException { + flush(); + buf = null; // Think for GC + EndHandler tmp = onEnd; + onEnd = null; // Reduce probability of calling it multiple times + if( tmp != null ){ + tmp.run(); } + } + + /** + * Appends as many bytes as possible to the internal buffer. + * @return + * Number of bytes effectively copied. + */ + private int appendToBuffer( byte[] b, int off, int len ) { + int availSpace = buf.length - bufUsedBytes; + int bytesToCopy = min(availSpace, len); + arraycopy(b, off, buf, bufUsedBytes, bytesToCopy); + bufUsedBytes += bytesToCopy; + return bytesToCopy; + } + + /** Same as {@link #appendToBuffer(byte[], int, int)} but for appending a + * single byte */ + private int appendToBuffer( int b ) { + if( bufUsedBytes < buf.length ){ + buf[bufUsedBytes++] = (byte)b; + return 1; + }else{ + return 0; + } + } + + private void publishBuffer() throws IOException { + if( bufUsedBytes == 0 ){ + return; /* Nothing to do */ } + byte[] bufToPublish; + if( bufUsedBytes == buf.length ){ + // No need to copy stuff around. + bufToPublish = buf; + buf = new byte[chunkSize]; + }else{ + // Buffer is not completely full. So we have to make a copy so + // buf.length does report the correct value to callee. That implies + // that we can continue using our existing buffer for ourself. + bufToPublish = Arrays.copyOfRange(this.buf, 0, bufUsedBytes); + } + bufUsedBytes = 0; // Our internal buffer is now empty. + onChunk.accept(bufToPublish); + } + + + /** Inspired by {@link java.util.function.Consumer} */ + public static interface ChunkHandler { + void accept( byte[] bytes ) throws IOException; + } + + + /** Inspired by {@link java.lang.Runnable} */ + public static interface EndHandler { + void run() throws IOException; + } + +} -- cgit v1.1