2024年Clojure状态调查中分享您的观点!

欢迎!请查看关于页面以获取更多关于其工作方式的信息。

+14
Java互操作
已关闭

此处提问是因为我无法在https://clojure.atlassian.net/jira/software/c/projects/CLJ/issues/CLJ-2771上发表帖子。

我们是一家初创公司,在处理使用Java 19虚拟线程时遇到了这个synchronized块。

https://github.com/clojure/clojure/blob/4090f405466ea90bbaf3addbe41f0a6acb164dbb/src/jvm/clojure/lang/Delay.java#L35

这是一项耗时庞大的调试工作,因为-Djdk.tracePinnedThreads报告没有任何信息。

目前,我们需要重新实现一个虚拟线程友好型(非synchronizedclojure.core/delay,以及我们需要的其他许多clojure.core函数以确保虚拟线程不会被绑定。

一如既往,我对Clojure团队的成果感到惊叹!只是想突出一些API对虚拟线程的不友好性,并在可能的情况下将其优先处理,因为虚拟线程非常有用。

谢谢!

已关闭,附注:已完成在1.12.0-alpha5

2 个回答

+2
 
最佳答案

Clojure 1.12.0-alpha5现已发布,并将lazy-seq和delay的锁定方式从synchronized更改为使用锁。

0

谢谢,我确实对为虚拟线程准备Clojure感兴趣。我想知道你在delay里有什么?

很高兴听到这个好消息!最初是对HTTP响应(它本身是HttpKit返回的承诺)进行后处理。然后,为了消除一些变量,我移除了HTTP请求,添加了一个未实现的通知“假”承诺,并使`delay`体仅为`(deref p <timeout> nil)`。`<timeout>`的值较低(1-10毫秒)时,几乎从未显示任何问题。较高的值会越来越频繁地因卡住而导致“虚拟线程饥饿”。

使用`ReentrantLock`自定义`Delay`实现来覆盖`clojure.lang.Delay`修复了这个问题。很高兴分享我们的Java类——它可以在平台和虚拟线程上都通过https://github.com/clojure/clojure/blob/master/test/clojure/test_clojure/delays.clj。(它还有一项优化,使得它在可能的情况下避免在`deref`上引用`volatile`值字段,而是引用未同步的一个。)
虚拟线程中的synchronized的问题主要在于synchronized存在于阻塞操作周围,因此围绕这个操作延迟处理似乎可能是个问题。我已经检查了核心中的所有同步,实际上只有几个看起来有问题(这是一个)。

"如果可能,避免在`deref`上引用`volatile`值字段,而是引用未同步的一个",这听起来像是Java中经典的并发错误(类似于双重检查锁定),但可能取决于用法。只是要记住,没有任何保证其他线程将看到对共享的非volatile、非synchronized状态的更改。
同意关于`synchronized`的观点。

对于可能出现的并发错误的观察很到位。以下是相关代码。尤其是`deref`使用了`volatile`/非同步策略。

```
public class Delay implements IDeref, IPending {
  volatile Object value;
  Object unsynchronizedValue;
  ReentrantLock lock;

  private static class ExceptionalValue {
    private final Object v;

    public ExceptionalValue (Object v) {
      this.v = v;
    }
  }

  private static class PendingValue {
    private final Object v;

    public PendingValue (Object v) {
      this.v = v;
    }
  }

  public Delay(IFn f){
    Object v = new PendingValue(f);
    unsynchronizedValue = v;
    value = v;
    lock = new ReentrantLock();
  }

  private static Object getOrThrow(Object v) {
    if (v instanceof ExceptionalValue) {
      return Util.sneakyThrow((Throwable)(((ExceptionalValue)v).v));
    } else {
      return v;
    }
  }

  public Object deref() {
    if (unsynchronizedValue instanceof PendingValue) {
      // 可见性读取
      Object v = value;

      if (v instanceof PendingValue) {
        try {
          lock.lock();

          // 在持有锁后进行可见性读取,以防它已更改
          Object vAfter = value;

          if (vAfter instanceof PendingValue) {
            IFn f = (IFn)(((PendingValue)vAfter).v);

            Object vComputed = null;
            try {
              vComputed = f.invoke();
            } catch (Throwable t) {
              vComputed = new ExceptionalValue(t);
            }

            unsynchronizedValue = vComputed;
            value = vComputed;
            return getOrThrow(vComputed);
          } else {
            return getOrThrow(vAfter);
          }
        } finally {
          lock.unlock();
        }
      } else {
        return v;
      }
    } else {
      return getOrThrow(unsynchronizedValue);
    }
  }

  ... // 在此处实现`force`, `isRealized`
}
```
by
您在对锁下的unsynchronizedValue进行编写,但读取操作不是在锁下进行的,因此无法保证其他线程确实看到了该写入操作。这是一个线程可见性问题,类似于经典的双重检查锁定模式。我不会尝试通过这种方式欺骗,但在这里使用可重入读/写锁可能是有用的。
by
您是对的——不应该期望读取器*必须*摘取对`unsynchronizedValue`的更改。话虽如此,如果线程无法抓取更改,它将回退到读取`value`(它是`volatile`的),这保证了它始终是最新的。在我们的测试中,大约有40%的读取在读取`value`之前已经摘取了`unsynchronizedValue`,在这种程度上(以及基于其他内部性能测试),优化似乎是有价值的。所有情况下`deref()`都产生了正确的结果。

`ReentrantReadWriteLock`是一个值得探索的建议——谢谢您!
by
我想了解原始问题代码做了什么。例如,“HTTP响应后的处理”本身是否在运行进一步的I/O?就像分散请求一样?
by
我们对于虚拟线程的使用案例,正如您所怀疑的那样,是用于I/O操作。(我说“曾是”,因为我们所用的虚拟线程(无论如何)太不可靠。它们会导致难以调试的死锁,并可能耗去我们整整一个月的开发时间。我们从那时起改为在大型线程池之上使用自定义的`future`宏。)我们正在进行HTTP请求的扇出操作,使用`http-kit`,接着对响应进行处理。我们需要做到所有这些,1)不阻塞调用线程,2)不耗尽`clojure.core/future`线程池。我们意识到,虽然我们可以(并确实)使用一个规避方案,但虚拟线程可以是一个很好的解决方案,所以我们转向了它。

