2024 年 Clojure 状态调查!中分享您的想法。

欢迎!请在关于页面查看更多关于如何使用它的信息。

+14
Java 互操作性
关闭

因为无法在https://clojure.atlassian.net/jira/software/c/projects/CLJ/issues/CLJ-2771上发布,所以在这里提出。

我们运营一家初创公司,当我们使用 Java 19 虚拟线程时遇到了这个《synchronized》代码块。

https://github.com/clojure/clojure/blob/4090f405466ea90bbaf3addbe41f0a6acb164dbb/src/jvm/clojure/lang/Delay.java#L35

这是一个需要大量工程努力和理智来调试的问题,因为 -Djdk.tracePinnedThreads 报告了确切的无用信息。

按照现状,我们正在重新实现一个虚拟线程友好的(非《synchronized》)《clojure.core/delay》,以及尽可能多的《clojure.core》函数,以确保虚拟线程不被固定。

一如既往,我惊叹于 Clojure 团队的工作!只是想突出它的一些 API 对于虚拟线程的不友好性,并尽可能地将其列为优先,因为虚拟线程非常实用。

谢谢!

关闭时注明: 在 1.12.0-alpha5 中完成

2 答案

+2
 
最佳答案

Clojure 1.12.0-alpha5 现已发布,并修改了 lazy-seq 和 delay 以使用锁而不是 synchronized。

0

谢谢,肯定对准备Clojure以支持虚拟线程感兴趣。我想知道你的延迟执行中有什么内容?

很高兴听到这个消息!最初是对HTTP响应的后处理(这本身是来自HttpKit返回的Promise)。然后,为了消除一些变量,我删除了HTTP请求,添加了一个未实现的“虚拟”Promise,并将`delay`体改为`(deref p <timeout> nil)`。``的低值——1-10毫秒——几乎从未显示任何问题。高值则会更频繁地导致由于挂起产生的“虚拟线程饥饿”。

使用`ReentrantLock`通过自定义`Delay`实现覆盖`clojure.lang.Delay`解决了这个问题。很高兴分享我们的Java类——它可以在平台和虚拟线程上通过https://github.com/clojure/clojure/blob/master/test/clojure/test_clojure/delays.clj。(它还有一个优化,使其在可能的情况下避免在`deref`中对`volatile`值字段进行引用。)
在虚拟线程中,糟糕的是涉及到阻塞的`synchronized`,所以围绕这部分进行延迟似乎可能是一个问题。我已经查看核心中的所有同步,实际上只有少数几个看起来有问题(这就是一个)。

"当它可以在不安全的上下文中对值字段进行引用时,避免在`deref`中引用`volatile`值字段"听起来像是Java中经典的并发错误(类似于双重检查锁定),但具体使用中可能没问题。但请注意,没有任何保证其他线程将看到共享的非`volatile`、非`synchronized`状态的更改。
同意关于`synchronized`的观点。

关于可能的并发错误,观点良好。下面是相关代码。特别是`deref`使用了`volatile`/非同步策略

```
public class Delay implements IDeref, IPending {
  volatile Object value;
  Object unsynchronizedValue;
  ReentrantLock lock;

  private static class ExceptionalValue {
    private final Object v;

    public ExceptionalValue (Object v) {
      this.v = v;
    }
  }

  private static class PendingValue {
    private final Object v;

    public PendingValue (Object v) {
      this.v = v;
    }
  }

  public Delay(IFn f){
    Object v = new PendingValue(f);
    unsynchronizedValue = v;
    value = v;
    lock = new ReentrantLock();
  }

  private static Object getOrThrow(Object v) {
    if (v instanceof ExceptionalValue) {
      return Util.sneakyThrow((Throwable)(((ExceptionalValue)v).v));
    } else {
      return v;
    }
  }

  public Object deref() {
    if (unsynchronizedValue instanceof PendingValue) {
      进行volatile读取
      Object v = value;

      if (v instanceof PendingValue) {
        try {
          lock.lock();

          锁定后进行volatile读取,以防更改
          Object vAfter = value;

          if (vAfter instanceof PendingValue) {
            IFn f = (IFn)(((PendingValue)vAfter).v);

            Object vComputed = null;
            try {
              vComputed = f.invoke();
            } catch (Throwable t) {
              vComputed = new ExceptionalValue(t);
            }

            unsynchronizedValue = vComputed;
            value = vComputed;
            return getOrThrow(vComputed);
          } else {
            return getOrThrow(vAfter);
         
        } finally {
          lock.unlock();
        
      }
        return v;
      }
    } else {
      return getOrThrow(unsynchronizedValue);
    }
  }

  ... // `force`, `isRealized` implemented here
}
```
您在锁下写入unsynchronizedValue,但读取操作不在锁下进行,因此不能保证其他线程实际上能看到该写入。这是一个线程可见性问题,类似于经典的双检查锁定模式。我不会尝试这种方式作弊,但在这里使用ReentrantReadWriteLock可能是有用的。
您是对的——不应期望读取器*必须*获取到`unsynchronizedValue`的更改。但话又说回来,如果线程未能获取到更改,它会回退到读取`value`(它是`volatile`的),这保证了它总是最新的。在我们的测试中,大约40%的读取在读取`value`之前读取了`unsynchronizedValue`,从这个角度来看(以及基于其他内部性能测试),该优化似乎是宝贵的。在所有情况下,`deref()`都产生了正确的结果。

探索`ReentrantReadWriteLock`是一项有趣的建议——谢谢您!
我想了解原始有问题代码做了什么。"http响应后处理"本身是否运行了更多IO操作?比如扩展请求?
我们使用虚拟线程的情况,正如您所怀疑的,是I/O。我说“曾经是”,因为我们使用的虚拟线程(无论如何)太不可靠了。它们会引起死锁,调试起来令人头痛,可能花费我们整整一个月的开发时间,包括内部的时间。我们后来转向使用自定义`future`宏,在大型线程池之上。

基本上,这是我们正在进行的代码:
```clojure
;; 在库函数中
(defn request-and-post-process [request]
  (let [response (http-kit/request request)]
    ;; 我们使用`delay`来保持它是可访问的,同时又避免使用线程
    (delay (post-process @response))))

;; 在业务逻辑函数中
;; 会在上游被解除引用
(定义函数 async-concurrently-make-requests [requests]
  (线程
    (->> requests
         ;; 并发启动所有请求
         (mapv #(vthread @(request-and-post-process %)))
         ;; 聚合
         (mapv deref))))
```

需要注意的是使用了 `clojure.core/delay`。结果是 `clojure.lang.Delay` 在其 `deref` 方法中使用 `synchronized`,这会导致虚拟线程卡住(参见 Clojure Q&A 线程)。解除 `delay` 会使调用虚拟线程等待时间与 HTTP 请求和后处理所需时间相同,在某些情况下可能长达 30 秒。我们不仅看到了卡住行为,还遇到了意外的死锁。(卡住问题是 `clojure.lang.Delay` 的问题;死锁问题似乎是底层 JVM 对虚拟线程的实现问题。据我所知,在 bugs.openjdk.org 上有一个与此相关的问题跟踪。为了避免卡住,我们使用了自定义的基于 `ReentrantLock` 的 `delay`。然而,在其他库中,如JSON反序列化库中,似乎仍然存在卡住,尽管我们对 `delay` 进行了增强。

请告诉我这些是否回答了您的问题!很高兴进一步说明。
by
关于似乎在 JVM 中的死锁问题,这在 JDK 21 中是否已解决,还是这只是 JDK 19 预览版虚拟线程中的一个bug?
...