Java 虚拟线程

本文翻译自 Virtual Threads,版权归原作者所有。

为什么需要虚拟线程?

Java 1.0 于 1995 年发布时,其 API 中包含约一百个类,其中就有 java.lang.Thread。Java 是第一个直接支持并发编程的主流编程语言。

从 Java 1.2 开始,每个 Java 线程都运行在底层操作系统提供的平台线程上。(在 Java 1.1 及之前的版本中,在某些平台上,所有 Java 线程由单个平台线程执行。)

平台线程的成本不低。启动一个平台线程需要数千条 CPU 指令,并且会消耗几兆字节的内存。服务器应用程序可能需要处理大量并发请求,为每个请求分配一个独立的平台线程变得不可行。在典型的服务器应用中,这些请求大部分时间都处于阻塞状态,等待数据库或其他服务返回结果。

提高吞吐量的传统方法是使用非阻塞 API。程序员不是等待结果,而是指定当结果可用时应调用哪个方法,以及在失败时调用另一个方法。这种方式很快就会变得令人不快,因为回调会不断嵌套。

JEP 425 在 Java 19 中引入了虚拟线程。多个虚拟线程运行在一个平台线程上。每当虚拟线程阻塞时,它就会被卸载,平台线程随即运行另一个虚拟线程。(“虚拟线程"这个名称类似于虚拟内存映射到实际 RAM。)虚拟线程在 Java 20 中成为预览特性(JEP 436),并在 Java 21 中正式发布。

有了虚拟线程,阻塞的成本变得很低。当结果不能立即获得时,你只需在虚拟线程中阻塞即可。你可以使用熟悉的编程结构——分支、循环、try 块——而不是一系列回调。

虚拟线程在并发任务数量庞大且任务主要阻塞在网络 I/O 上时非常有用。对于 CPU 密集型任务,它们没有任何优势。对于此类任务,可以考虑使用并行流递归 fork-join 任务

创建虚拟线程

工厂方法 Executors.newVirtualThreadPerTaskExecutor() 返回一个 ExecutorService,它会在单独的虚拟线程中运行每个任务。例如:

import java.util.concurrent.*;

public class VirtualThreadDemo {
    public static void main(String[] args) {
        final int NTASKS = 100;
        ExecutorService service = Executors.newVirtualThreadPerTaskExecutor();
        for (int i = 0; i < NTASKS; i++) {
            service.submit(() -> {
                long id = Thread.currentThread().threadId();
                LockSupport.parkNanos(1_000_000_000);
                IO.println(id);
            });
        }
        service.close();
    }
}

顺便说一下,这段代码使用 LockSupport.parkNanos 而不是 Thread.sleep,这样我们就不必捕获麻烦的 InterruptedException

你可能在使用需要线程工厂的底层 API。要获取虚拟线程的工厂,请使用新的 Thread.Builder 类:

Thread.Builder builder = Thread.ofVirtual().name("request-", 1);
ThreadFactory factory = builder.factory();

现在,调用 factory.newThread(myRunnable) 会创建一个新的(未启动的)虚拟线程。name 方法配置构建器以设置线程名称 request-1request-2 等。

你也可以使用构建器创建单个虚拟线程:

Thread t = builder.unstarted(myRunnable);

或者,如果你想立即启动线程:

Thread t = builder.started(myRunnable);

最后,对于快速演示,有一个便捷方法:

Thread t = Thread.startVirtualThread(myRunnable);

请注意,只有第一种方法(使用执行器服务)适用于返回结果的任务(callables)。

Thread API 的变化

在经过一系列不同 API 的实验后,Java 虚拟线程的设计者决定简单地重用熟悉的 Thread API。虚拟线程是 Thread 的实例。取消操作与平台线程相同,通过调用 interrupt。与以往一样,线程代码必须检查"中断"标志或调用会检查它的方法。(大多数阻塞方法都会检查。)

有一些区别。特别是,所有虚拟线程:

  • 属于单个线程组
  • 优先级为 NORM_PRIORITY
  • 都是守护线程

没有 API 可以构造具有其他线程组的虚拟线程。尝试在虚拟线程上调用 setPrioritysetDaemon 不会产生任何效果。

静态方法 Thread::getAllStackTraces 返回所有平台线程的堆栈跟踪映射。虚拟线程不包括在内。

新的 Thread::isVirtual 实例方法可以判断线程是否为虚拟线程。

请注意,无法找到虚拟线程在哪个平台线程上执行。

Java 19 对 Thread API 进行了一些与虚拟线程无关的更改:

  • 现在有实例方法 join(Duration)sleep(Duration)
  • 非 final 的 getId 方法已被弃用,因为有人可能重写它以返回线程 ID 以外的内容。请改用 final 的 threadId 方法。

从 Java 20 开始,stopsuspendresume 方法会对平台线程和虚拟线程抛出 UnsupportedOperationException。这些方法自 Java 1.2 起就已弃用,并自 Java 18 起标记为待删除。

捕获任务结果

你经常需要组合多个并发任务的结果:

Future<T1> f1 = service.submit(callable1);
Future<T2> f2 = service.submit(callable2);
result = combine(f1.get(), f2.get());

在虚拟线程出现之前,你可能会对阻塞的 get 调用感到不安。但现在阻塞的成本很低。下面是一个更具体的示例程序:

import java.util.concurrent.*;
import java.net.*;
import java.net.http.*;

public class VirtualThreadDemo {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ExecutorService service = Executors.newVirtualThreadPerTaskExecutor();
        Future<String> f1 = service.submit(() -> get("https://horstmann.com/random/adjective"));
        Future<String> f2 = service.submit(() -> get("https://horstmann.com/random/noun"));
        String result = f1.get() + " " + f2.get();
        IO.println(result);
        service.close();
    }

    private static HttpClient client = HttpClient.newHttpClient();

    public static String get(String url) {
        try {
            var request = HttpRequest.newBuilder().uri(new URI(url)).GET().build();
            return client.send(request, HttpResponse.BodyHandlers.ofString()).body();
        } catch (Exception ex) {
            var rex = new RuntimeException();
            rex.initCause(ex);
            throw rex;
        }
    }
}

