2024 Clojure 现状调查中分享您的想法!

欢迎!请查看关于页面以了解更多关于这个网站如何运作的信息。

+14
Java 互操作
已关闭

在此提问是因为我无法在https://clojure.atlassian.net/jira/software/c/projects/CLJ/issues/CLJ-2771上发帖。

我运营一家初创公司,我们在使用 Java 19 虚拟线程时遇到了这个synchronized块问题。

源代码链接

由于-Djdk.tracePinnedThreads选项没有任何报告,因此调试这一过程耗费了大量工程力量和耐心。

目前,我们需要重新实现一个虚拟线程友好的(非synchronizedclojure.core/delay以及我们需要的尽可能多的clojure.core函数,以确保虚拟线程不会绑定。

一如既往,我被Clojure团队的干劲所折服!只是想强调一些API在虚拟线程方面的不友好性,并希望在可能的情况下将其优先排序,因为虚拟线程非常有用。

谢谢!

随注:在1.12.0-alpha5中完成。

2 答案

+2
 
最佳答案

Clojure 1.12.0-alpha5已发布,修改了lazy-seq和delay,使用锁代替synchronized。

0

谢谢你,我肯定有兴趣准备Clojure以支持虚拟线程。我很想知道你的delay中有什么。

很高兴听到这个!一开始这是对HTTP响应的后处理(它本身是HttpKit返回的承诺)。然后为了消除一些变量,我移除了HTTP请求,添加了一个未实现的“虚拟”承诺,并将`delay`主体简单地改为`(deref p <timeout> nil)`。较低的`<timeout>`值(1-10 ms)几乎从未显示任何问题。较高的值会更频繁地导致由于“虚拟线程饥饿”而出现的错误。

使用`ReentrantLock`重写`clojure.lang.Delay`的`Delay`自定义实现解决了这个问题。很高兴分享我们的Java类 —— 它在平台和虚拟线程上都通过了测试https://github.com/clojure/clojure/blob/master/test/clojure/test_clojure/delays.clj。(它还有一个优化,使其避免如果可以引用未同步的值的话,`deref`对`volatile`值字段的引用。)
在虚拟线程中,synchronized的主要问题是围绕阻塞的synchronized,因此在延迟这一点上肯定是一个可能出现的问题。我已经检查了核心中的所有同步性,实际上只有少数几个似乎存在问题(这个就是一个)。

“在可能的情况下,避免在`deref`上引用`volatile`值字段,而应引用未同步的一个”听起来像Java中的经典并发错误(类似于双重检查锁定),但根据使用情况可能仍然可以。只需记住,没有任何保证其他线程会看到共享的未volatile和非synchronized状态的更改。
by
同意关于`synchronized`的观点。

关于可能出现并发错误的观察很棒。以下是相关的代码。特别是`deref`使用了`volatile`/非同步策略

```
public class Delay implements IDeref, IPending {
  volatile Object value;
  Object unsynchronizedValue;
  ReentrantLock lock;

  private static class ExceptionalValue {
    private final Object v;

    public ExceptionalValue (Object v) {
      this.v = v;
    }
  }

  private static class PendingValue {
    private final Object v;

    public PendingValue (Object v) {
      this.v = v;
    }
  }

  public Delay(IFn f){
    Object v = new PendingValue(f);
    unsynchronizedValue = v;
    value = v;
    lock = new ReentrantLock();
  }

  private static Object getOrThrow(Object v) {
    if (v instanceof ExceptionalValue) {
      return Util.sneakyThrow((Throwable)(((ExceptionalValue)v).v));
    } else {
      return v;
    }
  }

 public Object deref() {
    if (unsynchronizedValue instanceof PendingValue) {
      // Volatile read
      Object v = value;

      if (v instanceof PendingValue) {
        try {
          lock.lock();

          // Volatile read after lock held, in case it changed
          Object vAfter = value;

          if (vAfter instanceof PendingValue) {
            IFn f = (IFn)(((PendingValue)vAfter).v);

            Object vComputed = null;
            try {
              vComputed = f.invoke();
            } catch (Throwable t) {
              vComputed = new ExceptionalValue(t);
            }

            unsynchronizedValue = vComputed;
            value = vComputed;
            return getOrThrow(vComputed);
          return getOrThrow(vAfter);
        
        finally {
          lock.unlock();
       
       }
       }
       return v;
  
    } else {
  return getOrThrow(unsynchronizedValue);
    }
  }

  ... // `force`, `isRealized` 在此处实现
}
```
by
您在锁下写入unsynchronizedValue,但读取操作不在锁下进行,因此无法保证其他线程实际上能看到这个写入操作。这是一个与经典双重检查锁定模式相似的线程可见性问题。我不建议以这种方式作弊,但在这里使用ReentrantReadWriteLock可能是有用的。
by
您是对的——不应该期望读者“必须”获取对`unsynchronizedValue`的更改。话虽如此,如果一个线程未能获取更改,它将回退到读取`value`(该值是`volatile`),这始终是保证最新的。在我们的测试中,大约40%的读取操作在读取`value`之前检索了`unsynchronizedValue`,因此从这个意义上说(以及基于其他内部性能测试),这种优化似乎是有价值的。在所有情况下,`deref()`都产生了正确的结果。

ReentrantReadWriteLock是一个有趣的建议,谢谢您!
by
我想了解原始有问题代码是如何工作的。“对http响应的后续处理”本身是否运行了进一步的IO?比如分散请求?
by
我们的虚拟线程的使用案例,如您所怀疑的,是I/O。(我说“是”,因为我们使用的虚拟线程太不可靠了。它会引发可能导致调试惩罚的死锁,并可能耗费我们整个月的时间,包括所有包含在内的开发时间。我们之后转而使用在大型线程池之上的自定义`future`宏。)我们在使用`http-kit`进行HTTP请求分散和响应的后续处理。我们需要所有这些都1)不阻塞调用线程,2)不耗尽`clojure.core/future`线程池。我们猜测,尽管我们可以(确实也可以)使用一个解决方案,但虚拟线程可以是一个很好的解决方案,所以我们寻求它。

本质上,这是我们所进行的代码
```clojure
;; 在库函数中
(defn request-and-post-process [request]
  (let [response (http-kit/request request)]
    ;; 我们使用`delay`来保持其可解析的输出,同时避免使用线程
    (delay (post-process @response))))

;; 在业务逻辑函数中
  ; 将被`deref`
(defn async-concurrently-make-requests [requests]
  (vthread
    (->> requests
        ;; 并发启动所有请求
         (mapv #(vthread @(request-and-post-process %)))
         ;; 聚合
         (mapv deref))))
```

请注意使用了`clojure.core/delay`。事实上,`clojure.lang.Delay`在它的`deref`方法中使用`synchronized`,这会使虚拟线程挂起。当`deref`一个`delay`时,调用虚拟线程会等待HTTP请求和后处理完成,在某些情况下这可能需要30秒。我们不仅看到了挂起行为,而且还遇到了意料之外的死锁。 (挂起是`clojure.lang.Delay`的问题;死锁可能是底层JVM虚拟线程实现在虚拟线程上的问题。据我所知,在bugs.openjdk.org上有一个开放的票证。) 为了避免挂起,我们使用了基于自定义`ReentrantLock`的`delay`。然而,还有其他挂起,比如在JSON反序列化库中,即使在`delay`增强之后,这仍然使我们陷入了死锁区域。

如果你觉得这些都回答了你的问题,请告诉我!我很乐意进一步阐述。
关于似乎在JVM中存在的死锁问题,它在JDK 21中已经解决了吗?还是这只是JDK 19虚拟线程预览版中的bug?
...