请分享您的想法,参加2024 Clojure发展调查!

欢迎!请查看关于页面获取更多关于它是如何工作的信息。

0
序列
编辑

我本以为下面的操作会在所有可用的CPU(在此情况为12个)上并行化,但它只在一个上运行。我做错了什么?

(ns csv2summap.core
  (:require [clojure.data.csv :as csv]
            [clojure.java.io :as io]
            [clojure.core.reducers :as r])))

(with-open [writer (io/writer "numbers.csv")]
  (csv/write-csv
   writer
   (take 10000000
         (repeatedly #(vector (char (+ 65 (rand-int 26))) (rand-int 1000))))))

(defn sum-vals
([] {})
([m [k v]]
 (update m k (fnil + 0) (Integer/parseInt v))))

(defn merge-sums
([] {})
([& m] (apply merge-with + m)))

(time
(with-open [reader (io/reader "numbers.csv")]
  (doall
   (r/fold
    (/ 10000000 12)
    merge-sums
    sum-vals
    (csv/read-csv reader)))))
(def n-cpu (.availableProcessors (Runtime/getRuntime)))
=>12
(ns csv2summap.core
  (:require [clojure.data.csv :as csv]
            [clojure.java.io :as io]
            [clojure.core.reducers :as r])))

-

(defproject csv2summap "0.1.0-SNAPSHOT"
  :description "FIXME: write description"
  :url "http://example.com/FIXME"
  :license {:name "EPL-2.0 OR GPL-2.0-or-later WITH Classpath-exception-2.0"
            :url "https://www.eclipse.org/legal/epl-2.0/"}
  :dependencies [[org.clojure/clojure "1.10.1"]
                 [org.clojure/data.csv "1.0.0"]]
  :repl-options {:init-ns csv2summap.core})

2 答案

+1

选定
 
最佳答案

感谢来自Clojurians聊天室的Sean Corfield和Adrian Smith,我现在了解到r/fold需要可折叠序列才能并行运行。

具体来说,你需要实现 CollFold。默认情况下,向量和一些其他类型(range、iterate、hashmap)已经实现了它,但默认回退是使用串行化简。因此,简单来说,可以使用向量,稍微复杂一点的是定义自己的可折叠子程序或者实现 CollFold(使用 reducer 接口或 reify)。
0

好问题!对此类数据的有效操作非常重要。

为此,我们开发了一些相关的库。

这个片段显示了一些简化处理 csv 数据的方法,同时也比较了你所执行的求和/合并操作的一个简单方案和一个感知数据集/数据类型的并行版本(不出所料,后者要快得多)。

https://gist.github.com/harold/7335b78606f8e962f2b385f1ed79d15c

希望这有助于你,并展示了处理此类问题的不同方法。

PS. 从你的问题中学到了关于 fnil 的知识,很酷!

感谢你的详尽回答。创建数据集似乎花费了大部分时间(约 82 秒从 1 亿行),而处理本身非常快 - 1 亿行用了约 10 秒。

编辑了
当然,欢迎!是的,100M行确实是很多!解析200M个数字肯定要花费一些时间。你的问题是在于合并/求和操作的并行化,因此我的代码将这一部分的工作单独处理。如果你想优化数据集的保存和加载,通常`nippy`比CSV快得多。请查看`tech.io/put-nippy!`和`tech.io/get-nippy`。
谢谢。尝试使用`(ds/write-csv! (ds/->dataset source-data))`或`(io/put-nippy! "./data.nippy" (ds/->dataset source-data))`在100M行上生产数据导致我的机器的堆分配失败。我使用原始代码生成了csv(并手动添加了标题)。
明白了。这很有道理,并且是`clojure.data.csv`流式处理能力的一个精彩例子。这台机器有32GB的RAM,所以我能够修改我的代码以写出行为100M的CSV和nippy数据集。磁盘上的csv大约是~658MB,nippy大约是~537MB。csv数据集加载需要~47秒,而nippy数据集加载需要~2.8秒。
...