如果你有一个具有相同结果类型的任务列表,可以使用 invokeAll 方法,然后对每个 Future 调用 get

List<Callable<T>> callables = ...;
List<T> results = new ArrayList<>();
for (Future<T> f : service.invokeAll(callables))
    results.add(f.get());

同样,这里有一个更具体的示例程序:

import java.util.*;
import java.util.concurrent.*;
import java.net.*;
import java.net.http.*;

public class VirtualThreadDemo {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ExecutorService service = Executors.newVirtualThreadPerTaskExecutor();
        List<Callable<String>> callables = new ArrayList<>();
        final int ADJECTIVES = 4;
        for (int i = 1; i <= ADJECTIVES; i++)
            callables.add(() -> get("https://horstmann.com/random/adjective"));
        callables.add(() -> get("https://horstmann.com/random/noun"));
        List<String> results = new ArrayList<>();
        for (Future<String> f : service.invokeAll(callables))
            results.add(f.get());
        IO.println(String.join(" ", results));
        service.close();
    }

    private static HttpClient client = HttpClient.newHttpClient();

    public static String get(String url) {
        try {
            var request = HttpRequest.newBuilder().uri(new URI(url)).GET().build();
            return client.send(request, HttpResponse.BodyHandlers.ofString()).body();
        } catch (Exception ex) {
            var rex = new RuntimeException();
            rex.initCause(ex);
            throw rex;
        }
    }
}

限流

虚拟线程提高了应用程序的吞吐量,因为你可以拥有比平台线程更多的并发任务。这可能会对任务调用的服务造成压力。例如,Web 服务可能无法承受大量并发请求。

对于平台线程,一个简单(尽管粗糙)的调优因素是这些任务的线程池大小。但你不应该池化虚拟线程。在虚拟线程上调度任务,然后在平台线程上调度这些虚拟线程显然是低效的。那么好处是什么呢?将虚拟线程数量限制为服务可以容忍的少量并发请求?那你为什么还要使用虚拟线程呢?

对于虚拟线程,你应该使用替代机制来控制对有限资源的访问。不要对并发任务进行整体限制,而是以适当的方式保护每个资源。对于数据库连接,连接池可能已经做了正确的事情。访问 Web 服务时,你了解自己的服务,可以提供适当的限流。

举个例子,在我的个人网站上,我提供了生成随机项目的演示服务。如果短时间内来自同一 IP 地址的大量请求,托管公司会将该 IP 地址列入黑名单。

以下示例程序展示了使用简单信号量进行限流,允许少量并发请求。当超过最大值时,acquire 方法会阻塞,但这没关系。使用虚拟线程,阻塞的成本很低。

import java.util.*;
import java.util.concurrent.*;
import java.net.*;
import java.net.http.*;

public class RateLimitDemo {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ExecutorService service = Executors.newVirtualThreadPerTaskExecutor();
        List<Future<String>> futures = new ArrayList<>();
        final int TASKS = 250;
        for (int i = 1; i <= TASKS; i++)
            futures.add(service.submit(() -> get("https://horstmann.com/random/word")));
        for (Future<String> f : futures)
            IO.print(f.get() + " ");
        IO.println();
        service.close();
    }

    private static HttpClient client = HttpClient.newHttpClient();
    private static final Semaphore SEMAPHORE = new Semaphore(20);

    public static String get(String url) {
        try {
            var request = HttpRequest.newBuilder().uri(new URI(url)).GET().build();
            SEMAPHORE.acquire();
            try {
                Thread.sleep(100);
                return client.send(request, HttpResponse.BodyHandlers.ofString()).body();
            } finally {
                SEMAPHORE.release();
            }
        } catch (Exception ex) {
            ex.printStackTrace();
            var rex = new RuntimeException();
            rex.initCause(ex);
            throw rex;
        }
    }
}

固定

