ThreadLocal原理分析和拓展

ThreadLocal原理分析和拓展

多线程访问同一个共享变量由于线程的执行顺序和变量的可见性原因会导致并发问题,我们一般会有两种解决思路:

  • 一是对访问的变量进行加锁处理
  • 二是每个线程都访问本线程的变量

本次我们重点分析Java中通过ThreadLocal实现的本地变量

ThreadLocal实现原理

当使用ThreadLocal维护变量的时候,该变量存储在线程的本地,其他线程无法访问,做到了线程间的隔离,也就没有线程安全的问题了。

  • 每一个Thread中都会有一个ThreadLocalMap对象,
  • ThreadLocalMap 中有一个 Entry 数组
  • Entry中key是ThreadLocal对象实例 ,继承自WeakReference(弱引用),value就是我们要设置的值

我们首先来看一下 ThreadLocalMap 是一个静态内部类


static class ThreadLocalMap {

// key是弱引用
static class Entry extends WeakReference<ThreadLocal<?>> {
Object value;

Entry(ThreadLocal<?> k, Object v) {
super(k);
value = v;
}
}
// 初始容量,必须为二的整数幂
private static final int INITIAL_CAPACITY = 16;
// map中储存Entry的量
private Entry[] table;
// 总共储存了多少对象
private int size = 0;
// 下次扩容的数量
private int threshold; // Default to 0

}

接下看看如何设置值

public void set(T value) {
// 获取当前线程
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
// 如果不为空就设置值,为空就创建
if (map != null) {
map.set(this, value);
} else {
createMap(t, value);
}
}

// 获取 ThreadLocalMap
ThreadLocalMap getMap(Thread t) {
return t.threadLocals;
}

// 创建 ThreadLocalMap
void createMap(Thread t, T firstValue) {
t.threadLocals = new ThreadLocalMap(this, firstValue);
}

内存泄漏原因

  • TheadLocal 本身不存储值,它只是做为一个key,来让线程从ThreadLocal中获取value

  • ThreadLocalMap 是使用ThreadLocal的弱引用做为key的,一个对象如果只剩下弱引用(没有强引用),该对象在GC就会被回收

如果我们手动将 ThreadLocal A 的对象赋值为 null,这个 ThreadLocal A 就会被回收,ThreadLocalMap 中就会出现 key 为 null 的 Entry。

Java 程序没有办法访问这些 key 为 null 的Entry的value,如果当前线程迟迟不结束,使用的线程池,或者该线程需要执行一些耗时任务,在系统值就会出现一条强引用链,从 ThreadRef -> Thread B -> ThreadLocalMap -> value -> Obj C 这个value就无法回收,导致内存泄漏。

只有当前线程结束之后,ThreadRef 不存在栈中,强引用断开才能被回收。

实际上 ThreadLocal 也考虑了防护措施,在调用 ThreadLocal 的 get()、set() 方法的时候,会清除ThreadLocalMap 中 key 为 null 的值。主要调用的是 expungeStaleEntry方法。

避免内存泄漏的最好做法:主动调用ThreadLocal对象的 remove 方法,将ThreadLocal 对象的值删除。

/**
* @author cuzz
* @date 2021/8/14 17:29
*/
public class ThreadLocalMemoryLeak {
private static final int SIZE = 1000;

static ThreadPoolExecutor executor = new ThreadPoolExecutor(
5,5,1, TimeUnit.MINUTES, new LinkedBlockingQueue<>(1000)
);

@Data
static class Memory5M {
public Memory5M(int i) {
this.i = i;
}
private int i;
private byte[] m = new byte[1024 * 1024 * 5];
}

static ThreadLocal<Memory5M> threadLocal = new ThreadLocal<>();
public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < SIZE; i++) {
int finalI = i;
executor.execute(()-> {
threadLocal.set(new Memory5M(finalI));
// 执行
task();
// 删除,不删除会产生内存泄漏
threadLocal.remove();
});
Thread.sleep(100);
}
System.out.println("done");

}
private static void task() {
Memory5M memory5M = threadLocal.get();
int i = memory5M.getI();
System.out.printf("task %s process...\n", i);
}
}

InheritableThreadLoca原理分析

在子线程中如何获取父线程的本地变量

先看一个例子

/**
* @author cuzz
* @date 2021/08/14 20:19
*/
public class ThreadLocalDemo {

private static ThreadLocal<String> threadLocal = new ThreadLocal<>();

public static void main(String[] args) throws InterruptedException {
threadLocal.set("aaa");
System.out.println(Thread.currentThread().getName() + ">>>" + threadLocal.get());
new Thread(()->{
System.out.println(Thread.currentThread().getName() + ">>>" + threadLocal.get());
}).start();

TimeUnit.SECONDS.sleep(1);
}
}

