From 8c843f6856550040f06fbc62f38280205d5bda34 Mon Sep 17 00:00:00 2001 From: piotr-yuxuan Date: Wed, 22 Dec 2021 18:53:01 +0000 Subject: [PATCH 1/2] Deprecate single-segment top-level namespace --- project.clj | 4 +- src/byte_streams.clj | 4 +- src/byte_streams/ByteBufferInputStream.java | 4 + src/byte_streams/InputStream.java | 4 + src/byte_streams/Utils.java | 4 + src/byte_streams/char_sequence.clj | 4 +- src/byte_streams/graph.clj | 4 +- src/byte_streams/protocols.clj | 4 +- src/byte_streams/pushback_stream.clj | 4 +- src/byte_streams/utils.clj | 4 +- src/clj_commons/byte_streams.clj | 1019 +++++++++++++++++ .../byte_streams/ByteBufferInputStream.java | 57 + src/clj_commons/byte_streams/InputStream.java | 52 + src/clj_commons/byte_streams/Utils.java | 7 + .../byte_streams/char_sequence.clj | 122 ++ src/clj_commons/byte_streams/graph.clj | 312 +++++ src/clj_commons/byte_streams/protocols.clj | 34 + .../byte_streams/pushback_stream.clj | 306 +++++ src/clj_commons/byte_streams/utils.clj | 30 + .../byte_streams_reload_test.clj | 2 +- .../byte_streams_simple_check.clj | 4 +- test/{ => clj_commons}/byte_streams_test.clj | 6 +- test/pushback_stream_test.clj | 2 +- version.edn | 2 +- 24 files changed, 1979 insertions(+), 16 deletions(-) create mode 100644 src/clj_commons/byte_streams.clj create mode 100644 src/clj_commons/byte_streams/ByteBufferInputStream.java create mode 100644 src/clj_commons/byte_streams/InputStream.java create mode 100644 src/clj_commons/byte_streams/Utils.java create mode 100644 src/clj_commons/byte_streams/char_sequence.clj create mode 100644 src/clj_commons/byte_streams/graph.clj create mode 100644 src/clj_commons/byte_streams/protocols.clj create mode 100644 src/clj_commons/byte_streams/pushback_stream.clj create mode 100644 src/clj_commons/byte_streams/utils.clj rename test/{ => clj_commons}/byte_streams_reload_test.clj (76%) rename test/{ => clj_commons}/byte_streams_simple_check.clj (84%) rename test/{ => clj_commons}/byte_streams_test.clj (95%) diff --git a/project.clj b/project.clj index 2647e3f..ae0ed3c 100644 --- a/project.clj +++ b/project.clj @@ -1,4 +1,4 @@ -(defproject org.clj-commons/byte-streams (or (System/getenv "PROJECT_VERSION") "0.2.10") +(defproject org.clj-commons/byte-streams (or (System/getenv "PROJECT_VERSION") "0.2.11") :description "A simple way to handle the menagerie of Java byte representations." :license {:name "MIT License" :url "http://opensource.org/licenses/MIT"} @@ -27,7 +27,7 @@ :cljfmt {:indents {#".*" [[:inner 0]]}} :codox {:source-uri "https://github.com/clj-commons/byte-streams/blob/master/{filepath}#L{line}" :metadata {:doc/format :markdown} - :namespaces [byte-streams]} + :namespaces [clj-commons.byte-streams]} :global-vars {*warn-on-reflection* true} :java-source-paths ["src"] :jvm-opts ^:replace ["-server" "-Xmx4g"]) diff --git a/src/byte_streams.clj b/src/byte_streams.clj index 1894415..2010ca4 100644 --- a/src/byte_streams.clj +++ b/src/byte_streams.clj @@ -1,4 +1,6 @@ -(ns byte-streams +(ns ^{;; single segment namespace is deprecated, use clj-commons.byte-streams + :deprecated true} + byte-streams (:refer-clojure :exclude [byte-array vector-of]) (:require [manifold diff --git a/src/byte_streams/ByteBufferInputStream.java b/src/byte_streams/ByteBufferInputStream.java index 959c570..349be4f 100644 --- a/src/byte_streams/ByteBufferInputStream.java +++ b/src/byte_streams/ByteBufferInputStream.java @@ -4,6 +4,10 @@ import java.io.IOException; import java.nio.ByteBuffer; +/** + * Deprecated, use clj_commons.byte_streams.ByteBufferInputStream. + */ +@Deprecated() public class ByteBufferInputStream extends InputStream { private ByteBuffer _buf; diff --git a/src/byte_streams/InputStream.java b/src/byte_streams/InputStream.java index 121e6f9..0640ba0 100644 --- a/src/byte_streams/InputStream.java +++ b/src/byte_streams/InputStream.java @@ -2,6 +2,10 @@ import java.io.IOException; +/** + * Deprecated, use clj_commons.byte_streams.InputStream. + */ +@Deprecated() public class InputStream extends java.io.InputStream { public interface Streamable { diff --git a/src/byte_streams/Utils.java b/src/byte_streams/Utils.java index 87510ab..db60499 100644 --- a/src/byte_streams/Utils.java +++ b/src/byte_streams/Utils.java @@ -1,5 +1,9 @@ package byte_streams; +/** + * Deprecated, use clj_commons.byte_streams.Utils. + */ +@Deprecated() public class Utils { public static byte[] byteArray(int length) { return new byte[length]; diff --git a/src/byte_streams/char_sequence.clj b/src/byte_streams/char_sequence.clj index 50d6f56..eb3a019 100644 --- a/src/byte_streams/char_sequence.clj +++ b/src/byte_streams/char_sequence.clj @@ -1,4 +1,6 @@ -(ns byte-streams.char-sequence +(ns ^{;; single segment namespace is deprecated, use clj-commons.byte-streams.char-sequence + :deprecated true} + byte-streams.char-sequence (:refer-clojure :exclude [flush]) (:import [java.util.concurrent.locks diff --git a/src/byte_streams/graph.clj b/src/byte_streams/graph.clj index f197188..db3f4f6 100644 --- a/src/byte_streams/graph.clj +++ b/src/byte_streams/graph.clj @@ -1,4 +1,6 @@ -(ns byte-streams.graph +(ns ^{;; single segment namespace is deprecated, use clj-commons.byte-streams.graph + :deprecated true} + byte-streams.graph (:refer-clojure :exclude [type]) (:require [manifold.stream :as s] diff --git a/src/byte_streams/protocols.clj b/src/byte_streams/protocols.clj index 6f1ceb0..3911855 100644 --- a/src/byte_streams/protocols.clj +++ b/src/byte_streams/protocols.clj @@ -1,4 +1,6 @@ -(ns byte-streams.protocols +(ns ^{;; single segment namespace is deprecated, use clj-commons.byte-streams.protocols + :deprecated true} + byte-streams.protocols (:require [byte-streams.utils :refer [defprotocol+]]) (:import diff --git a/src/byte_streams/pushback_stream.clj b/src/byte_streams/pushback_stream.clj index 989b0b6..5738386 100644 --- a/src/byte_streams/pushback_stream.clj +++ b/src/byte_streams/pushback_stream.clj @@ -1,4 +1,6 @@ -(ns byte-streams.pushback-stream +(ns ^{;; single segment namespace is deprecated, use clj-commons.byte-streams.byte-streams.pushback-stream + :deprecated true} + byte-streams.pushback-stream (:refer-clojure :exclude [take]) (:require [primitive-math :as p] diff --git a/src/byte_streams/utils.clj b/src/byte_streams/utils.clj index 40645fa..eefac75 100644 --- a/src/byte_streams/utils.clj +++ b/src/byte_streams/utils.clj @@ -1,4 +1,6 @@ -(ns byte-streams.utils) +(ns ^{;; single segment namespace is deprecated, use clj-commons.byte-streams.utils + :deprecated true} + byte-streams.utils) (defmacro defprotocol+ [name & body] (when-not (resolve name) diff --git a/src/clj_commons/byte_streams.clj b/src/clj_commons/byte_streams.clj new file mode 100644 index 0000000..7ee52b6 --- /dev/null +++ b/src/clj_commons/byte_streams.clj @@ -0,0 +1,1019 @@ +(ns clj-commons.byte-streams + (:refer-clojure :exclude [byte-array vector-of]) + (:require + [manifold + [stream :as s] + [deferred :as d]] + [clj-commons.byte-streams + [graph :as g] + [protocols :as proto] + [pushback-stream :as ps] + [char-sequence :as cs]] + [clojure.java.io :as io] + [primitive-math :as p]) + (:import + [clj_commons.byte_streams + Utils + ByteBufferInputStream] + [clj_commons.byte_streams.graph + Type] + [java.nio + ByteBuffer + DirectByteBuffer] + [java.lang.reflect + Array] + [java.util.concurrent.atomic + AtomicBoolean] + [java.io + File + FileOutputStream + FileInputStream + ByteArrayInputStream + ByteArrayOutputStream + PipedOutputStream + PipedInputStream + DataInputStream + InputStream + OutputStream + IOException + RandomAccessFile + Reader + InputStreamReader + BufferedReader] + [java.nio.channels + ReadableByteChannel + WritableByteChannel + FileChannel + FileChannel$MapMode + Channels + Pipe] + [java.nio.channels.spi + AbstractSelectableChannel])) + +;;; + +(defonce conversions (atom (g/conversion-graph))) +(defonce inverse-conversions (atom (g/conversion-graph))) +(defonce src->dst->transfer (atom nil)) + +(def ^:private ^:const byte-array-type (class (Utils/byteArray 0))) + +(defn seq-of [x] + (g/type 'seq (if (identical? bytes x) byte-array-type x))) + +(defn stream-of [x] + (g/type 'stream (if (identical? bytes x) byte-array-type x))) + +(defn vector-of [x] + (g/type 'vector (if (identical? bytes x) byte-array-type x))) + +(defn type-descriptor + "Returns a descriptor of the type of the given instance." + [x] + (cond + + (nil? x) + (g/type ::nil) + + (identical? bytes x) + (g/type byte-array-type) + + (vector? x) + (vector-of (.type ^Type (type-descriptor (first x)))) + + (sequential? x) + (seq-of nil) + + (s/source? x) + (stream-of nil) + + :else + (g/type (class x)))) + +(defn- normalize-type-descriptor [x] + (cond + (instance? Type x) + x + + (or (= 'bytes x) (= bytes x)) + (g/type byte-array-type) + + :else + (g/type (eval x)))) + +(defn- tag-metadata-for [^Type src] + (if (and (instance? Class (.type src)) + (not (.wrapper src))) + {:tag (if (= src (normalize-type-descriptor 'bytes)) + 'bytes + (.getName ^Class (.type src)))} + {})) + +(defmacro def-conversion + "Defines a conversion from one type to another." + [[src dst :as conversion] params & body] + (let [^Type src (normalize-type-descriptor src) + dst (normalize-type-descriptor dst)] + `(let [f# + (fn [~(with-meta (first params) + (tag-metadata-for src)) + ~(if-let [options (second params)] + options + `_#)] + ~@body) + + cost# + ~(get (meta conversion) :cost 1)] + (swap! conversions g/assoc-conversion ~src ~dst f# cost#) + (swap! inverse-conversions g/assoc-conversion ~dst ~src f# cost#)))) + +(defmacro def-transfer + "Defines a byte transfer from one type to another." + [[src dst] params & body] + (let [src (normalize-type-descriptor src) + dst (normalize-type-descriptor dst) + src-meta (tag-metadata-for src) + dst-meta (tag-metadata-for dst)] + `(swap! src->dst->transfer assoc-in [~src ~dst] + (fn [~(with-meta (first params) src-meta) + ~(with-meta (second params) dst-meta) + ~(if-let [options (get params 2)] options (gensym "options"))] + ~@body)))) + +;;; convert + +(def ^:private converter + (memoize + (fn [src dst] + (g/conversion-fn @conversions src dst)))) + +(declare convert) + +(def ^:private seq-converter + (memoize + (fn [dst] + (g/seq-conversion-fn @conversions convert 'seq dst)))) + +(def ^:private stream-converter + (memoize + (fn [dst] + (g/seq-conversion-fn @conversions convert 'stream dst)))) + +(defn conversion-path [src dst] + (let [path (-> @conversions + (g/conversion-path (g/type src) (g/type dst)) + :path)] + (map (partial mapv g/pprint-type) path))) + +(defn convert + "Converts `x`, if possible, into type `dst`, which can be either a class or protocol. If no such conversion + is possible, an IllegalArgumentException is thrown. If `x` is a stream, then the `src` type must be explicitly + specified. + + `options` is a map, whose available settings depend on what sort of transform is being performed: + + `chunk-size` - if a stream is being transformed into a sequence of discrete chunks, `:chunk-size` describes the + size of the chunks, which default to 4096 bytes. + + `encoding` - if a string is being encoded or decoded, `:encoding` describes the charset that is used, which + defaults to 'UTF-8' + + `direct?` - if a byte-buffer is being allocated, `:direct?` describes whether it should be a direct buffer, + defaulting to false + `source-type` - overrides input type detection, required to convert a stream + + (NB: if you need to convert a stream to a seq, or vice versa, but not the underlying byte type, you want + Manifold's `stream->seq` and `->source` instead)" + ([x dst] + (convert x dst nil)) + ([x dst options] + (let [dst (g/type dst) + source-type (get options :source-type) + ^Type + src (g/type + (or source-type + (type-descriptor x))) + wrapper (.wrapper src)] + + (cond + + (not (nil? (.type src))) + (if-let [f (or + (converter src dst) + (converter (g/type (class x)) dst))] + (f x (if source-type (dissoc options :source-type) options)) + (throw + (IllegalArgumentException. + (str "Don't know how to convert " (class x) " into " (g/pprint-type dst))))) + + (= 'seq wrapper) + (if-let [f (seq-converter dst)] + (f x (if source-type (dissoc options :source-type) options)) + x) + + (= 'stream wrapper) + (if-let [f (stream-converter dst)] + (f x (if source-type (dissoc options :source-type) options)) + x) + + :else + (throw (IllegalArgumentException. (str "invalid wrapper type: " (pr-str wrapper) " " (pr-str (.type src))))))))) + +(defn possible-conversions + "Returns a list of all possible conversion targets from value." + [src] + (let [^Type src (g/type src) + pred (cond + (.type src) + (partial converter src) + + (= 'seq (.wrapper src)) + seq-converter + + (= 'stream (.wrapper src)) + stream-converter)] + (->> @conversions + g/possible-targets + (filter pred) + (map g/pprint-type)))) + +(let [memoized-cost (memoize + (fn [src dst] + (if-let [path (g/conversion-path @conversions src dst)] + (:cost path) + 9999)))] + (defn conversion-cost + "Returns the estimated cost of converting the data `x` to the destination type `dst`." + ^long [x dst] + (memoized-cost (type-descriptor x) (normalize-type-descriptor dst)))) + +;;; transfer + +(defn- default-transfer + [source sink {:keys [chunk-size] :or {chunk-size 1024} :as options}] + (loop [] + (when-let [b (proto/take-bytes! source chunk-size options)] + (proto/send-bytes! sink b options) + (recur)))) + +(def ^:private transfer-fn + (memoize + (fn this [^Type src ^Type dst] + (let [converter-fn (cond + (nil? (.wrapper src)) + converter + + (#{'seq 'vector} (.wrapper src)) + (fn [_ d] (seq-converter d)) + + (= 'stream (.wrapper src)) + (fn [_ d] (stream-converter d)))] + + ;; TODO: do a reverse traversal, not an exhaustive forward search + (let [[src' dst'] (->> @src->dst->transfer + keys + (map (fn [src'] + (and + (converter-fn src src') + (when-let [dst' (some + #(and (converter-fn dst %) %) + (keys (@src->dst->transfer src')))] + [src' dst'])))) + (remove nil?) + first)] + (cond + + (and src' dst') + (let [f (get-in @src->dst->transfer [src' dst'])] + (fn [source sink options] + (let [source' (convert source src' options) + sink' (convert sink dst' options)] + (f source' sink' options)))) + + (and + (converter-fn src (g/type #'proto/ByteSource)) + (converter dst (g/type #'proto/ByteSink))) + (fn [source sink {:keys [close?] :or {close? true} :as options}] + (let [source' (convert source #'proto/ByteSource options) + sink' (convert sink #'proto/ByteSink options)] + (default-transfer source' sink' options) + (when close? + (doseq [x [source sink source' sink']] + (when (proto/closeable? x) + (proto/close x)))))) + + :else + nil)))))) + +;; for byte transfers +(defn transfer + "Transfers, if possible, all bytes from `source` into `sink`. If this cannot be accomplished, an IllegalArgumentException is + thrown. + + `options` is a map whose available settings depends on the source and sink types: + + `chunk-size` - if a stream is being transformed into a sequence of discrete chunks, `:chunk-size` describes the + size of the chunks, which default to 4096 bytes. + + `encoding` - if a string is being encoded or decoded, `:encoding` describes the charset that is used, which + defaults to 'UTF-8' + + `append?` - if a file is being written to, `:append?` determines whether the bytes will overwrite the existing content + or be appended to the end of the file. This defaults to true. + + `close?` - whether the sink and source should be closed once the transfer is done, defaults to true." + ([source sink] + (transfer source sink nil)) + ([source sink options] + (transfer source nil sink options)) + ([source source-type sink options] + (let [src (type-descriptor source) + dst (type-descriptor sink)] + (if-let [f (transfer-fn src dst)] + (f source sink options) + (throw (IllegalArgumentException. (str "Don't know how to transfer between " (g/pprint-type src) " to " (g/pprint-type dst)))))))) + +(def ^{:doc "Web-scale."} dev-null + (reify proto/ByteSink + (send-bytes! [_ _ _]))) + +(defn optimized-transfer? + "Returns true if an optimized transfer function exists for the given source and sink objects." + [type-descriptor sink-type] + (boolean (transfer-fn type-descriptor sink-type))) + +;;; conversion definitions + +(def-conversion ^{:cost 0} [(stream-of bytes) InputStream] + [s options] + (let [ps (ps/pushback-stream (get options :buffer-size 1024))] + (d/loop [] + (d/chain (s/take! s ::none) + (fn [^bytes msg] + (if (identical? ::none msg) + (do + (ps/close ps) + false) + (ps/put-array ps msg 0 (alength msg)))) + (fn [result] + (when result + (d/recur))))) + (ps/->input-stream ps))) + +(def-conversion ^{:cost 0} [(stream-of ByteBuffer) InputStream] + [s options] + (let [ps (ps/pushback-stream (get options :buffer-size 1024))] + (d/loop [] + (d/chain (s/take! s ::none) + (fn [^ByteBuffer msg] + (if (identical? ::none msg) + (do + (ps/close ps) + false) + (ps/put-buffer ps (.duplicate msg)))) + (fn [result] + (when result + (d/recur))))) + (ps/->input-stream ps))) + +;; byte-array => byte-buffer +(def-conversion ^{:cost 0} [bytes ByteBuffer] + [ary {:keys [direct?] :or {direct? false}}] + (if direct? + (let [len (Array/getLength ary) + ^ByteBuffer buf (ByteBuffer/allocateDirect len)] + (.put buf ary 0 len) + (.position buf 0) + buf) + (ByteBuffer/wrap ary))) + +;; byte-array => input-stream +(def-conversion ^{:cost 0} [bytes InputStream] + [ary] + (ByteArrayInputStream. ary)) + +;; byte-buffer => input-stream +(def-conversion ^{:cost 0} [ByteBuffer InputStream] + [buf] + (ByteBufferInputStream. (.duplicate buf))) + +;; byte-buffer => byte-array +(def-conversion [ByteBuffer bytes] + [buf] + (if (.hasArray buf) + (if (== (alength (.array buf)) (.remaining buf)) + (.array buf) + (let [ary (Utils/byteArray (.remaining buf))] + (doto buf + .mark + (.get ary 0 (.remaining buf)) + .reset) + ary)) + (let [^bytes ary (Utils/byteArray (.remaining buf))] + (doto buf .mark (.get ary) .reset) + ary))) + +;; sequence of byte-buffers => byte-buffer +(def-conversion [(vector-of ByteBuffer) ByteBuffer] + [bufs {:keys [direct?] :or {direct? false}}] + (cond + (empty? bufs) + (ByteBuffer/allocate 0) + + (and (empty? (rest bufs)) (not (proto/closeable? bufs))) + (first bufs) + + :else + (let [len (reduce + (map #(.remaining ^ByteBuffer %) bufs)) + buf (if direct? + (ByteBuffer/allocateDirect len) + (ByteBuffer/allocate len))] + (doseq [^ByteBuffer b bufs] + (.mark b) + (.put buf b) + (.reset b)) + (when (proto/closeable? bufs) + (proto/close bufs)) + (.flip buf)))) + +;; byte-buffer => sequence of byte-buffers +(def-conversion ^{:cost 0} [ByteBuffer (vector-of ByteBuffer)] + [buf {:keys [chunk-size]}] + (if chunk-size + (let [lim (.limit buf) + indices (range (.position buf) lim chunk-size)] + (mapv + #(-> buf + .duplicate + ^ByteBuffer (.position (int %)) + ^ByteBuffer (.limit (int (min lim (+ (int %) chunk-size)))) + .slice) + indices)) + [buf])) + +;; channel => input-stream +(def-conversion ^{:cost 0} [ReadableByteChannel InputStream] + [channel] + (Channels/newInputStream channel)) + +;; channel => lazy-seq of byte-buffers +(def-conversion [ReadableByteChannel (seq-of ByteBuffer)] + [channel {:keys [chunk-size direct?] :or {chunk-size 4096, direct? false} :as options}] + (lazy-seq + (when-let [b (proto/take-bytes! channel chunk-size options)] + (cons b (convert channel (seq-of ByteBuffer) options))))) + +;; input-stream => channel +(def-conversion ^{:cost 0} [InputStream ReadableByteChannel] + [input-stream] + (Channels/newChannel input-stream)) + +;; string => byte-array +(def-conversion ^{:cost 2} [String byte-array-type] + [s {:keys [encoding] :or {encoding "UTF-8"}}] + (.getBytes s ^String (name encoding))) + +;; byte-array => string +(def-conversion ^{:cost 2} [bytes String] + [ary {:keys [encoding] :or {encoding "UTF-8"}}] + (String. ^bytes ary (name encoding))) + +;; lazy-seq of byte-buffers => channel +(def-conversion ^{:cost 1.5} [(seq-of ByteBuffer) ReadableByteChannel] + [bufs] + (let [pipe (Pipe/open) + ^WritableByteChannel sink (.sink pipe) + source (doto ^AbstractSelectableChannel (.source pipe) + (.configureBlocking true))] + (future + (try + (loop [s bufs] + (when (and (not (empty? s)) (.isOpen sink)) + (let [buf (.duplicate ^ByteBuffer (first s))] + (.write sink buf) + (recur (rest s))))) + (finally + (.close sink)))) + source)) + +(def-conversion ^{:cost 1.5} [(seq-of #'proto/ByteSource) InputStream] + [srcs options] + (let [chunk-size (get options :chunk-size 65536) + out (PipedOutputStream.) + in (PipedInputStream. out chunk-size)] + (future + (try + (loop [s srcs] + (when-not (empty? s) + (transfer (first s) out) + (recur (rest s)))) + (finally + (.close out)))) + in)) + +(def-conversion ^{:cost 1.5} [InputStream byte-array-type] + [in options] + (let [out (ByteArrayOutputStream. (p/max 64 (.available in))) + buf (Utils/byteArray 16384)] + (loop [] + (let [len (.read in buf 0 16384)] + (when-not (neg? len) + (.write out buf 0 len) + (recur)))) + (.toByteArray out))) + +#_(let [ary (Utils/byteArray 0)] + (def-conversion ^{:cost 0} [::nil byte-array-type] + [src options] + ary)) + +(def-conversion ^{:cost 2} [#'proto/ByteSource byte-array-type] + [src options] + (let [os (ByteArrayOutputStream.)] + (transfer src os) + (.toByteArray os))) + +;; generic byte-source => lazy char-sequence +(def-conversion ^{:cost 2} [#'proto/ByteSource CharSequence] + [source options] + (cs/decode-byte-source + #(when-let [bytes (proto/take-bytes! source % options)] + (convert bytes ByteBuffer options)) + #(when (proto/closeable? source) + (proto/close source)) + options)) + +;; input-stream => reader +(def-conversion ^{:cost 1.5} [InputStream Reader] + [input-stream {:keys [encoding] :or {encoding "UTF-8"}}] + (BufferedReader. (InputStreamReader. input-stream ^String encoding))) + +;; reader => char-sequence +(def-conversion ^{:cost 1.5} [Reader CharSequence] + [reader {:keys [chunk-size] :or {chunk-size 2048}}] + (let [ary (char-array chunk-size) + sb (StringBuilder.)] + (loop [] + (let [n (.read reader ary 0 chunk-size)] + (if (pos? n) + (do + (.append sb ary 0 n) + (recur)) + (.toString sb)))))) + +;; char-sequence => string +(def-conversion [CharSequence String] + [char-sequence] + (.toString char-sequence)) + +(def-conversion [(vector-of String) String] + [strings] + (let [sb (StringBuilder.)] + (doseq [s strings] + (.append sb s)) + (.toString sb))) + +;; file => readable-channel +(def-conversion ^{:cost 0} [File ReadableByteChannel] + [file] + (.getChannel (FileInputStream. file))) + +;; file => writable-channel +(def-conversion ^{:cost 0} [File WritableByteChannel] + [file {:keys [append?] :or {append? true}}] + (.getChannel (FileOutputStream. file (boolean append?)))) + +(def-conversion ^{:cost 0} [File (seq-of ByteBuffer)] + [file {:keys [chunk-size writable?] :or {chunk-size (int 2e9), writable? false}}] + (let [^RandomAccessFile raf (RandomAccessFile. file (if writable? "rw" "r")) + ^FileChannel fc (.getChannel raf) + buf-seq (fn buf-seq [offset] + (when-not (<= (.size fc) offset) + (let [remaining (- (.size fc) offset)] + (lazy-seq + (cons + (.map fc + (if writable? + FileChannel$MapMode/READ_WRITE + FileChannel$MapMode/READ_ONLY) + offset + (min remaining chunk-size)) + (buf-seq (+ offset chunk-size)))))))] + (g/closeable-seq + (buf-seq 0) + false + #(do + (.close raf) + (.close fc))))) + +;; output-stream => writable-channel +(def-conversion ^{:cost 0} [OutputStream WritableByteChannel] + [output-stream] + (Channels/newChannel output-stream)) + +;; writable-channel => output-stream +(def-conversion ^{:cost 0} [WritableByteChannel OutputStream] + [channel] + (Channels/newOutputStream channel)) + +;;; def-transfers + +(def-transfer [ReadableByteChannel File] + [channel file {:keys [chunk-size] :or {chunk-size (int 1e7)} :as options}] + (let [^FileChannel fc (convert file WritableByteChannel options)] + (try + (loop [idx 0] + (let [n (.transferFrom fc channel idx chunk-size)] + (when (pos? n) + (recur (+ idx n))))) + (finally + (.force fc true) + (.close fc))))) + +(def-transfer [File WritableByteChannel] + [file + channel + {:keys [chunk-size + close?] + :or {chunk-size (int 1e6) + close? true} + :as options}] + (let [^FileChannel fc (convert file ReadableByteChannel options)] + (try + (loop [idx 0] + (let [n (.transferTo fc idx chunk-size channel)] + (when (pos? n) + (recur (+ idx n))))) + (finally + (when close? + (.close ^WritableByteChannel channel)) + (.close fc))))) + +(def-transfer [InputStream OutputStream] + [input-stream + output-stream + {:keys [chunk-size + close?] + :or {chunk-size 4096 + close? true} + :as options}] + (let [ary (Utils/byteArray chunk-size)] + (try + (loop [] + (let [n (.read ^InputStream input-stream ary)] + (when (pos? n) + (.write ^OutputStream output-stream ary 0 n) + (recur)))) + (.flush ^OutputStream output-stream) + (finally + (.close ^InputStream input-stream) + (when close? + (.close ^OutputStream output-stream)))))) + +;;; protocol extensions + +(extend-protocol proto/ByteSink + + OutputStream + (send-bytes! [this b _] + (let [^OutputStream os this] + (.write os ^bytes (convert b byte-array-type)))) + + WritableByteChannel + (send-bytes! [this b _] + (let [^WritableByteChannel ch this] + (.write ch ^ByteBuffer (convert b ByteBuffer))))) + +(extend-protocol proto/ByteSource + + InputStream + (take-bytes! [this n _] + (let [ary (clojure.core/byte-array n) + n (long n)] + (loop [idx 0] + (if (== idx n) + ary + (let [read (.read this ary idx (long (- n idx)))] + (if (== -1 read) + (when (pos? idx) + (let [ary' (clojure.core/byte-array idx)] + (System/arraycopy ary 0 ary' 0 idx) + ary')) + (recur (long (+ idx read))))))))) + + ReadableByteChannel + (take-bytes! [this n {:keys [direct?] :or {direct? false}}] + (let [^ByteBuffer buf (if direct? + (ByteBuffer/allocateDirect n) + (ByteBuffer/allocate n))] + + (loop [] + (when (try + (pos? (.read this buf)) + (catch Throwable e + false)) + (recur))) + + (when (pos? (.position buf)) + (.flip buf)))) + + ByteBuffer + (take-bytes! [this n _] + (when (pos? (.remaining this)) + (let [n (int (min (.remaining this) n)) + buf (-> this + .duplicate + ^ByteBuffer (.limit (+ (.position this) n)) + ^ByteBuffer (.slice) + (.order (.order this)))] + (.position this (+ n (.position this))) + buf)))) + + + +;;; print-bytes + +(let [special-character? (->> "' _-+=`~{}[]()\\/#@!?.,;\"" (map int) set)] + (defn- readable-character? [x] + (or + (Character/isLetterOrDigit (int x)) + (special-character? (int x))))) + +(defn print-bytes + "Prints out the bytes in both hex and ASCII representations, 16 bytes per line." + [bytes] + (let [bufs (convert bytes (seq-of ByteBuffer) {:chunk-size 16})] + (doseq [^ByteBuffer buf bufs] + (let [s (convert (.duplicate buf) String {:encoding "ISO-8859-1"}) + bytes (repeatedly (min 16 (.remaining buf)) #(.get buf)) + padding (* 3 (- 16 (count bytes))) + hex-format #(->> "%02X" (repeat %) (interpose " ") (apply str))] + (println + (apply format + (str + (hex-format (min 8 (count bytes))) + " " + (hex-format (max 0 (- (count bytes) 8)))) + bytes) + (apply str (repeat padding " ")) + " " + (->> s + (map #(if (readable-character? %) % ".")) + (apply str))))))) + +;;; to-* helpers + +(defn ^ByteBuffer to-byte-buffer + "Converts the object to a `java.nio.ByteBuffer`." + ([x] + (to-byte-buffer x nil)) + ([x options] + (condp instance? x + ByteBuffer x + byte-array-type (ByteBuffer/wrap x) + String (ByteBuffer/wrap (.getBytes ^String x (name (get options :encoding "UTF-8")))) + (convert x ByteBuffer options)))) + +(defn to-byte-buffers + "Converts the object to a sequence of `java.nio.ByteBuffer`." + ([x] + (to-byte-buffers x nil)) + ([x options] + (convert x (seq-of ByteBuffer) options))) + +(defn ^"[B" to-byte-array + "Converts the object to a byte-array." + ([x] + (to-byte-array x nil)) + ([x options] + (condp instance? x + byte-array-type x + String (.getBytes ^String x (name (get options :encoding "UTF-8"))) + (convert x byte-array-type options)))) + +(defn to-byte-arrays + "Converts the object to a byte-array." + ([x] + (to-byte-array x nil)) + ([x options] + (convert x (seq-of byte-array-type) options))) + +(defn ^InputStream to-input-stream + "Converts the object to a `java.io.InputStream`." + ([x] + (to-input-stream x nil)) + ([x options] + (condp instance? x + byte-array-type (ByteArrayInputStream. x) + ByteBuffer (ByteBufferInputStream. x) + (convert x InputStream options)))) + +(defn ^DataInputStream to-data-input-stream + ([x] + (to-data-input-stream x nil)) + ([x options] + (if (instance? DataInputStream x) + x + (DataInputStream. (to-input-stream x))))) + +(defn ^InputStream to-output-stream + "Converts the object to a `java.io.OutputStream`." + ([x] + (to-output-stream x nil)) + ([x options] + (convert x OutputStream options))) + +(defn ^CharSequence to-char-sequence + "Converts to the object to a `java.lang.CharSequence`." + ([x] + (to-char-sequence x nil)) + ([x options] + (if (instance? CharSequence x) + x + (convert x CharSequence options)))) + +(defn ^ReadableByteChannel to-readable-channel + "Converts the object to a `java.nio.ReadableByteChannel`" + ([x] + (to-readable-channel x nil)) + ([x options] + (convert x ReadableByteChannel options))) + +(defn ^String to-string + "Converts the object to a string." + ([x] + (to-string x nil)) + ([x options] + (let [encoding (get options :encoding "UTF-8")] + (condp instance? x + String x + byte-array-type (String. ^"[B" x ^String (name encoding)) + (convert x String options))))) + +(defn to-reader + "Converts the object to a java.io.Reader." + ([x] + (to-reader x nil)) + ([x options] + (convert x Reader options))) + +(defn to-line-seq + "Converts the object to a lazy sequence of newline-delimited strings." + ([x] + (to-line-seq x nil)) + ([x options] + (let [reader (convert x Reader options) + reader (BufferedReader. ^Reader reader) + line! (fn line! [] + (lazy-seq + (when-let [l (try + (.readLine reader) + (catch IOException e + nil))] + (cons l (line!)))))] + (line!)))) + +(defn to-byte-source + "Converts the object to something that satisfies `ByteSource`." + ([x] + (to-byte-source x nil)) + ([x options] + (convert x #'proto/ByteSource options))) + +(defn to-byte-sink + "Converts the object to something that satisfies `ByteSink`." + ([x] + (to-byte-sink x nil)) + ([x options] + (convert x #'proto/ByteSink options))) + +;;; + +(defn- cmp-bufs + ^long [^ByteBuffer a' ^ByteBuffer b'] + (let [diff (p/- (.remaining a') (.remaining b')) + sign (long (if (pos? diff) -1 1)) + a (if (pos? diff) b' a') + b (if (pos? diff) a' b') + limit (p/>> (.remaining a) 2) + a-offset (.position a) + b-offset (.position b)] + (let [cmp (loop [idx 0] + (if (p/>= idx limit) + 0 + (let [cmp (p/- + (p/int->uint (.getInt a (p/+ idx a-offset))) + (p/int->uint (.getInt b (p/+ idx b-offset))))] + (if (p/== 0 cmp) + (recur (p/+ idx 4)) + ;; Use (if (pos? cmp) 1 -1) to ensure that the + ;; sign of the value x returned by cmp-bufs (and + ;; compare-bytes) is not modified when Clojure's + ;; comparator infrastructure calls (.intValue + ;; x). The intValue method truncates a Java + ;; Long's most significant 32 bits away, which + ;; in some cases changes the sign of the result, + ;; and thus the direction of the comparison + ;; result. Such code is not needed when + ;; comparing individual bytes below, because the + ;; subtraction result fits within the least + ;; significant 9 bits, and (.intValue x) never + ;; changes the sign. + (p/* sign (if (pos? cmp) 1 -1))))))] + (if (p/== 0 (long cmp)) + (let [limit' (.remaining a)] + (loop [idx limit] + (if (p/>= idx limit') + diff + (let [cmp (p/- + (p/byte->ubyte (.get a (p/+ idx a-offset))) + (p/byte->ubyte (.get b (p/+ idx b-offset))))] + (if (p/== 0 cmp) + (recur (p/inc idx)) + (p/* sign cmp)))))) + cmp)))) + +(defn compare-bytes + "Returns a comparison result for two byte streams." + ^long [a b] + (if (and + (or + (instance? byte-array-type a) + (instance? ByteBuffer a) + (instance? String a)) + (or + (instance? byte-array-type b) + (instance? ByteBuffer b) + (instance? String b))) + (cmp-bufs (to-byte-buffer a) (to-byte-buffer b)) + (loop [a (to-byte-buffers a), b (to-byte-buffers b)] + (cond + (empty? a) + (if (empty? b) 0 -1) + + (empty? b) + 1 + + :else + (let [cmp (cmp-bufs (first a) (first b))] + (if (p/== 0 cmp) + (recur (rest a) (rest b)) + cmp)))))) + +(defn bytes= + "Returns true if the two byte streams are equivalent." + [a b] + (p/== 0 (compare-bytes a b))) + + + +(comment + (require '[manifold.stream :as ms]) + + (def content + (doto (ms/stream) + (ms/put! (clojure.core/byte-array 5)) + (ms/close!))) + + (conversion-path (stream-of bytes) (seq-of bytes)) ; => ([(stream-of [B) (seq-of [B)]) + + (convert content (seq-of bytes)) ; doesn't work + (= content (convert content (seq-of bytes))) + + (convert content (seq-of bytes) {:source-type (stream-of bytes)}) ; works + (= content (convert content (seq-of bytes) {:source-type (stream-of bytes)})) + + (convert content (seq-of bytes) {:source-type (stream-of nil)}) ; doesn't work + (= content (convert content (seq-of bytes) {:source-type (stream-of nil)})) + + (convert content (seq-of String)) ; also doesn't work + (= content (convert content (seq-of String))) + + (convert content (seq-of java.nio.ByteBuffer)) ; this works + (= content (convert content (seq-of java.nio.ByteBuffer))) + + + + (let [content (doto (ms/stream) + (ms/put! (clojure.core/byte-array 5)) + (ms/close!))] + (convert content (seq-of bytes)) ; no + #_ (convert content (seq-of bytes) {:source-type (stream-of bytes)}) ; yes + #_(convert content (seq-of bytes) {:source-type (stream-of nil)}) ; no + ) + + + (let [content (doto (ms/stream) + (ms/put! (clojure.core/byte-array 5)) + (ms/close!))] + (convert content (seq-of String)) ; also doesn't work + (= content (convert content (seq-of String)))) + + (let [content (doto (ms/stream) + (ms/put! (clojure.core/byte-array 5)) + (ms/close!))] + (convert content (seq-of java.nio.ByteBuffer)) ; this works + (= content (convert content (seq-of java.nio.ByteBuffer)))) + + + ) diff --git a/src/clj_commons/byte_streams/ByteBufferInputStream.java b/src/clj_commons/byte_streams/ByteBufferInputStream.java new file mode 100644 index 0000000..c64c5eb --- /dev/null +++ b/src/clj_commons/byte_streams/ByteBufferInputStream.java @@ -0,0 +1,57 @@ +package clj_commons.byte_streams; + +import java.io.InputStream; +import java.io.IOException; +import java.nio.ByteBuffer; + +public class ByteBufferInputStream extends InputStream { + + private ByteBuffer _buf; + + public ByteBufferInputStream(ByteBuffer buf) { + _buf = buf; + } + + public void close() { + } + + public int available() { + return _buf.remaining(); + } + + public boolean markSupported() { + return true; + } + + public void mark(int readlimit) { + _buf.mark(); + } + + public void reset() { + _buf.reset(); + } + + public long skip(long n) { + int nP = Math.min((int)n, _buf.remaining()); + _buf.position(_buf.position() + nP); + return (long)nP; + } + + public int read() throws IOException { + if (!_buf.hasRemaining()) { + return -1; + } else { + return (int) _buf.get() & 0xFF; + } + } + + public int read(byte[] bytes, int offset, int length) throws IOException { + length = Math.min(length, _buf.remaining()); + if (length == 0) { + return -1; + } else { + _buf.get(bytes, offset, length); + return length; + } + } +} diff --git a/src/clj_commons/byte_streams/InputStream.java b/src/clj_commons/byte_streams/InputStream.java new file mode 100644 index 0000000..941f275 --- /dev/null +++ b/src/clj_commons/byte_streams/InputStream.java @@ -0,0 +1,52 @@ +package clj_commons.byte_streams; + +import java.io.IOException; + +public class InputStream extends java.io.InputStream { + + public interface Streamable { + int available(); + void close(); + long skip(long n); + int read() throws IOException; + int read(byte[] bytes, int offset, int length) throws IOException; + } + + private Streamable _s; + + public InputStream(Streamable s) { + _s = s; + } + + public void close() { + _s.close(); + } + + public int available() { + return _s.available(); + } + + public boolean markSupported() { + return false; + } + + public void mark(int readlimit) { + throw new UnsupportedOperationException(); + } + + public void reset() { + throw new UnsupportedOperationException(); + } + + public long skip(long n) { + return _s.skip(n); + } + + public int read() throws IOException { + return _s.read(); + } + + public int read(byte[] bytes, int offset, int length) throws IOException { + return _s.read(bytes, offset, length); + } +} diff --git a/src/clj_commons/byte_streams/Utils.java b/src/clj_commons/byte_streams/Utils.java new file mode 100644 index 0000000..2657d26 --- /dev/null +++ b/src/clj_commons/byte_streams/Utils.java @@ -0,0 +1,7 @@ +package clj_commons.byte_streams; + +public class Utils { + public static byte[] byteArray(int length) { + return new byte[length]; + } +} diff --git a/src/clj_commons/byte_streams/char_sequence.clj b/src/clj_commons/byte_streams/char_sequence.clj new file mode 100644 index 0000000..9a85039 --- /dev/null +++ b/src/clj_commons/byte_streams/char_sequence.clj @@ -0,0 +1,122 @@ +(ns clj-commons.byte-streams.char-sequence + (:refer-clojure :exclude [flush]) + (:import + [java.util.concurrent.locks + ReentrantLock] + [java.io + ByteArrayOutputStream] + [java.nio + ByteBuffer + CharBuffer] + [java.nio.charset + Charset + CharsetDecoder + CoderResult + CodingErrorAction])) + +(set! *unchecked-math* true) + +(defn coding-error-action [action] + (case + :report CodingErrorAction/REPORT + :ignore CodingErrorAction/IGNORE + :replace CodingErrorAction/REPLACE)) + +(defn parse-result [^CoderResult result] + (cond + (.isUnderflow result) :underflow + (.isOverflow result) :overflow + :else (throw (IllegalArgumentException. "Malformed byte-stream input to CharsetDecoder")))) + +(defn decode + [^CharsetDecoder decoder ^ByteBuffer in ^CharBuffer out] + (parse-result (.decode decoder in out false))) + +(defn flush + [^CharsetDecoder decoder ^ByteBuffer in ^CharBuffer out] + (parse-result (.decode decoder (or in (ByteBuffer/allocate 0)) out true)) + (parse-result (.flush decoder out))) + +(defn concat-bytes [^ByteBuffer a ^ByteBuffer b] + (let [buf (ByteBuffer/allocate (+ (.remaining a) (.remaining b)))] + (.put buf a) + (.put buf b) + (.flip buf))) + +(defn lazy-char-buffer-sequence + [^CharsetDecoder decoder + chunk-size + ^ByteBuffer extra-bytes + close-fn + byte-source] + (lazy-seq + (let [num-bytes (+ (long + (if extra-bytes + (.remaining extra-bytes) + 0)) + (long chunk-size)) + len (long + (Math/ceil + (/ num-bytes + (.averageCharsPerByte decoder)))) + out (CharBuffer/allocate len)] + + (if (and extra-bytes (= :overflow (decode decoder extra-bytes out))) + + ;; we didn't even exhaust the overflow bytes, try again + (cons + out + (lazy-char-buffer-sequence decoder chunk-size extra-bytes close-fn byte-source)) + + (if-let [in (byte-source chunk-size)] + (let [in (if (and extra-bytes (.hasRemaining extra-bytes)) + (concat-bytes extra-bytes in) + in) + result (decode decoder in out)] + (cons + (.flip out) + (lazy-char-buffer-sequence + decoder + chunk-size + (when (.hasRemaining ^ByteBuffer in) in) + close-fn + byte-source))) + (do + (flush decoder extra-bytes out) + (when close-fn (close-fn)) + (.flip out))))))) + +(defn decode-byte-source + [byte-source + close-fn + {:keys [chunk-size encoding on-encoding-error] + :or {chunk-size 1024 + on-encoding-error :replace + encoding "UTF-8"}}] + (let [action (coding-error-action on-encoding-error) + decoder (doto (.newDecoder (Charset/forName encoding)) + (.onMalformedInput action) + (.onUnmappableCharacter action)) + s (lazy-char-buffer-sequence decoder chunk-size nil close-fn byte-source)] + (reify + java.io.Closeable + (close [_] (when close-fn (close-fn))) + + CharSequence + (charAt [_ idx] + (loop [remaining idx, s s] + (if (empty? s) + (throw (IndexOutOfBoundsException. (str idx))) + (let [^CharBuffer buf (first s)] + (if (< (.remaining buf) remaining) + (.charAt buf remaining) + (recur (- remaining (.remaining buf)) (rest s))))))) + (length [_] + (reduce + (map #(.remaining ^CharBuffer %) s))) + #_(subSequence [_ start end] + ) + (toString [_] + (let [buf (StringBuffer.)] + (doseq [b s] + (.append buf b)) + (.toString buf)))))) diff --git a/src/clj_commons/byte_streams/graph.clj b/src/clj_commons/byte_streams/graph.clj new file mode 100644 index 0000000..1af9a72 --- /dev/null +++ b/src/clj_commons/byte_streams/graph.clj @@ -0,0 +1,312 @@ +(ns clj-commons.byte-streams.graph + (:refer-clojure :exclude [type]) + (:require + [manifold.stream :as s] + [clj-commons.byte-streams + [utils :refer [defprotocol+ defrecord+ deftype+]] + [protocols :as p]]) + (:import + [java.util + LinkedList + PriorityQueue])) + +(declare pprint-type) + +(deftype+ Conversion [f ^double cost] + Object + (equals [_ x] + (and + (instance? Conversion x) + (identical? f (.f ^Conversion x)) + (== cost (.cost ^Conversion x)))) + (hashCode [_] + (bit-xor (System/identityHashCode f) (unchecked-int cost)))) + +(deftype+ Type [wrapper type] + Object + (equals [_ x] + (and + (instance? Type x) + (= wrapper (.wrapper ^Type x)) + (= type (.type ^Type x)))) + (hashCode [_] + (bit-xor + (hash wrapper) + (hash type))) + (toString [this] + (pr-str (pprint-type this)))) + +(defn pprint-type [^Type x] + (if-let [wrapper (.wrapper x)] + (list (symbol (str wrapper "-of")) (.type x)) + (.type x))) + +(defn type + ([t] + (if (instance? Type t) + t + (type nil t))) + ([wrapper t] + (Type. wrapper + (if (var? t) + @t + t)))) + +(defn- protocol? [x] + (and (map? x) (contains? x :on-interface))) + +(defn canonicalize [x] + (if (protocol? x) + @(:var x) + x)) + +(defn- class-satisfies? [protocol ^Class c] + (boolean + (or + (.isAssignableFrom ^Class (:on-interface protocol) c) + (some + #(.isAssignableFrom ^Class % c) + (keys (:impls protocol)))))) + +(defn assignable? [^Type a ^Type b] + (and + (= (.wrapper a) (.wrapper b)) + (let [a (canonicalize (.type a)) + b (canonicalize (.type b))] + (cond + (and (class? a) (class? b)) + (.isAssignableFrom ^Class b a) + + (and (protocol? b) (class? a)) + (class-satisfies? b a) + + :else + (= a b))))) + +(defprotocol+ IConversionGraph + (assoc-conversion [_ src dst f cost]) + (equivalent-targets [_ dst]) + (possible-sources [_]) + (possible-targets [_]) + (possible-conversions [_ src]) + (conversion [_ src dst])) + +(defn implicit-conversions [^Type src] + (cond + + ;; vector -> seq + (= 'vector (.wrapper src)) + [[[src (Type. 'seq (.type src))] (Conversion. (fn [x _] (seq x)) 1)]] + + ;; seq -> stream + (= 'seq (.wrapper src)) + [[[src (Type. 'stream (.type src))] (Conversion. (fn [x _] (s/->source x)) 1)]] + + ;; stream -> seq + (= 'stream (.wrapper src)) + [[[src (Type. 'seq (.type src))] (Conversion. (fn [x _] (s/stream->seq x)) 1)]] + + :else + nil)) + +(deftype+ ConversionGraph [m] + IConversionGraph + (assoc-conversion [_ src dst f cost] + (let [m' (assoc-in m [src dst] (Conversion. f cost)) + m' (if (and + (nil? (.wrapper ^Type src)) + (nil? (.wrapper ^Type dst))) + (let [src (.type ^Type src) + dst (.type ^Type dst)] + (-> m' + (assoc-in [(Type. 'seq src) (Type. 'seq dst)] + (Conversion. (fn [x options] (map #(f % options) x)) cost)) + (assoc-in [(Type. 'stream src) (Type. 'stream dst)] + (Conversion. (fn [x options] (s/map #(f % options) x)) (+ cost 0.1))))) + m')] + (ConversionGraph. m'))) + (possible-sources [_] + (keys m)) + (possible-targets [_] + (->> m vals (mapcat keys))) + (equivalent-targets [_ dst] + (->> m + vals + (mapcat keys) + (filter #(assignable? % dst)))) + (possible-conversions [_ src] + (->> m + keys + (filter (partial assignable? src)) + (mapcat (fn [src] + (map + (fn [[k v]] + [[src k] v]) + (get m src)))) + (concat (implicit-conversions src)) + (into {})))) + +(defn conversion-graph [] + (ConversionGraph. {})) + +;;; + +(defrecord+ ConversionPath [path fns visited? cost] + Comparable + (compareTo [_ x] + (let [cmp (compare cost (.cost ^ConversionPath x))] + (if (zero? cmp) + (compare (count path) (count (.path ^ConversionPath x))) + cmp)))) + +(defn- conj-path [^ConversionPath p src dst ^Conversion c] + (ConversionPath. + (conj (.path p) [src dst]) + (conj (.fns p) (.f c)) + (conj (.visited? p) dst) + (+ (.cost p) (.cost c)))) + +(def conversion-path + (memoize + (fn [g src dst] + (let [path (ConversionPath. [] [] #{src} 0)] + (if (assignable? src dst) + path + (let [q (doto (PriorityQueue.) (.add path)) + dsts (equivalent-targets g dst)] + (loop [] + (when-let [^ConversionPath p (.poll q)] + (let [curr (or (-> p .path last second) src)] + (if (some #(assignable? curr %) dsts) + p + (do + (doseq [[[src dst] c] (->> curr + (possible-conversions g) + (remove (fn [[[src dst] c]] ((.visited? p) dst))))] + (.add q (conj-path p src dst c))) + (recur)))))))))))) + +;;; + +(defn closeable-seq [s exhaustible? close-fn] + (if (empty? s) + (when exhaustible? + (close-fn) + nil) + (reify + + clojure.lang.IPending + (isRealized [_] + (or + (not (instance? clojure.lang.IPending s)) + (realized? s))) + + Object + (finalize [_] + (close-fn)) + + java.io.Closeable + (close [_] + (close-fn)) + + clojure.lang.Sequential + clojure.lang.ISeq + clojure.lang.Seqable + (seq [this] this) + (cons [_ a] + (closeable-seq (cons a s) exhaustible? close-fn)) + (next [this] + (closeable-seq (next s) exhaustible? close-fn)) + (more [this] + (let [rst (next this)] + (if (empty? rst) + '() + rst))) + (first [_] + (first s)) + (equiv [a b] + (= s b))))) + +(defn conversion-fn [g src dst] + (when-let [path (conversion-path g src dst)] + (condp = (count (:path path)) + 0 (fn [x _] x) + + 1 (let [f (->> path :fns first)] + (if (p/closeable? src) + (fn [x options] + (let [x' (f x options)] + (when-not (p/closeable? x') + (p/close x)) + x')) + f)) + + ;; multiple stages + (let [fns (->> path :fns (apply vector))] + (fn [x options] + (let [close-fns (LinkedList.) + result (reduce + (fn [x f] + + ;; keep track of everything that needs to be closed once the bytes are exhausted + (when (p/closeable? x) + (.add close-fns #(p/close x))) + (f x options)) + x + fns)] + (if-let [close-fn (when-not (or (p/closeable? result) + (.isEmpty close-fns)) + #(loop [] + (when-let [f (.poll close-fns)] + (f) + (recur))))] + (cond + + (seq? result) + (closeable-seq result true close-fn) + + (s/source? result) + (do + (s/on-drained result close-fn) + result) + + :else + (do + ;; we assume that if the end-result is closeable, it will take care of all the intermediate + ;; objects beneath it. I think this is true as long as we're not doing multiple streaming + ;; reads, but this might need to be revisited. + (when-not (p/closeable? result) + (close-fn)) + result)) + result))))))) + +(defn seq-conversion-fn [g convert wrapper dst] + (let [path (->> g + possible-sources + (remove #(nil? (.wrapper ^Type %))) + (remove #(#{String CharSequence} (.type ^Type %))) + (map #(conversion-path g % dst)) + (remove nil?) + (sort-by :cost) + first) + ^Type src (-> path :path first first)] + + (when src + (let [wrapper' (.wrapper src) + type' (.type src)] + (fn [x options] + (->> x + + ((condp = [wrapper wrapper'] + '[seq vector] vec + '[stream vector] (comp vec s/stream->seq) + '[seq stream] s/->source + '[stream seq] s/stream->seq + identity)) + + ((condp = wrapper' + 'vector (partial mapv #(convert % type' options)) + 'seq (partial map #(convert % type' options)) + 'stream (partial s/map #(convert % type' options)))) + + (#((conversion-fn g src (-> path :path last last)) % options)))))))) diff --git a/src/clj_commons/byte_streams/protocols.clj b/src/clj_commons/byte_streams/protocols.clj new file mode 100644 index 0000000..7f9f742 --- /dev/null +++ b/src/clj_commons/byte_streams/protocols.clj @@ -0,0 +1,34 @@ +(ns clj-commons.byte-streams.protocols + (:require + [clj-commons.byte-streams.utils :refer [defprotocol+]]) + (:import + [java.util.concurrent + ConcurrentHashMap])) + +(defprotocol+ Closeable + (close [_] "A protocol that is a superset of `java.io.Closeable`.")) + +(defprotocol+ ByteSource + (take-bytes! [_ n options] "Takes `n` bytes from the byte source.")) + +(defprotocol+ ByteSink + (send-bytes! [_ bytes options] "Puts `bytes` in the byte sink.")) + +(extend-protocol Closeable + + java.io.Closeable + (close [this] (.close this)) + + ) + +(let [m (ConcurrentHashMap.)] + (defn closeable? [x] + (if (nil? x) + false + (let [c (class x) + v (.get m c)] + (if (nil? v) + (let [v (satisfies? Closeable x)] + (.put m c v) + v) + v))))) diff --git a/src/clj_commons/byte_streams/pushback_stream.clj b/src/clj_commons/byte_streams/pushback_stream.clj new file mode 100644 index 0000000..71fb119 --- /dev/null +++ b/src/clj_commons/byte_streams/pushback_stream.clj @@ -0,0 +1,306 @@ +(ns clj-commons.byte-streams.pushback-stream + (:refer-clojure :exclude [take]) + (:require + [primitive-math :as p] + [clj-commons.byte-streams.utils :refer [doit definterface+ deftype+]] + [manifold + [utils :as u] + [stream :as s] + [deferred :as d]] + [clojure.walk :as walk]) + (:import + [java.nio + ByteBuffer] + [clj_commons.byte_streams + InputStream + InputStream$Streamable] + [java.util + LinkedList + ArrayDeque])) + +(set! *unchecked-math* true) + +(definterface+ PushbackStream + (put [^bytes x ^int offset ^int length]) + (put [^java.nio.ByteBuffer buf]) + (pushback [^bytes ary ^int offset ^int length]) + (pushback [^java.nio.ByteBuffer buf]) + (take [^bytes ary ^int offset ^int length ^boolean eager?]) + (^void close [])) + +(deftype Consumption + [^ByteBuffer buf + deferred + ^boolean eager?]) + +(defn trigger [^Consumption c] + (let [^ByteBuffer buf (.buf c)] + (d/success! (.deferred c) (.position buf)))) + +(defn put [^ByteBuffer src ^ByteBuffer dst] + (let [l (.limit src)] + (.limit src (p/+ (.position src) (p/min (.remaining src) (.remaining dst)))) + (.put dst src) + (.limit src l))) + +(defn- expand-either [first? form] + (let [form' (->> form + (map + #(if (and (seq? %) (= 'either (first %))) + (nth % (if first? 1 2)) + [%])) + (apply concat))] + (with-meta + (if (seq? form) + form' + (into (empty form) form')) + (meta form)))) + +(defn walk + [inner outer form] + (let [form' (cond + (list? form) (outer (apply list (map inner form))) + (seq? form) (outer (doall (map inner form))) + (coll? form) (outer (into (empty form) (map inner form))) + :else (outer form))] + (if (instance? clojure.lang.IMeta form') + (with-meta form' (meta form)) + form'))) + +(defn prewalk + [f form] + (walk (partial prewalk f) identity (f form))) + +(defmacro ^:private both [body] + `(do + ~(prewalk + (fn [x] + (if (sequential? x) + (expand-either true x) + x)) + body) + ~(prewalk + (fn [x] + (if (sequential? x) + (expand-either false x) + x)) + body))) + +(both + (deftype+ (either [PushbackByteStream] [SynchronizedPushbackByteStream]) + [lock + ^LinkedList consumers + ^long buffer-capacity + ^:unsynchronized-mutable ^int buffer-size + ^:unsynchronized-mutable deferred + ^:unsynchronized-mutable closed? + ^LinkedList buffer] + + InputStream$Streamable + + (available [_] + buffer-size) + + (read [this] + (let [ary (byte-array 1) + len (long @(.take this ary 0 1 true))] + (if (zero? len) + -1 + (p/bit-and 0xFF (get ary 0))))) + + (read [this ary offset length] + (let [n (long @(.take this ary offset length true))] + (if (zero? n) + -1 + n))) + + (skip [this n] + @(.take this (byte-array n) 0 n true)) + + PushbackStream + + (put [_ buf] + + (let [[consumers d] + ((either + [do] + [u/with-lock* lock]) + + (if closed? + [nil + (d/success-deferred false)] + + [(loop [acc []] + (if-let [^Consumption c (.peek consumers)] + (let [^ByteBuffer out (.buf c)] + (put buf out) + (when (or (.eager? c) (not (.hasRemaining out))) + (.remove consumers) + (recur (conj acc c)))) + acc)) + + (do + (when (.hasRemaining buf) + (.add buffer buf) + (set! buffer-size (unchecked-int (p/+ buffer-size (.remaining buf))))) + + (cond + + deferred + deferred + + (p/<= buffer-size buffer-capacity) + (d/success-deferred true) + + :else + (set! deferred (d/deferred))))]))] + + (when consumers + (doit [c consumers] + (trigger c))) + + d)) + + (put [this ary offset length] + (.put this + (-> (ByteBuffer/wrap ary) + (.position offset) + (.limit (+ offset length))))) + + (pushback [_ buf] + (let [consumers + ((either + [do] + [u/with-lock* lock]) + (let [consumers + (loop [acc []] + (if-let [^Consumption c (.peek consumers)] + (let [^ByteBuffer out (.buf c)] + (put buf out) + (when (or (.eager? c) (not (.hasRemaining out))) + (.remove consumers) + (recur (conj acc c)))) + acc))] + + (when (.hasRemaining buf) + (.addLast buffer buf) + (set! buffer-size (unchecked-int (p/+ buffer-size (.remaining buf))))) + + consumers))] + + (doit [c consumers] + (trigger c)))) + + (pushback [this ary offset length] + (.pushback this + (-> (ByteBuffer/wrap ary) + (.position offset) + (.limit (+ offset length))))) + + (take [_ ary offset length eager?] + + (let [out (-> (ByteBuffer/wrap ary) + (.position offset) + ^ByteBuffer (.limit (+ offset length)) + .slice) + + [put take] + + ((either + [do] + [u/with-lock* lock]) + + (loop [] + (when-let [^ByteBuffer in (.peek buffer)] + (put in out) + (when-not (.hasRemaining in) + (.remove buffer)) + (when (.hasRemaining out) + (recur)))) + + (set! buffer-size (unchecked-int (p/- buffer-size (.position out)))) + + [(when (and (p/<= buffer-size buffer-capacity) deferred) + (let [d deferred] + (set! deferred nil) + d)) + + (if (or closed? + (and (pos? (.position out)) + (or eager? (not (.hasRemaining out))))) + (d/success-deferred (.position out)) + (let [d (d/deferred)] + (.add consumers (Consumption. out d eager?)) + d))])] + + (when put + (d/success! put true)) + + take)) + + (close [_] + (when ((either + [do] + [u/with-lock* lock]) + (when-not closed? + (set! closed? true) + true)) + (loop [] + (when-let [^Consumption c (.poll consumers)] + (let [^ByteBuffer buf (.buf c)] + (d/success! (.deferred c) (.position buf))) + (recur)))) + + true))) + +(defn pushback-stream [capacity] + (SynchronizedPushbackByteStream. + (u/mutex) + (LinkedList.) + capacity + 0 + nil + false + (LinkedList.))) + +(defn unsafe-pushback-stream [capacity] + (PushbackByteStream. + (u/mutex) + (LinkedList.) + capacity + 0 + nil + false + (LinkedList.))) + +(def classname "clj_commons.byte_streams.pushback_stream.PushbackStream") + +(definline put-array + [p ary offset length] + `(.put ~(with-meta p {:tag classname}) ~ary ~offset ~length)) + +(definline put-buffer + [p buf] + `(.put ~(with-meta p {:tag classname}) ~buf)) + +(definline close [p] + `(.close ~(with-meta p {:tag classname}))) + +(definline eager-take + [p ary offset length] + `(.take ~(with-meta p {:tag classname}) ~ary ~offset ~length true)) + +(definline take + [p ary offset length] + `(.take ~(with-meta p {:tag classname}) ~ary ~offset ~length false)) + +(definline pushback-array + [p ary offset length] + `(.pushback ~(with-meta p {:tag classname}) ~ary ~offset ~length)) + +(definline pushback-buffer + [p buf] + `(.pushback ~(with-meta p {:tag classname}) ~buf)) + +(defn ->input-stream [pushback-stream] + (InputStream. pushback-stream)) diff --git a/src/clj_commons/byte_streams/utils.clj b/src/clj_commons/byte_streams/utils.clj new file mode 100644 index 0000000..c942061 --- /dev/null +++ b/src/clj_commons/byte_streams/utils.clj @@ -0,0 +1,30 @@ +(ns clj-commons.byte-streams.utils) + +(defmacro defprotocol+ [name & body] + (when-not (resolve name) + `(defprotocol ~name ~@body))) + +(defmacro deftype+ [name & body] + (when-not (resolve name) + `(deftype ~name ~@body))) + +(defmacro defrecord+ [name & body] + (when-not (resolve name) + `(defrecord ~name ~@body))) + +(defmacro definterface+ [name & body] + (when-not (resolve name) + `(definterface ~name ~@body))) + +(defmacro doit + "A version of doseq that doesn't emit all that inline-destroying chunked-seq code." + [[x it] & body] + (let [it-sym (gensym "iterable")] + `(let [~it-sym ~it + it# (.iterator ~(with-meta it-sym {:tag 'java.lang.Iterable}))] + (loop [] + (when (.hasNext it#) + (let [~x (.next it#)] + ~@body) + (recur)))))) + diff --git a/test/byte_streams_reload_test.clj b/test/clj_commons/byte_streams_reload_test.clj similarity index 76% rename from test/byte_streams_reload_test.clj rename to test/clj_commons/byte_streams_reload_test.clj index a8b1ee5..364b266 100644 --- a/test/byte_streams_reload_test.clj +++ b/test/clj_commons/byte_streams_reload_test.clj @@ -1,4 +1,4 @@ -(ns byte-streams-reload-test +(ns clj-commons.byte-streams-reload-test (:require [clojure.test :refer :all])) diff --git a/test/byte_streams_simple_check.clj b/test/clj_commons/byte_streams_simple_check.clj similarity index 84% rename from test/byte_streams_simple_check.clj rename to test/clj_commons/byte_streams_simple_check.clj index e9efb4b..b8588e3 100644 --- a/test/byte_streams_simple_check.clj +++ b/test/clj_commons/byte_streams_simple_check.clj @@ -1,7 +1,7 @@ -(ns byte-streams-simple-check +(ns clj-commons.byte-streams-simple-check (:require [clojure.test :refer :all] - [byte-streams :as bs] + [clj-commons.byte-streams :as bs] [clojure.test.check.generators :as gen] [clojure.test.check.properties :as prop] [clojure.test.check.clojure-test :as ct :refer (defspec)])) diff --git a/test/byte_streams_test.clj b/test/clj_commons/byte_streams_test.clj similarity index 95% rename from test/byte_streams_test.clj rename to test/clj_commons/byte_streams_test.clj index 6884ad9..f264a4d 100644 --- a/test/byte_streams_test.clj +++ b/test/clj_commons/byte_streams_test.clj @@ -1,8 +1,8 @@ -(ns byte-streams-test +(ns clj-commons.byte-streams-test (:require - [byte-streams :refer [bytes= compare-bytes conversion-path convert dev-null possible-conversions seq-of stream-of to-byte-array to-byte-buffer to-byte-buffers to-input-stream to-string transfer vector-of]] + [clj-commons.byte-streams :refer [bytes= compare-bytes conversion-path convert dev-null possible-conversions seq-of stream-of to-byte-array to-byte-buffer to-byte-buffers to-input-stream to-string transfer vector-of]] [clojure.test :refer :all] - [byte-streams.char-sequence :as cs]) + [clj-commons.byte-streams.char-sequence :as cs]) (:refer-clojure :exclude [vector-of]) (:import diff --git a/test/pushback_stream_test.clj b/test/pushback_stream_test.clj index 3caa0df..6f7e76f 100644 --- a/test/pushback_stream_test.clj +++ b/test/pushback_stream_test.clj @@ -1,7 +1,7 @@ (ns pushback-stream-test (:require [clojure.test :refer :all] - [byte-streams.pushback-stream :as p])) + [clj-commons.byte-streams.pushback-stream :as p])) (def in (byte-array (range 100))) diff --git a/version.edn b/version.edn index e3c63c6..05338f5 100644 --- a/version.edn +++ b/version.edn @@ -1 +1 @@ -"0.2" +"0.2.11" From cbd146ce4cd1cdfdbca1d3b7aaedc985e0d6534e Mon Sep 17 00:00:00 2001 From: piotr-yuxuan Date: Thu, 23 Dec 2021 09:13:40 +0000 Subject: [PATCH 2/2] Use multi-segment namespace for primitive-math --- project.clj | 2 +- src/clj_commons/byte_streams.clj | 2 +- src/clj_commons/byte_streams/pushback_stream.clj | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/project.clj b/project.clj index ae0ed3c..05a5869 100644 --- a/project.clj +++ b/project.clj @@ -7,7 +7,7 @@ :username :env/clojars_username :password :env/clojars_password :sign-releases false}]] - :dependencies [[primitive-math "0.1.6"] + :dependencies [[com.github.piotr-yuxuan/primitive-math "0.1.7"] [manifold "0.1.9"]] :profiles {:dev {:dependencies [[org.clojure/clojure "1.10.3"] [org.clojure/test.check "1.1.0"] diff --git a/src/clj_commons/byte_streams.clj b/src/clj_commons/byte_streams.clj index 7ee52b6..01401fe 100644 --- a/src/clj_commons/byte_streams.clj +++ b/src/clj_commons/byte_streams.clj @@ -10,7 +10,7 @@ [pushback-stream :as ps] [char-sequence :as cs]] [clojure.java.io :as io] - [primitive-math :as p]) + [piotr-yuxuan.primitive-math :as p]) (:import [clj_commons.byte_streams Utils diff --git a/src/clj_commons/byte_streams/pushback_stream.clj b/src/clj_commons/byte_streams/pushback_stream.clj index 71fb119..be2df7f 100644 --- a/src/clj_commons/byte_streams/pushback_stream.clj +++ b/src/clj_commons/byte_streams/pushback_stream.clj @@ -1,7 +1,7 @@ (ns clj-commons.byte-streams.pushback-stream (:refer-clojure :exclude [take]) (:require - [primitive-math :as p] + [piotr-yuxuan.primitive-math :as p] [clj-commons.byte-streams.utils :refer [doit definterface+ deftype+]] [manifold [utils :as u]