2024 Clojure状态调查!中分享你的想法。

欢迎!请参阅关于页面以了解更多信息。

+14
Java 互操作
已关闭

我在这里提问是因为我无法在https://clojure.atlassian.net/jira/software/c/projects/CLJ/issues/CLJ-2771上发表帖子。

我运营一家初创公司,在试用Java 19虚拟线程时遇到了这个synchronized块的问题。

https://github.com/clojure/clojure/blob/4090f405466ea90bbaf3addbe41f0a6acb164dbb/src/jvm/clojure/lang/Delay.java#L35

调试这个问题花了很多工程力量和耐心,因为-Djdk.tracePinnedThreads报告没有任何信息。

目前,我们需要重新实现一个虚拟线程友好的(非synchronizedclojure.core/delay,以及我们需要确保虚拟线程不被固定的尽可能多的clojure.core函数。

和往常一样,我非常敬佩Clojure团队的工作!只是想强调一些API对虚拟线程的不友好性,并希望在可能的情况下将其优先处理,因为虚拟线程非常有用。

谢谢!

已关闭,附言:1.12.0-alpha5中完成

2个答案

+2
 
最佳答案

Clojure 1.12.0-alpha5版本现已可用,修改了lazy-seq和delay以便使用锁代替synchronized。

0
by

谢谢,肯定会为虚拟线程准备Clojure。我很想知道你 delay 中有什么?

by
很高兴听到这个消息!最初是对HTTP响应的_postprocessing(本身是HttpKit返回的一个promise)。然后为了消除一些变量,我去掉了HTTP请求,添加了一个非实现“dummy”promise,使`delay`体简化为`(deref p <timeout> nil)`。`<timeout>`的值 rất thấp——1-10毫秒——几乎从未显示任何问题。更高的值会越来越频繁地由于挂起而出现“虚拟线程饥饿”。

使用`ReentrantLock`实现的`clojure.lang.Delay`覆盖自定义`Delay`实现解决了问题。很高兴分享我们所拥有的Java类——它通过 https://github.com/clojure/clojure/blob/master/test/clojure/test_clojure/delays.clj 在平台和虚拟线程上都通过了测试。(它还有一个优化,使它能在可能的情况下避免在`deref`上引用`volatile`值字段。)
by
关于虚拟线程中的synchronized,最主要的问题是它是在阻塞的情况下同步的,所以围绕这个延迟可能有潜在的问题。我已经查看核心中的所有同步,实际上只有少数几个看起来有问题(这是其中一个)。

"如果可以引用非synchronized的,则避免在`deref`上引用`volatile`值字段"听起来像Java中一个经典的并发错误(类似于双重检查锁定),但根据使用情况可能仍然可以。只是请记住,没有任何保证任何其他线程将看到对共享的非volatile、非synchronized状态的更改。
by
同意关于 `synchronized` 的观点。

关于可能的并发错误的观察很到位。以下是相关代码。特别是 `deref` 使用了 `volatile`/非同步策略

```
public class Delay implements IDeref, IPending {
  volatile Object value;
  Object unsynchronizedValue;
  ReentrantLock lock;

  private static class ExceptionalValue {
    private final Object v;

    public ExceptionalValue (Object v) {
      this.v = v;
    }
  }

  private static class PendingValue {
    private final Object v;

    public PendingValue (Object v) {
      this.v = v;
    }
  }

  public Delay(IFn f){
    Object v = new PendingValue(f);
    unsynchronizedValue = v;
    value = v;
    lock = new ReentrantLock();
  }

  private static Object getOrThrow(Object v) {
    if (v instanceof ExceptionalValue) {
      return Util.sneakyThrow((Throwable)(((ExceptionalValue)v).v));
    } else {
      return v;
    }
  }

  public Object deref() {
    if (unsynchronizedValue instanceof PendingValue) {
      // Volatile read
      Object v = value;

      if (v instanceof PendingValue) {
        try {
          lock.lock();

          // Volatile read after lock held, in case it changed
          Object vAfter = value;

          if (vAfter instanceof PendingValue) {
            IFn f = (IFn)(((PendingValue)vAfter).v);

            Object vComputed = null;
            try {
              vComputed = f.invoke();
            } catch (Throwable t) {
              vComputed = new ExceptionalValue(t);
            }

            unsynchronizedValue = vComputed;
            value = vComputed;
            return getOrThrow(vComputed);
          } else {
            return getOrThrow(vAfter);
          }
        } finally {
          lock.unlock();
       }
      } else {
        return v;
      }
    } else {
      return getOrThrow(unsynchronizedValue);
    }
  }

  ... // `force`, `isRealized` 实现
}
```
您在锁下写入了unsynchronizedValue,但是读操作没有在锁下进行,因此没有保证其他线程实际上能看到这次写操作。这是一个与经典的双重检查锁定模式相似的线程可见性问题。我不建议以这种方式作弊,但是在这里使用ReentrantReadWriteLock可能是有用的。
您说得对 —— 没有理由期望读取者必须检索`unsynchronizedValue`的变化。话虽如此,如果一个线程未能捡起变化,它将回退到读取`value`(这是一个`volatile`变量),这保证了其始终是最新的。在我们的测试中,约有40%的读取在读取`value`之前检测到了`unsynchronizedValue`的变化,从这一角度来看(以及基于其他内部性能测试),这种优化似乎是有价值的。在所有情况下,`deref()`都产生了正确的结果。

`ReentrantReadWriteLock`是一个有趣的建议,值得探索 —— 感谢您!
我想了解原始有问题的代码做了什么。"HTTP响应的后处理"自身是否在执行更多的I/O操作?例如扇出请求?
我们使用虚拟线程的场景,正如你所怀疑的那样,是I/O。我说“以前”,是因为我们使用的虚拟线程(至少在我们使用它们的时候)太不可靠。它们会造成难以调试的死锁,并可能耗费我们大约一个月的开发时间,包括所有内容。我们后来转向在大型线程池之上使用自定义的`future`宏。)我们在使用`http-kit`进行HTTP请求扇出,并对响应进行后处理。我们需要做到1) 在不阻塞调用线程的情况下2) 不耗尽`clojure.core/future`线程池。我们认为,虽然我们可以(并且确实)使用一个解决方案,虚拟线程可以是一个很好的解决方案,所以我们转向它。

