Are pipe dreams made of promises?

unsorted — cgrand, 18 November 2009 @ 19 h 32 min

follow-up: pipe dreams aren’t necessarily made of promises

(defn pipe
 "Returns a pair: a seq (the read end) and a function (the write end).
  The function can takes either no arguments to close the pipe 
  or one argument which is appended to the seq. Read is blocking."
 []
  (let [promises (atom (repeatedly promise))
        p (second @promises)]
    [(lazy-seq @p) 
     (fn 
       ([] ;close the pipe
         (let [[a] (swap! promises #(vector (second %)))]
           (if a
             (deliver a nil) 
             (throw (Exception. "Pipe already closed")))))
       ([x] ;enqueue x
         (let [[a b] (swap! promises next)]
           (if (and a b)
             (do 
               (deliver a (cons x (lazy-seq @b)))
               x)
             (throw (Exception. "Pipe already closed"))))))]))

Beware of not printing the seq while the pipe is still open!

(let [[q f] (pipe)]
  (future (doseq [x q] (println x)) (println "that's all folks!"))
  (doseq [x (range 10)]
    (f x))
  (f)) ;close the pipe

18 Comments »

  1. Looks cool but I’m not sure what this is for.

    Comment by anonymous — 19 November 2009 @ 1 h 24 min
  2. @anonymous it creates a communication channel whose ends can then be handed to two different threads. The read end is a plain lazy seq.

    Comment by cgrand — 19 November 2009 @ 9 h 01 min
  3. Where we can read more about new developments in Clojure like promise/deliver?

    Comment by rkrastev — 19 November 2009 @ 10 h 41 min
  4. The function looks useful and the implementation very clever. However, I couldn’t quite wrap my head around the fact why you use “second” and “next” so quite often. Is this to force evaluation to move down the seq one step?

    Comment by dwerner — 30 December 2009 @ 20 h 04 min
  5. @dwerner

    I could be wrong but I’m pretty sure the call to second is required because when we enqueue we recursively shift along the unfulfilled promises with the call to (swap! promises next).

    If we handed the reader the very first item of promises then the reader would block forever because the delivery of a new list (made of the enqueued values cons’d with a lazy seq of the remaining promises) happens in the next cell.

    This does appear to mean that the very first promise is unfulfilled (and gc’d because nothing references it), but it means we need less logic in our enqueueing.

    I personally find this code to be incredibly elegant and concise… (Though it did take a while to figure out how it worked)

    Comment by Rick Moynihan — 23 February 2010 @ 14 h 06 min
  6. clojure.contrib.pipes? :)

    Comment by Greg — 18 March 2010 @ 20 h 38 min
  7. [...] of the spinning nature of atoms, it’s kind of a hack (a fun hack but still a hack) to build queues on it. Here is the same pipe function built on Java [...]

  8. My home is Texas now. However I’d like a school focusing on just that, or in theatre art type stuff (:

    Comment by foldable ladder — 28 April 2011 @ 0 h 43 min
  9. Youre so cool! I dont suppose Ive learn something like this before. So good to seek out someone with some authentic thoughts on this subject. realy thanks for starting this up. this web site is something that’s needed on the net, somebody with somewhat originality. useful job for bringing one thing new to the internet!

    Comment by cat tree — 2 May 2011 @ 5 h 59 min
  10. Note that the implementation above will OutOfMemoryError if large sets of values and/or large values are written into the pipe. Here’s a version that caps the queue that seems to do a better job:

    (defn pipe2
    “Returns a pair: a seq (the read end) and a function (the write end).
    The function can take either no arguments to close the pipe
    or one argument which is appended to the seq. Reads and writes may block.
    The (optional) argument to pipe2 is the maximum size of the queue (beyond
    that size writes will block) – default value if not provided is 100.”
    ([] (pipe2 100))
    ([size]
    (let [q (java.util.concurrent.LinkedBlockingQueue. size)
    EOQ (Object.)
    NIL (Object.)
    s (fn s [] (lazy-seq (let [x (.take q)]
    (when-not (= EOQ x)
    (cons (when-not (= NIL x) x) (s))))))]
    [(s) (fn ([] (.put q EOQ)) ([x] (.put q (or x NIL))))])))

    Test code:

    (let [[q f] (pipe2)]
    (future (doseq [x q] (println x)) (println “that’s all folks!”))
    (doseq [x (range 1000000000)] (f x))
    (f))

    Note: I’m a Clojure n00b, so this may introduce other issues (e.g. deadlock if the queue is full and the readers are also writers).

    Comment by Peter Monks — 8 January 2013 @ 7 h 56 min
  11. Doh – commented on the wrong post. Feel free to delete this comment if you’d like.

    Comment by Peter Monks — 8 January 2013 @ 7 h 57 min
  12. @Rick Moynihan

    This code is elegant indeed and a devil to figure out. After two days of studying it on and off these are my conclusions:

    1/ (second @promises) returns the second items from promised because;

    2/ [a b] (swap! promises next) returns the second and third items from promised (and not the remainder of the list as you’ve said.)

    3/ The devil is in (deliver a (cons x (lazy-seq @b))). Let’s image we add 1 2 3 4 5 … to the sequence. On the first call of the function @b would block. However, @b is wrapped by lazy-seq so the derefing block would only happen when the result of the function would be traversed, for example by doseq.

    What would the return value of (push-function 1) be? (push-function is the function returned by pipe.) The return value would be a sequence. The first item of the sequence would certainly be 1, so [1 … . But what would be the second item? Well, when the second item would be fetched b would be derefered by the @b call. If no future items have been pushed down the pipe that deref would block. Now image we call (push-function 2). This would add 2 as the second item in the sequence.

    Now this is a strange situation in Clojure land. In Clojure all sequences are immutable. The sequence being returned is in fact immutable. But, crucially, the returned sequence is only partially realized. Items can be added to the sequence without creating a new sequence object.

    After calling (push-function 2) what would be the value of the third item of the sequence? The first two items are 1 2 so [1 2 … . There is no third item yet but it will appear as soon as we call (push-function 3).

    Strange, cool and elegant all at once.

    Comment by Steven Devijver — 14 June 2013 @ 20 h 59 min
  13. AHS Skin Care Reviews

    Clojure and me ยป Are pipe dreams made of promises?

    Trackback by AHS Skin Care Reviews — 4 December 2014 @ 14 h 59 min
  14. Hello clj-me.cgrand.net admin, i see you need some fresh posts. Daily updates will rank your website in google higher, content is king nowadays. There is must have tool for every content writer, just search in google for:
    Ightsero’s Essential Tool

    Comment by BruceD — 6 August 2015 @ 13 h 13 min
  15. If you want to take a good deal frߋm t&#1211іs post then ʏօu &#1211ave t&#11423 apply these methods
    tο уοur աօn web site.

    Alsso visit mү blog post: dream ɑbout
    job interview (r.advantech.com)

    Comment by r.advantech.com — 28 October 2015 @ 3 h 32 min
  16. I have noticed you don’t monetize cgrand.net, don’t waste your traffic, you can earn additional cash every month with new monetization method.
    This is the best adsense alternative for any type of website (they
    approve all sites), for more info simply search in gooogle: murgrabia’s tools

    Comment by BestHans — 26 July 2019 @ 3 h 43 min
  17. problems? A number of my blog visitors have complained about my website not working correctly in Explorer but looks great in Opera.

    Comment by suba suba — 11 June 2020 @ 12 h 53 min
  18. Your article was the highlight of my reading list this week.

    Comment by Khalid Negm — 30 June 2024 @ 13 h 34 min

RSS feed for comments on this post. TrackBack URI

Leave a comment

(c) 2025 Clojure and me | powered by WordPress with Barecity