本文翻译自 Virtual Threads,版权归原作者所有。
为什么需要虚拟线程?
Java 1.0 于 1995 年发布时,其 API 中包含约一百个类,其中就有 java.lang.Thread。Java 是第一个直接支持并发编程的主流编程语言。
从 Java 1.2 开始,每个 Java 线程都运行在底层操作系统提供的平台线程上。(在 Java 1.1 及之前的版本中,在某些平台上,所有 Java 线程由单个平台线程执行。)
平台线程的成本不低。启动一个平台线程需要数千条 CPU 指令,并且会消耗几兆字节的内存。服务器应用程序可能需要处理大量并发请求,为每个请求分配一个独立的平台线程变得不可行。在典型的服务器应用中,这些请求大部分时间都处于阻塞状态,等待数据库或其他服务返回结果。
提高吞吐量的传统方法是使用非阻塞 API。程序员不是等待结果,而是指定当结果可用时应调用哪个方法,以及在失败时调用另一个方法。这种方式很快就会变得令人不快,因为回调会不断嵌套。
JEP 425 在 Java 19 中引入了虚拟线程。多个虚拟线程运行在一个平台线程上。每当虚拟线程阻塞时,它就会被卸载,平台线程随即运行另一个虚拟线程。(“虚拟线程"这个名称类似于虚拟内存映射到实际 RAM。)虚拟线程在 Java 20 中成为预览特性(JEP 436),并在 Java 21 中正式发布。
有了虚拟线程,阻塞的成本变得很低。当结果不能立即获得时,你只需在虚拟线程中阻塞即可。你可以使用熟悉的编程结构——分支、循环、try 块——而不是一系列回调。
虚拟线程在并发任务数量庞大且任务主要阻塞在网络 I/O 上时非常有用。对于 CPU 密集型任务,它们没有任何优势。对于此类任务,可以考虑使用并行流或递归 fork-join 任务。
创建虚拟线程
工厂方法 Executors.newVirtualThreadPerTaskExecutor() 返回一个 ExecutorService,它会在单独的虚拟线程中运行每个任务。例如:
import java.util.concurrent.*;
public class VirtualThreadDemo {
public static void main(String[] args) {
final int NTASKS = 100;
ExecutorService service = Executors.newVirtualThreadPerTaskExecutor();
for (int i = 0; i < NTASKS; i++) {
service.submit(() -> {
long id = Thread.currentThread().threadId();
LockSupport.parkNanos(1_000_000_000);
IO.println(id);
});
}
service.close();
}
}
顺便说一下,这段代码使用 LockSupport.parkNanos 而不是 Thread.sleep,这样我们就不必捕获麻烦的 InterruptedException。
你可能在使用需要线程工厂的底层 API。要获取虚拟线程的工厂,请使用新的 Thread.Builder 类:
Thread.Builder builder = Thread.ofVirtual().name("request-", 1);
ThreadFactory factory = builder.factory();
现在,调用 factory.newThread(myRunnable) 会创建一个新的(未启动的)虚拟线程。name 方法配置构建器以设置线程名称 request-1、request-2 等。
你也可以使用构建器创建单个虚拟线程:
Thread t = builder.unstarted(myRunnable);
或者,如果你想立即启动线程:
Thread t = builder.started(myRunnable);
最后,对于快速演示,有一个便捷方法:
Thread t = Thread.startVirtualThread(myRunnable);
请注意,只有第一种方法(使用执行器服务)适用于返回结果的任务(callables)。
Thread API 的变化
在经过一系列不同 API 的实验后,Java 虚拟线程的设计者决定简单地重用熟悉的 Thread API。虚拟线程是 Thread 的实例。取消操作与平台线程相同,通过调用 interrupt。与以往一样,线程代码必须检查"中断"标志或调用会检查它的方法。(大多数阻塞方法都会检查。)
有一些区别。特别是,所有虚拟线程:
- 属于单个线程组
- 优先级为
NORM_PRIORITY - 都是守护线程
没有 API 可以构造具有其他线程组的虚拟线程。尝试在虚拟线程上调用 setPriority 或 setDaemon 不会产生任何效果。
静态方法 Thread::getAllStackTraces 返回所有平台线程的堆栈跟踪映射。虚拟线程不包括在内。
新的 Thread::isVirtual 实例方法可以判断线程是否为虚拟线程。
请注意,无法找到虚拟线程在哪个平台线程上执行。
Java 19 对 Thread API 进行了一些与虚拟线程无关的更改:
- 现在有实例方法
join(Duration)和sleep(Duration)。 - 非 final 的
getId方法已被弃用,因为有人可能重写它以返回线程 ID 以外的内容。请改用 final 的threadId方法。
从 Java 20 开始,stop、suspend 和 resume 方法会对平台线程和虚拟线程抛出 UnsupportedOperationException。这些方法自 Java 1.2 起就已弃用,并自 Java 18 起标记为待删除。
捕获任务结果
你经常需要组合多个并发任务的结果:
Future<T1> f1 = service.submit(callable1);
Future<T2> f2 = service.submit(callable2);
result = combine(f1.get(), f2.get());
在虚拟线程出现之前,你可能会对阻塞的 get 调用感到不安。但现在阻塞的成本很低。下面是一个更具体的示例程序:
import java.util.concurrent.*;
import java.net.*;
import java.net.http.*;
public class VirtualThreadDemo {
public static void main(String[] args) throws InterruptedException, ExecutionException {
ExecutorService service = Executors.newVirtualThreadPerTaskExecutor();
Future<String> f1 = service.submit(() -> get("https://horstmann.com/random/adjective"));
Future<String> f2 = service.submit(() -> get("https://horstmann.com/random/noun"));
String result = f1.get() + " " + f2.get();
IO.println(result);
service.close();
}
private static HttpClient client = HttpClient.newHttpClient();
public static String get(String url) {
try {
var request = HttpRequest.newBuilder().uri(new URI(url)).GET().build();
return client.send(request, HttpResponse.BodyHandlers.ofString()).body();
} catch (Exception ex) {
var rex = new RuntimeException();
rex.initCause(ex);
throw rex;
}
}
}
如果你有一个具有相同结果类型的任务列表,可以使用 invokeAll 方法,然后对每个 Future 调用 get:
List<Callable<T>> callables = ...;
List<T> results = new ArrayList<>();
for (Future<T> f : service.invokeAll(callables))
results.add(f.get());
同样,这里有一个更具体的示例程序:
import java.util.*;
import java.util.concurrent.*;
import java.net.*;
import java.net.http.*;
public class VirtualThreadDemo {
public static void main(String[] args) throws InterruptedException, ExecutionException {
ExecutorService service = Executors.newVirtualThreadPerTaskExecutor();
List<Callable<String>> callables = new ArrayList<>();
final int ADJECTIVES = 4;
for (int i = 1; i <= ADJECTIVES; i++)
callables.add(() -> get("https://horstmann.com/random/adjective"));
callables.add(() -> get("https://horstmann.com/random/noun"));
List<String> results = new ArrayList<>();
for (Future<String> f : service.invokeAll(callables))
results.add(f.get());
IO.println(String.join(" ", results));
service.close();
}
private static HttpClient client = HttpClient.newHttpClient();
public static String get(String url) {
try {
var request = HttpRequest.newBuilder().uri(new URI(url)).GET().build();
return client.send(request, HttpResponse.BodyHandlers.ofString()).body();
} catch (Exception ex) {
var rex = new RuntimeException();
rex.initCause(ex);
throw rex;
}
}
}
限流
虚拟线程提高了应用程序的吞吐量,因为你可以拥有比平台线程更多的并发任务。这可能会对任务调用的服务造成压力。例如,Web 服务可能无法承受大量并发请求。
对于平台线程,一个简单(尽管粗糙)的调优因素是这些任务的线程池大小。但你不应该池化虚拟线程。在虚拟线程上调度任务,然后在平台线程上调度这些虚拟线程显然是低效的。那么好处是什么呢?将虚拟线程数量限制为服务可以容忍的少量并发请求?那你为什么还要使用虚拟线程呢?
对于虚拟线程,你应该使用替代机制来控制对有限资源的访问。不要对并发任务进行整体限制,而是以适当的方式保护每个资源。对于数据库连接,连接池可能已经做了正确的事情。访问 Web 服务时,你了解自己的服务,可以提供适当的限流。
举个例子,在我的个人网站上,我提供了生成随机项目的演示服务。如果短时间内来自同一 IP 地址的大量请求,托管公司会将该 IP 地址列入黑名单。
以下示例程序展示了使用简单信号量进行限流,允许少量并发请求。当超过最大值时,acquire 方法会阻塞,但这没关系。使用虚拟线程,阻塞的成本很低。
import java.util.*;
import java.util.concurrent.*;
import java.net.*;
import java.net.http.*;
public class RateLimitDemo {
public static void main(String[] args) throws InterruptedException, ExecutionException {
ExecutorService service = Executors.newVirtualThreadPerTaskExecutor();
List<Future<String>> futures = new ArrayList<>();
final int TASKS = 250;
for (int i = 1; i <= TASKS; i++)
futures.add(service.submit(() -> get("https://horstmann.com/random/word")));
for (Future<String> f : futures)
IO.print(f.get() + " ");
IO.println();
service.close();
}
private static HttpClient client = HttpClient.newHttpClient();
private static final Semaphore SEMAPHORE = new Semaphore(20);
public static String get(String url) {
try {
var request = HttpRequest.newBuilder().uri(new URI(url)).GET().build();
SEMAPHORE.acquire();
try {
Thread.sleep(100);
return client.send(request, HttpResponse.BodyHandlers.ofString()).body();
} finally {
SEMAPHORE.release();
}
} catch (Exception ex) {
ex.printStackTrace();
var rex = new RuntimeException();
rex.initCause(ex);
throw rex;
}
}
}
固定
虚拟线程调度器将虚拟线程挂载到载体线程上。默认情况下,载体线程的数量与 CPU 核心数相同。你可以使用 jdk.virtualThreadScheduler.parallelism VM 选项调整该数量。
当虚拟线程执行阻塞操作时,它应该从其载体线程上卸载,然后载体线程可以执行另一个虚拟线程。然而,在某些情况下,这种卸载是不可能的。在某些情况下,虚拟线程调度器会通过启动另一个载体线程来补偿。例如,在 JDK 21 中,许多文件 I/O 操作和调用 Object.wait 时会发生这种情况。你可以使用 jdk.virtualThreadScheduler.maxPoolSize VM 选项控制载体线程的最大数量。
在以下两种情况下,线程被称为固定:
- 执行
synchronized方法或代码块时 - 调用本地方法或外部函数时
固定本身并不坏。但是当固定的线程阻塞时,它无法被卸载。载体线程被阻塞,并且在 Java 21 中,不会启动额外的载体线程。这会减少运行虚拟线程的载体线程。
如果 synchronized 用于避免内存操作中的竞态条件,固定是无害的。但是,如果有阻塞调用,最好将 synchronized 替换为 ReentrantLock。当然,这只有在你能控制源代码时才可行。
要查找固定的线程是否被阻塞,请使用以下选项之一启动 JVM:
-Djdk.tracePinnedThreads=short -Djdk.tracePinnedThreads=full
你会得到一个堆栈跟踪,显示固定的线程何时阻塞:
... org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49) <== monitors:1
...
请注意,每个固定位置只会收到一次警告!
或者,使用 Java Flight Recorder 记录,使用你喜欢的任务控制查看器查看,并查找 VirtualThreadPinned 和 VirtualThreadSubmitFailed 事件。
JVM 最终将实现 synchronized 方法或代码块不再导致固定。那时你只需要担心本地代码的固定。
以下示例程序展示了固定的实际情况。我们启动多个在同步方法中休眠的虚拟线程,阻塞它们的载体线程。添加了许多不执行任何工作的虚拟线程。但它们无法被调度,因为载体线程池已经完全耗尽。请注意,当你执行以下操作时,问题会消失:
- 使用
ReentrantLock - 不使用虚拟线程
import java.util.concurrent.*;
import java.util.concurrent.locks.*;
public class PinningDemo {
public static void main(String[] args) throws InterruptedException, ExecutionException {
ExecutorService service = Executors.newVirtualThreadPerTaskExecutor();
// Executors.newCachedThreadPool();
final int TASKS = 20;
long start = System.nanoTime();
for (int i = 1; i <= TASKS; i++) {
service.submit(() -> block());
// service.submit(() -> rblock());
}
for (int i = 1; i <= TASKS; i++) {
service.submit(() -> noblock());
}
service.close();
long end = System.nanoTime();
System.out.printf("%.2f%n", (end - start) * 1E-9);
}
public static synchronized void block() {
IO.println("Entering block " + Thread.currentThread());
LockSupport.parkNanos(1_000_000_000);
IO.println("Exiting block " + Thread.currentThread());
}
private static Lock lock = new ReentrantLock();
public static void rblock() {
lock.lock();
try {
IO.println("Entering rblock " + Thread.currentThread());
LockSupport.parkNanos(1_000_000_000);
IO.println("Exiting rblock " + Thread.currentThread());
} finally {
lock.unlock();
}
}
public static void noblock() {
IO.println("Entering noblock " + Thread.currentThread());
LockSupport.parkNanos(1_000_000_000);
IO.println("Exiting noblock " + Thread.currentThread());
}
}
线程本地变量
线程本地变量是一个对象,其 get 和 set 方法访问的值取决于当前线程。为什么你会想要这样的东西而不是使用全局或局部变量呢?经典的应用场景是非线程安全的服务,例如 SimpleDateFormat,或者会遭受竞争的服务,例如随机数生成器。每个线程的实例可以比受锁保护的全局实例性能更好。
线程本地变量的另一个常见用途是提供"隐式"上下文,例如为每个任务正确配置的数据库连接。任务代码不需要将上下文从一个方法传递到另一个方法,而是在需要访问数据库时简单地读取线程本地变量。
迁移到虚拟线程时,线程本地变量可能会成为问题。虚拟线程可能远多于线程池中的线程,现在你有更多的线程本地实例。在这种情况下,你应该重新考虑共享策略。
要在你的应用中定位线程本地变量的使用,请使用 VM 标志 jdk.traceVirtualThreadLocals 运行。当虚拟线程更改线程本地变量时,你会得到一个堆栈跟踪。
总结
- 当你有许多主要阻塞在网络 I/O 上的任务时,使用虚拟线程来提高吞吐量
- 主要好处是熟悉的"同步"编程风格,无需回调
- 不要池化虚拟线程;使用其他机制进行限流
- 检查固定并在必要时进行缓解
- 在虚拟线程中最小化线程本地变量的使用