输出结果,可以看到父线程输出的是 aaa,子线程输出的为 null

main>>>aaa
Thread-0>>>null

这个时候,改用为 InheritableThreadLocal

/**
* @author cuzz
* @date 2021/08/14 20:19
*/
public class ThreadLocalDemo {

// private static ThreadLocal<String> threadLocal = new ThreadLocal<>();
private static ThreadLocal<String> threadLocal = new InheritableThreadLocal<>();

public static void main(String[] args) throws InterruptedException {
threadLocal.set("aaa");
System.out.println(Thread.currentThread().getName() + ">>>" + threadLocal.get());
new Thread(()->{
System.out.println(Thread.currentThread().getName() + ">>>" + threadLocal.get());
}).start();

TimeUnit.SECONDS.sleep(1);
}
}

这个时候输出结果

main>>>aaa
Thread-0>>>aaa

源码分析,InheritableThreadLocal 继承 ThreadLocal 并实现了以下3个方法。

public class InheritableThreadLocal<T> extends ThreadLocal<T> {

protected T childValue(T parentValue) {
return parentValue;
}


ThreadLocalMap getMap(Thread t) {
return t.inheritableThreadLocals;
}

void createMap(Thread t, T firstValue) {
t.inheritableThreadLocals = new ThreadLocalMap(this, firstValue);
}
}

Thread维护了两个ThreadLocalMap

/* ThreadLocal values pertaining to this thread. This map is maintained
* by the ThreadLocal class.
*/
ThreadLocal.ThreadLocalMap threadLocals = null;

/*
* InheritableThreadLocal values pertaining to this thread. This map is
* maintained by the InheritableThreadLocal class.
*/
ThreadLocal.ThreadLocalMap inheritableThreadLocals = null;

现在看看Thread中

public Thread(Runnable target) {
this(null, target, "Thread-" + nextThreadNum(), 0);
}

public Thread(ThreadGroup group, Runnable target, String name,
long stackSize) {
this(group, target, name, stackSize, null, true);
}

private Thread(ThreadGroup g, Runnable target, String name,
long stackSize, AccessControlContext acc,
boolean inheritThreadLocals) {
if (name == null) {
throw new NullPointerException("name cannot be null");
}

this.name = name;

// ...

// 这里会把父线程的ThreadLocal传递到子线程
if (inheritThreadLocals && parent.inheritableThreadLocals != null)
this.inheritableThreadLocals =
ThreadLocal.createInheritedMap(parent.inheritableThreadLocals);
/* Stash the specified stack size in case the VM cares */
this.stackSize = stackSize;

/* Set thread ID */
this.tid = nextThreadID();
}

最后 ThreadLocal.createInheritedMap这个方法,遍历父线的ThreadLocalMap中的值保存到自己的ThreadLocalMap中

static ThreadLocalMap createInheritedMap(ThreadLocalMap parentMap) {
return new ThreadLocalMap(parentMap);
}

