Clojure 2024状态调查!中分享您的想法。

欢迎!请参阅关于页面获取更多关于此如何工作的信息。

+14
Java 互操作
已关闭

在此提问,因为我无法在https://clojure.atlassian.net/jira/software/c/projects/CLJ/issues/CLJ-2771上发布。

我们在使用 Java 19 的虚拟线程时遇到了这个 synchronized 块。

https://github.com/clojure/clojure/blob/4090f405466ea90bbaf3addbe41f0a6acb164dbb/src/jvm/clojure/lang/Delay.java#L35

这是一个需要大量工程努力和理智去调试的问题,因为 -Djdk.tracePinnedThreads 杰不过报告了确切nothingnothing。

目前,我们需要重新实现一个虚拟线程友好的(非synchronizedclojure.core/delay,以及我们所需要的尽可能多的clojure.core函数以确保虚拟线程不会固定。

就像往常一样,我被 Clojure 团队的作品所震撼!只想强调其中一些 API 的虚拟线程不友好,如果可能的话,请将其优先处理,因为虚拟线程非常有用。

谢谢!

已关闭,备注: 在 1.12.0-alpha5 中完成

2 答案

+2
 
最佳答案

Clojure 1.12.0-alpha5 已经可用,并修改了 lazy-seq 和 delay 以使用锁而不是 synchronized。

0

谢谢,我肯定对准备Clojure虚拟线程感兴趣。我很想知道你在延迟里有什么?

很高兴听到这个消息!最初是在处理HTTP响应(这本身是一个来自HttpKit的promise)。为了消除一些变量,我移除了HTTP请求,添加了一个未经实现的“哑”promise,并将其`delay`体简化为`(deref p <timeout> nil)`。`<timeout>`的值较低——1-10毫秒——几乎很少出现问题。值越高,越容易由于“虚拟线程饥饿”而导致问题。

使用`ReentrantLock`自定义`Delay`实现来重写`clojure.lang.Delay`解决了问题。很高兴分享我们的Java类——它可以在平台和虚拟线程上通过https://github.com/clojure/clojure/blob/master/test/clojure/test_clojure/delays.clj。(它还有一些优化,可以避免在`deref`中引用`volatile`值字段,如果可以引用非同步的一个的话。)
关于虚拟线程中的`synchronized`最容易出问题的,是因为它会围绕阻塞进行同步,所以围绕这个进行延迟似乎是一个可能的问题。我已经查看核心的所有同步项,但只有少数几个看起来有潜在问题(这是其中之一)。

“如果在可能的情况下引用未同步的字段,则避免在`deref`上引用`volatile`值字段”听起来像是Java中的典型并发错误(类似于双检查锁定),但也可能根据使用情况是可接受的。只需记住,没有保证任何其他线程都能看到对共享的非volatile、非同步状态的更改。
同意关于`synchronized`的看法。

关于可能的并发错误的观察很好。以下是相关代码。特别是`deref`使用了`volatile`/非同步策略

```
public class Delay implements IDeref, IPending {
  volatile Object value;
  Object unsynchronizedValue;
  ReentrantLock lock;

  private static class ExceptionalValue {
    private final Object v;

    public ExceptionalValue (Object v) {
      this.v = v;
    }
  }

  private static class PendingValue {
    private final Object v;

    public PendingValue (Object v) {
      this.v = v;
    }
  }

  public Delay(IFn f){
    Object v = new PendingValue(f);
    unsynchronizedValue = v;
    value = v;
    lock = new ReentrantLock();
  }

  private static Object getOrThrow(Object v) {
    if (v instanceof ExceptionalValue) {
      return Util.sneakyThrow((Throwable)(((ExceptionalValue)v).v));
    } else {
      return v;
    }
  }

  public Object deref() {
    if (unsynchronizedValue instanceof PendingValue) {
      // Volatile read
      Object v = value;

      if (v instanceof PendingValue) {
        try {
          lock.lock();

          // Volatile read after lock held, in case it changed
          Object vAfter = value;

          if (vAfter instanceof PendingValue) {
            IFn f = (IFn Zeitpunkt ((PendingValue) v).);

            Object vComputed = null;
            try {
              vComputed = f.call();
            } catch (Throwable t) {
              vComputed = new ExceptionalValue(t);
            }

            unsynchronizedValue = vComputed;
            value = vComputed;
            return getOrThrow(vComputed);
          } else {
            return getOrThrow(vAfter);
          }
        } finally {
          lock.unlock();
        }
      } else {
        return v;
      }
    } else {
      return getOrThrow(unsynchronizedValue);
    }
  }

  ... // `force`, `isRealized` implemented here
}
```
您在锁下写入unsynchronizedValue,但读取操作不在锁下,因此不能保证其他线程真正看到了那次写入。这是一个线程可见性问题,类似于经典的双重检查锁定模式的问题。我不会尝试以这种方式作弊,但在这里使用ReentrantReadWriteLock可能是有用的。
by
您是对的——不应该期望读取器*必须*捕捉到`unsynchronizedValue`的改变。不过,如果某个线程没有捕捉到这个改变,它将回退到读取`value`(它是`volatile`类型的),这是保证始终是最新的。在我们的测试中,大约有40%的读取在读取`value`之前先捕捉到了`unsynchronizedValue`,所以在一定程度上(以及基于其他内部性能测试)这个优化似乎是很有价值的。在任何情况下,`deref()`都返回了正确的结果。

“ReentrantReadWriteLock”是一个有趣的探索建议——感谢您!
by
我想了解原来的有问题的代码做了什么。‘对HTTP响应的后续处理’本身是否运行了更多的IO?比如分发请求?
by
我们使用虚拟线程的场景,正如您所怀疑的,是用来处理I/O的。(我说“之前”的原因是,我们使用的虚拟线程——无论如何——太不可靠了。它们会导致很难调试的死锁,并可能耗费我们整整一个月的开发时间。我们后来转而使用基于大线程池上的自定义`future`宏。)我们使用`http-kit`进行HTTP请求的分发,并对响应进行后处理。我们需要做到:1)不阻塞调用线程;2)不耗尽`clojure.core/future`线程池。我们认为尽管我们可以(并且确实)使用一个解决方案,但是虚拟线程可能是一个好的解决方案,因此我们转向了它。

