Wide Finder 2 : adding agents
The code below lets me make the serial code parallel with little effort: I replace a doseq with a pdoseq and add one line of merge rules.
(pdoseq line (-> (. System in) (java.io.InputStreamReader. "US-ASCII") java.io.BufferedReader. line-seq)
  [u-hits (merge-with +), u-bytes (merge-with +), s404s (merge-with +), clients (merge-with +), refs (merge-with +)]
This new line specifies how to merge each var that can be mutated.
The input is split into batches of 10000 lines (hardcoded, sorry) that are processed in independent threads. The results of each batch are then merged using the specified rules.
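The batch-and-merge scheme can be sketched in a few lines. This is only an illustration written for a current Clojure, not the pdoseq implementation: count-batch and count-lines are hypothetical names, the batch size is a parameter instead of a hardcoded 10000, pmap stands in for the worker threads, and the "work" is simply counting identical lines.

```clojure
;; Sketch only: pmap replaces the worker threads described above,
;; and count-batch / count-lines are hypothetical helper names.
(defn count-batch
  "Processes one batch on its own and returns a map of line -> count."
  [lines]
  (frequencies lines))

(defn count-lines
  "Splits lines into batches of n, counts each batch in parallel,
  then merges the per-batch maps, adding values on key collisions."
  [n lines]
  (->> (partition-all n lines)
       (pmap count-batch)
       (reduce (partial merge-with +) {})))

(count-lines 2 ["/a" "/b" "/a" "/c" "/a"])
;; => {"/a" 3, "/b" 1, "/c" 1}
```

Each batch produces its own small map, so no state is shared between workers until the final merge.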
NB: merge-with is Clojure’s function for merging maps. Here each var’s rule says that intermediate values must be merged as maps, with values added on key collisions.
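To make the rule concrete: a rule such as (merge-with +) is completed with the two intermediate values to merge, giving a call of the form (merge-with + a b). A small example, with made-up maps standing in for the results of two batches (batch-1 and batch-2 are hypothetical names):

```clojure
;; Hypothetical intermediate results from two batches.
(def batch-1 {"/ongoing/a" 2, "/ongoing/b" 1})
(def batch-2 {"/ongoing/b" 4, "/ongoing/c" 7})

;; Keys present in both maps have their values added;
;; all other entries are copied unchanged.
(merge-with + batch-1 batch-2)
;; => {"/ongoing/a" 2, "/ongoing/b" 5, "/ongoing/c" 7}
```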
Runtime on 10M lines on a Core 2 Duo (serial: 2m45s):
real 1m36.296s
user 3m2.651s
sys 0m3.536s
It doesn’t work that well on the T2000:
Serial code (for 1M lines):
real 2m43.273s
user 4m11.942s # user > real because of parallel GC
sys 0m16.093s

Parallel code (for 1M lines):
real 0m37.121s
user 3m59.365s
sys 0m13.944s
You need to know that Clojure takes 11s to boot on this computer. Excluding Clojure’s boot time, we get roughly a 9x speedup ((3m59s – 11s)/(37s – 11s) ≈ 8.8), far from 32x. That’s because the main loop dispatching batches of lines to workers can’t keep pace.
On the whole, I’m happy with these first results: using Clojure, I was able to make the serial code run in parallel without many changes to the main logic.
The code below can’t handle the full data set (200M lines), mainly because of memory requirements and some “malformed” log lines.
So far my best runtime on the whole dataset is 27 minutes.
To be continued…
;;;;;;;;;;; helper code
(defn pfor-each
  "equivalent to (reduce merge-fn agent-init (map work-fn s)) but with some
  parallelism (several map workers but only one reducer)"
  [s work-fn agent-init merge-fn]
  (let [nagents (.. Runtime (getRuntime) (availableProcessors))
        agents (map agent (replicate nagents nil))
        result (agent agent-init)]
    ((fn [[agent & etc-agents] s] ;a fn and not a loop to be sure to not retain
                                  ;a reference to the head of the sequence
       (if-let [x & etc] s
         (do
           (await agent) ;awaiting agent to not flood memory under pending jobs
           (when-let r @agent (send result merge-fn r))
           (send agent work-fn x)
           (recur etc-agents etc))
         (do
           (doseq agent agents
             (await agent)
             (when-let r @agent (send result merge-fn r)))
           (await result)
           @result)))
     (cycle agents) s)))

(defn batch
  "Returns a lazy sequence of lists of n items each -- the last one may have less items."
  [s n]
  (when s
    (lazy-cons (take n s) (batch (drop n s) n))))

(defmacro pdoseq
  "Like doseq except you have to specify how to merge vars mutated by the body."
  [item s merge-rules & body]
  (let [captured-vars (take-nth 2 merge-rules)
        mergers (take-nth 2 (drop 1 merge-rules))
        init-syms (take (count captured-vars) (repeatedly gensym))
        syms-a (take (count captured-vars) (repeatedly gensym))
        syms-b (take (count captured-vars) (repeatedly gensym))
        result-syms (take (count captured-vars) (repeatedly gensym))]
    `(let [[~@init-syms :as init#] [~@captured-vars]
           work# (fn [_# items#]
                   (binding [~@(interleave captured-vars init-syms)]
                     (doseq ~item items# ~@body)
                     [~@captured-vars]))
           ;; a list rule such as (merge-with +) becomes (merge-with + a b);
           ;; a symbol rule f becomes (f a b)
           merger# (fn [[~@syms-a] [~@syms-b]]
                     (vector ~@(map #((if (seq? %1) concat cons) %1 %&) mergers syms-a syms-b)))
           [~@result-syms] (pfor-each (batch ~s 10000) work# init# merger#)]
       ~@(map list (repeat `set!) captured-vars result-syms))))

;;;;;;;;;;;; main code
(def u-hits)
(def u-bytes)
(def s404s)
(def clients)
(def refs)

(defmacro acc [h k v]
  `(set! ~h (assoc ~h ~k (+ (get ~h ~k 0) ~v))))

(defn top [n h] ; the previous top function wasn't a real port of Tim Bray's one
  (loop [top-n (replicate n (first {"-" 0})) kvs (seq h)]
    (if-let [[k v :as kv] & etc] kvs
      (if (> v (val (first top-n)))
        (let [[lt gt] (split-with #(< (val %) v) (rest top-n))]
          (recur (concat lt (cons kv gt)) etc))
        (recur top-n etc))
      (reverse top-n))))

(defn record [client u bytes ref]
  (acc u-bytes u bytes)
  (when (re-matches #"^/ongoing/When/\\d\\d\\dx/\\d\\d\\d\\d/\\d\\d/\\d\\d/[^ .]+$" u)
    (acc u-hits u 1)
    (acc clients client 1)
    (when-not (or (= ref "\"-\"") (re-find #"^\"http://www.tbray.org/ongoing/" ref))
      (acc refs (subs ref 1 (dec (count ref))) 1)))) ; lose the quotes

(defn printf [#^String fmt & args]
  (let [f (java.util.Formatter. *out*)]
    (.format f (. java.util.Locale ENGLISH) fmt (to-array args))))

(defn report
  ([label hash] (report label hash false))
  ([label hash shrink]
    (println (str "Top " label ":"))
    (let [fmt (if shrink " %9.1fM: %s\n" " %10d: %s\n")]
      (doseq [key val] (top 10 hash)
        (let [key (if (< 60 (count key)) (str (subs key 0 60) "...") key)
              val (if shrink (/ val 1024.0 1024.0) val)]
          (printf fmt val key))))))

(binding [u-hits {} u-bytes {} s404s {} clients {} refs {}]
  (pdoseq line (-> (. System in) (java.io.InputStreamReader. "US-ASCII") java.io.BufferedReader. line-seq)
    [u-hits (merge-with +), u-bytes (merge-with +), s404s (merge-with +), clients (merge-with +), refs (merge-with +)]
    (let [f (.split #"\\s+" line)]
      (when (= "\"GET" (get f 5))
        (let [[client u status bytes ref] (map #(get f %) [0 6 8 9 10])]
          (cond
            (= "200" status) (record client u (.parseInt Integer bytes) ref)
            (= "304" status) (record client u 0 ref)
            (= "404" status) (acc s404s u 1))))))
  (print (count u-hits) "resources," (count s404s) "404s," (count clients) "clients\n\n")
  (report "URIs by hit" u-hits)
  (report "URIs by bytes" u-bytes true)
  (report "404s" s404s)
  (report "client addresses" clients)
  (report "referrers" refs))

;;; explicit exit to shut down agent threads
(flush)
(. System exit 0)
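For readers on a current Clojure, where lazy-cons, replicate and the unbracketed doseq/if-let forms no longer exist, the dispatch scheme of pfor-each can be sketched as follows. This is an adaptation under that assumption, not the code that produced the timings; pfor-each* and collect! are hypothetical names.

```clojure
(defn pfor-each*
  "Like (reduce merge-fn init (map #(work-fn nil %) s)), with several
  worker agents and a single reducer agent."
  [s work-fn init merge-fn]
  (let [nagents  (.availableProcessors (Runtime/getRuntime))
        workers  (vec (repeatedly nagents #(agent nil)))
        result   (agent init)
        ;; Wait for a worker, then hand its last result (if any) to the reducer.
        collect! (fn [worker]
                   (await worker)
                   (when-some [r @worker]
                     (send result merge-fn r)))]
    (loop [[worker & more] (cycle workers)
           items (seq s)]
      (if items
        (do
          (collect! worker)        ; back-pressure: at most one pending job per worker
          (send worker work-fn (first items))
          (recur more (next items)))
        (do
          (run! collect! workers)  ; drain the last result of every worker
          (await result)
          @result)))))

;; Usage: count identical lines, two lines per batch.
(pfor-each* (partition-all 2 ["/a" "/b" "/a" "/c" "/a"])
            (fn [_ batch] (frequencies batch))
            {}
            (partial merge-with +))
;; => {"/a" 3, "/b" 1, "/c" 1}
```

As in the original, a single thread hands out the work round-robin and waits on each worker before reusing it. Agent threads keep the JVM alive, so a script using this sketch should call (shutdown-agents) or exit explicitly once it is done.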