private ThreadLocalMap(ThreadLocalMap parentMap) {
Entry[] parentTable = parentMap.table;
int len = parentTable.length;
setThreshold(len);
table = new Entry[len];

for (Entry e : parentTable) {
if (e != null) {
@SuppressWarnings("unchecked")
ThreadLocal<Object> key = (ThreadLocal<Object>) e.get();
if (key != null) {
Object value = key.childValue(e.value);
Entry c = new Entry(key, value);
int h = key.threadLocalHashCode & (len - 1);
while (table[h] != null)
h = nextIndex(h, len);
table[h] = c;
size++;
}
}
}

注意的问题:

如果存储的是基本类型,那没有什么问题,如果存储的是自定义对象,那么父类和子类都是引用同一个对象,会相互影响。

package com.cuzz.threadlocal;

import lombok.AllArgsConstructor;
import lombok.Data;

import java.util.concurrent.TimeUnit;

/**
* @author cuzz
* @date 2021/08/14 20:19
*/
public class ThreadLocalDemo {

// private static ThreadLocal<String> threadLocal = new ThreadLocal<>();
private static ThreadLocal<Person> threadLocal = new InheritableThreadLocal<>();

@Data
@AllArgsConstructor
static class Person{
private String name;
private Integer age;
}

public static void main(String[] args) throws InterruptedException {
threadLocal.set(new Person("cuzz", 18));
System.out.println(Thread.currentThread().getName() + ">>>" + threadLocal.get());
new Thread(()->{
System.out.println(Thread.currentThread().getName() + ">>>" + threadLocal.get());
Person person = threadLocal.get();
person.setName("faker");
person.setAge(20);
}).start();
TimeUnit.SECONDS.sleep(1);
System.out.println(Thread.currentThread().getName() + ">>>" + threadLocal.get());
}
}

我们发现主线程ThreadLocal的值已经被子线程锁改变了。

main>>>ThreadLocalDemo.Person(name=cuzz, age=18)
Thread-0>>>ThreadLocalDemo.Person(name=cuzz, age=18)
main>>>ThreadLocalDemo.Person(name=faker, age=20)

这个时候我们可以自定义MyInheritableThreadLocal 重写childValue方法,做一个深拷贝就行。

/**
* @author cuzz
* @date 2021/8/14 21:24
*/
public class MyInheritableThreadLocal<T> extends InheritableThreadLocal<T>{
@Override
protected T childValue(T parentValue) {
ObjectMapper objectMapper = new ObjectMapper();
try {
String s = objectMapper.writeValueAsString(parentValue);
return (T) objectMapper.readValue(s, parentValue.getClass());
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
}

测试

package com.cuzz.threadlocal;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.util.concurrent.TimeUnit;

/**
* @author cuzz
* @date 2021/08/14 20:19
*/
public class ThreadLocalDemo {

// private static ThreadLocal<String> threadLocal = new ThreadLocal<>();
// private static ThreadLocal<Person> threadLocal = new InheritableThreadLocal<>();
private static ThreadLocal<Person> threadLocal = new MyInheritableThreadLocal<>();

@Data
@AllArgsConstructor
@NoArgsConstructor
static class Person{
private String name;
private Integer age;
}

public static void main(String[] args) throws InterruptedException {
threadLocal.set(new Person("cuzz", 18));
System.out.println(Thread.currentThread().getName() + ">>>" + threadLocal.get());
new Thread(()->{
System.out.println(Thread.currentThread().getName() + ">>>" + threadLocal.get());
Person person = threadLocal.get();
person.setName("faker");
person.setAge(20);
}).start();
TimeUnit.SECONDS.sleep(1);
System.out.println(Thread.currentThread().getName() + ">>>" + threadLocal.get());
}
}

发现已经不影响了

main>>>ThreadLocalDemo.Person(name=cuzz, age=18)
Thread-0>>>ThreadLocalDemo.Person(name=cuzz, age=18)
main>>>ThreadLocalDemo.Person(name=cuzz, age=18)

TransmittableThreadLocal 原理分析

在线程池中如何获取父线程本地变量

我们知道使用InheritableThreadLocal只有在子线程创建的时候回继承父线程的本地变量,之后就不会再次拷贝了。

使用线程池提交任务,线程都是提前创建好的。

如果想每次提交任务都拷贝父线程的本地变量就要用到阿里开源的 TransmittableThreadLocal 是为了解决线程池或其他缓存线程组件下传递ThreadLocal的拓展。

使用也特别简单

package com.cuzz.threadlocal;

import com.alibaba.ttl.TransmittableThreadLocal;
import com.alibaba.ttl.threadpool.TtlExecutors;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
* @author cuzz
* @date 2021/8/14 22:09
*/
public class TransmittableThreadLocalDemo {
private static ThreadLocal<Person> threadLocal = new TransmittableThreadLocal<>();

static ExecutorService executorService = new ThreadPoolExecutor(
5,5,1, TimeUnit.MINUTES, new LinkedBlockingQueue<>(1000)
);

@Data
@AllArgsConstructor
@NoArgsConstructor
static class Person{
private String name;
private Integer age;
}

public static void main(String[] args) {
// 包装一下
executorService = TtlExecutors.getTtlExecutorService(executorService);

threadLocal.set(new Person("cuzz", 18));
executorService.submit(() -> {
System.out.println(Thread.currentThread().getName() + ">>>" + threadLocal.get());
});

threadLocal.set(new Person("faker", 20));
executorService.submit(() -> {
System.out.println(Thread.currentThread().getName() + ">>>" + threadLocal.get());
});

}
}

发送每次都是再提交任务的时候传递父线程的本地变量

pool-1-thread-1>>>TransmittableThreadLocalDemo.Person(name=cuzz, age=18)
pool-1-thread-2>>>TransmittableThreadLocalDemo.Person(name=faker, age=20)

具体

参考

Comments

Gitalk 加载中 ...