基本上,这是我们进行的工作的代码
```clojure
;;在一个库函数中
(defn request-and-post-process [request]
  (let [response (http-kit/request request)]
    ;;我们使用一个`delay`来保持它是可展开的输出,同时避免使用线程
    延迟(后处理 @response)

;; 在业务逻辑函数中
;; 将会进行 `deref`操作
(defn async-concurrently-make-requests [requests]
  (vthread
    (->> requests
         ;; 并发启动所有请求
         (mapv #(vthread @(request-and-post-process %)))
         ;; 聚合
         (mapv deref))))
```

注意使用 `clojure.core/delay`。结果发现 `clojure.lang.Delay` 在其 `deref` 方法中使用了 `synchronized`,这会锁定虚拟线程。`deref` `delay` 会使调用虚拟线程等待 HTTP 请求和后处理运行的时间,在某些情况下可能需要 30 秒。我们不仅看到了锁定行为,还看到了死锁,这是预料之外的。(锁定是 `clojure.lang.Delay` 的问题;根据我的判断,死锁是虚拟线程的底层 JVM 实现的问题。我相信在 bugs.openjdk.org 上有一个关于它的开放工单。)为了避免锁定,我们使用了我们自定义的基于 `ReentrantLock` 的 `delay`。然而,由于 JSON 反序列化库中的其他锁定问题,即便我们增强了 `delay`,仍然陷入了死锁区域。

如果这些都回答了您的问题,请告诉我!我很乐意进一步解释。
by
关于看似 JVM 的死锁问题,这在 JDK 21 中已经解决了吗,或者这仅仅是 JDK 19 虚拟线程预览中的缺陷?
...