虚拟线程调度器将虚拟线程挂载到载体线程上。默认情况下,载体线程的数量与 CPU 核心数相同。你可以使用 jdk.virtualThreadScheduler.parallelism VM 选项调整该数量。

当虚拟线程执行阻塞操作时,它应该从其载体线程上卸载,然后载体线程可以执行另一个虚拟线程。然而,在某些情况下,这种卸载是不可能的。在某些情况下,虚拟线程调度器会通过启动另一个载体线程来补偿。例如,在 JDK 21 中,许多文件 I/O 操作和调用 Object.wait 时会发生这种情况。你可以使用 jdk.virtualThreadScheduler.maxPoolSize VM 选项控制载体线程的最大数量。

在以下两种情况下,线程被称为固定

  • 执行 synchronized 方法或代码块时
  • 调用本地方法或外部函数时

固定本身并不坏。但是当固定的线程阻塞时,它无法被卸载。载体线程被阻塞,并且在 Java 21 中,不会启动额外的载体线程。这会减少运行虚拟线程的载体线程。

如果 synchronized 用于避免内存操作中的竞态条件,固定是无害的。但是,如果有阻塞调用,最好将 synchronized 替换为 ReentrantLock。当然,这只有在你能控制源代码时才可行。

要查找固定的线程是否被阻塞,请使用以下选项之一启动 JVM:

-Djdk.tracePinnedThreads=short -Djdk.tracePinnedThreads=full

你会得到一个堆栈跟踪,显示固定的线程何时阻塞:

... org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49) <== monitors:1
...

请注意,每个固定位置只会收到一次警告!

或者,使用 Java Flight Recorder 记录,使用你喜欢的任务控制查看器查看,并查找 VirtualThreadPinnedVirtualThreadSubmitFailed 事件。

JVM 最终将实现 synchronized 方法或代码块不再导致固定。那时你只需要担心本地代码的固定。

以下示例程序展示了固定的实际情况。我们启动多个在同步方法中休眠的虚拟线程,阻塞它们的载体线程。添加了许多不执行任何工作的虚拟线程。但它们无法被调度,因为载体线程池已经完全耗尽。请注意,当你执行以下操作时,问题会消失:

  • 使用 ReentrantLock
  • 不使用虚拟线程
import java.util.concurrent.*;
import java.util.concurrent.locks.*;

public class PinningDemo {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ExecutorService service = Executors.newVirtualThreadPerTaskExecutor();
        // Executors.newCachedThreadPool();
        final int TASKS = 20;
        long start = System.nanoTime();
        for (int i = 1; i <= TASKS; i++) {
            service.submit(() -> block());
            // service.submit(() -> rblock());
        }
        for (int i = 1; i <= TASKS; i++) {
            service.submit(() -> noblock());
        }
        service.close();
        long end = System.nanoTime();
        System.out.printf("%.2f%n", (end - start) * 1E-9);
    }

    public static synchronized void block() {
        IO.println("Entering block " + Thread.currentThread());
        LockSupport.parkNanos(1_000_000_000);
        IO.println("Exiting block " + Thread.currentThread());
    }

    private static Lock lock = new ReentrantLock();

    public static void rblock() {
        lock.lock();
        try {
            IO.println("Entering rblock " + Thread.currentThread());
            LockSupport.parkNanos(1_000_000_000);
            IO.println("Exiting rblock " + Thread.currentThread());
        } finally {
            lock.unlock();
        }
    }

    public static void noblock() {
        IO.println("Entering noblock " + Thread.currentThread());
        LockSupport.parkNanos(1_000_000_000);
        IO.println("Exiting noblock " + Thread.currentThread());
    }
}

线程本地变量

线程本地变量是一个对象,其 getset 方法访问的值取决于当前线程。为什么你会想要这样的东西而不是使用全局或局部变量呢?经典的应用场景是非线程安全的服务,例如 SimpleDateFormat,或者会遭受竞争的服务,例如随机数生成器。每个线程的实例可以比受锁保护的全局实例性能更好。

线程本地变量的另一个常见用途是提供"隐式"上下文,例如为每个任务正确配置的数据库连接。任务代码不需要将上下文从一个方法传递到另一个方法,而是在需要访问数据库时简单地读取线程本地变量。

迁移到虚拟线程时,线程本地变量可能会成为问题。虚拟线程可能远多于线程池中的线程,现在你有更多的线程本地实例。在这种情况下,你应该重新考虑共享策略。

要在你的应用中定位线程本地变量的使用,请使用 VM 标志 jdk.traceVirtualThreadLocals 运行。当虚拟线程更改线程本地变量时,你会得到一个堆栈跟踪。

总结

  • 当你有许多主要阻塞在网络 I/O 上的任务时,使用虚拟线程来提高吞吐量
  • 主要好处是熟悉的"同步"编程风格,无需回调
  • 不要池化虚拟线程;使用其他机制进行限流
  • 检查固定并在必要时进行缓解
  • 在虚拟线程中最小化线程本地变量的使用
comments powered by Disqus