这里是我们所做的主要代码示例
```clojure
;; 笔记库函数中
(defn request-and-post-process [request]
  (let [response (http-kit/request request)]
    ;; 使用`delay`来保持它是可展开的输出,同时避免使用线程
    (delay (post-process @response))))

;; 业务逻辑函数中
;; 将在上游`deref`
(defn async-concurrently-make-requests [requests]
  (vthread
    (->> requests
         ;; 同时启动所有请求
         (mapv #(vthread @(request-and-post-process %)))
         ;; 聚合
         (mapv deref))))
```

请注意,这里使用了`clojure.core/delay`。事实证明,`clojure.lang.Delay`在它的`deref`方法中使用了`synchronized`(请参见Clojure问答线程),这会锁定虚拟线程。`deref` `delay`使得调用虚拟线程等待的时间与HTTP请求和后处理所需的时间一样长,在某些情况下可达30秒。我们在这里看到的不仅是锁定行为,还有死锁,这是出乎意料的。(锁定是一个`clojure.lang.Delay`的问题;死锁是虚拟线程底层JVM实现的问题,至少据我所知。我相信在bugs.openjdk.org上有一个为其开的bug ticket。)为了避免锁定,我们使用了自定义的基于`ReentrantLock`的`delay`。然而,在其他地方,例如一个JSON反序列化库中,也有其他锁定行为,尽管添加了`delay`增强,但似乎仍然使我们的系统进入死锁状态。

如果你对这些问题有答案,请告诉我!我很愿意进一步阐述。
by
关于似乎是在JVM中存在的一个bug的死锁问题,这是否在JDK 21中已经解决,或者说这只是JDK 19虚拟线程预览版中的一个bug?
...