以下是我们所做操作的基本代码
```clojure
;; 在一个库函数中
(defn request-and-post-process [request]
  (let [response (http-kit/request request)]
    ;; 我们使用一个`delay`来保持它是一个可解引用的输出,同时避免使用线程
    (delay (post-process @response))))

;; 在一个业务逻辑函数中
;; 将在上游被`deref`
(defn async-concurrently-make-requests [requests]
  (vthread
    (->> requests
        ;; 同时启动所有请求
        (mapv #(vthread @(request-and-post-process %)))
        ;; 聚合
        (mapv deref))))
```

请注意,我们使用了`clojure.core/delay`。事实表明,`clojure.lang.Delay`在其`deref`方法中使用`synchronized`,这会锁定虚拟线程。`deref` `delay`会使调用虚拟线程等待HTTP请求和后续处理完成,在某些情况下可能需要30秒。我们在这里看到了不仅锁定行为,而且还出现了死锁,这出乎意料。(锁定是`clojure.lang.Delay`的问题;死锁是底层JVM对虚拟线程实现的 Zookeeper 实现的问题。据我所知,在bugs.openjdk.org上有一个与之相关的开放工单。)为了避免锁定,我们使用了基于`ReentrantLock`的自定义`delay`。然而,例如,一个JSON反序列化库中仍然存在其他锁定问题,这使得我们即使在增强`delay`之后仍然陷入死锁区域。

请告诉我这是否回答了您的问题!乐意进一步详细说明。
by
关于似乎是JVM中一个错误的死锁问题,它是否已在JDK 21中得到解决,或者这只是JDK 19虚拟线程预览中的一个错误?
...