Pipe dreams aren’t necessarily made of promises

unsorted — cgrand, 2 April 2010 @ 14 h 31 min

Because of the spinning nature of atoms, it’s kind of a hack (a fun hack but still a hack) to build queues on it. Here is the same pipe function built on Java queues:

(defn pipe []
  (let [q (java.util.concurrent.LinkedBlockingQueue.)
        EOQ (Object.)
        NIL (Object.)
        s (fn s [] (lazy-seq (let [x (.take q)] 
                               (when-not (= EOQ x) 
                                 (cons (when-not (= NIL x) x) (s))))))]
    [(s) (fn ([] (.put q EOQ)) ([x] (.put q (or x NIL))))]))

10 Comments »

  1. [...] follow-up: pipe dreams aren’t necessarily made of promises [...]

  2. Thanks again Christophe.

    Comment by Edmund — 2 April 2010 @ 16 h 10 min
  3. why did you decide to return a vector instead of a list?

    Comment by William Hidden — 2 April 2010 @ 16 h 38 min
  4. Nice. It took me a moment to understand the purpose of the NIL object, until I found that LinkedBlockingQueue.put won’t accept a null pointer.

    Comment by Stuart — 2 April 2010 @ 16 h 48 min
  5. J’y comprends toujours rien… mais la “spinning nature of atoms” ? Clojure sert à programmer des ordinateurs quantiques ?

    Comment by Olivier — 2 April 2010 @ 17 h 24 min
  6. @Olivier: un atome Clojure est une référence mutable synchrone non coordonnée (ne participe pas à une transaction) qui est basée sur une opération CAS. Le CAS pouvant échouer, la fonction de mise à jour peut être appliquée plusieurs fois : c’est ça la spinning nature of atoms ! Et oui en écrivant ceci je trouvais ça quantiquement drôle.

    Comment by cgrand — 3 April 2010 @ 20 h 08 min
  7. @William: In Clojure vectors are preferred over lists in such cases.

    Comment by Meikel — 8 April 2010 @ 10 h 03 min
  8. [...] input. I needed to convert input to a more sanitary thing, and so lifted a page directly from the CGrand Master himself. In that post he shows how to create a LinkedBlockingQueue (henceforth LBQ to save pixels), [...]

  9. Note that the implementation above will OutOfMemoryError if large sets of values and/or large values are written into the pipe. Here’s a version that caps the queue that seems to do a better job:

    (defn pipe2
    “Returns a pair: a seq (the read end) and a function (the write end).
    The function can take either no arguments to close the pipe
    or one argument which is appended to the seq. Reads and writes may block.
    The (optional) argument to pipe2 is the maximum size of the queue (beyond
    that size writes will block) – default value if not provided is 100.”
    ([] (pipe2 100))
    ([size]
    (let [q (java.util.concurrent.LinkedBlockingQueue. size)
    EOQ (Object.)
    NIL (Object.)
    s (fn s [] (lazy-seq (let [x (.take q)]
    (when-not (= EOQ x)
    (cons (when-not (= NIL x) x) (s))))))]
    [(s) (fn ([] (.put q EOQ)) ([x] (.put q (or x NIL))))])))

    Test code:

    (let [[q f] (pipe2)]
    (future (doseq [x q] (println x)) (println “that’s all folks!”))
    (doseq [x (range 1000000000)] (f x))
    (f))

    Note: I’m a Clojure n00b, so this may introduce other issues (e.g. deadlock if the queue is full and the readers are also writers).

    Comment by Peter Monks — 8 January 2013 @ 7 h 58 min
  10. [...] read Paul Ingles post “From Callbacks to Sequences” which led me to Christophe Grand’s handy pipe function. The pipe function gives you two things wrapped in a vector: a sequence to pass on for downstream [...]

    Pingback by Pithering About » Pmap styled pipes — 9 February 2013 @ 16 h 34 min

RSS feed for comments on this post. TrackBack URI

Leave a comment

(c) 2014 Clojure and me | powered by WordPress with Barecity