Java里面ThreadLocal源码分析以及如何产生的OOM

Java artisan 298℃ 0评论
要解决线程安全问题首先要清楚线程安全问题是怎么产生的?

线程安全问题通常是多线程环境下,多个线程同时操作某一个变量未进行同步操作而导致的数据错误(只读除外)。

如何解决多线程安全问题呢?

1:进行变量的加锁同步,进行串行访问,每次只有一个线程可以操作变量。

2:控制变量只对一个线程可见,其他线程没有权限操作。

ThreadLocal是怎么解决线程安全的?

ThreadLocal使用的就是【控制变量只对一个线程可见,其他线程没有权限操作】来控制线程安全的,ThreadLocal看名字就可以理解是线程局部变量,既然是局部变量,就会有一个作用域范围,这个范围就是线程Thread实例。如果将需要控制的变量控制在Thread实例作用域内,只有当前线程可以访问其变量,其他线程无法访问也就控制了变量对其他线程不可见,这样也就保证了变量的线程安全。

方案1:自行实现一个ThreadLocal
/**
 * 如何设计一个ThreadLocal ? => 基础版本1
 */
class ThreadLocalAccessorV1 {

    private static final Map<Long, Object> data = new ConcurrentHashMap<>();

    public static void set(Thread t, Object v) {
        isSelf(t);
        data.put(t.getId(), v);
    }

    // 不足:API不需要,还需要传递Thread参数;线程局部变量应该只有当前线程可以访问,因此API不是很友好。【有点吹毛求疵】
    public static Object get(Thread t) {
        isSelf(t);
        return data.get(t.getId());
    }

    // 存在问题1: 线程局部变量应该做到:线程局部变量的声明周期与线程的声明周期相同,共存亡!!!
    // 这种实现当线程结束之前,如果调用remove方法,当线程被GC回收之后,长久运行可能OOM
    public static Object remove(Thread t) {
        isSelf(t);
        return data.remove(t.getId());
    }

    private static void isSelf(Thread t) {
        // 避免 其他线程非法访问
        if (t != Thread.currentThread()) {
            throw new RuntimeException("跨线程非法访问");
        }
    }
}

测试代码

class CountTask implements Runnable {

    @Override
    public void run() {
        for (int i = 0; i < 100; i++) {
            Integer cnt = (Integer) ThreadLocalAccessorV1.get(Thread.currentThread());
            if (cnt == null) {
                cnt = 0;
            }
            ThreadLocalAccessorV1.set(Thread.currentThread(), cnt + 1);
        }

        System.out.println(ThreadLocalAccessorV1.get(Thread.currentThread()));

    }
}

public class ThreadSafeMapV1 {

    public static void main(String[] args) {

        Thread t1 = new Thread(new CountTask(), "CountTask1");
        Thread t2 = new Thread(new CountTask(), "CountTask2");
        t1.start();
        t2.start();

        System.out.println(ThreadLocalAccessorV1.get(Thread.currentThread()));
        // 非法访问
        System.out.println(ThreadLocalAccessorV1.get(t1));
    }
}
方案2:优化之后的ThreadLocal实现
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

class ThreadLocalAccessorV2 {
    // 一个ThreadLocalAccessorV2对应一个KEY,该实例的所有线程局部变量对应的KEY都是此KEY
    private final String THREAD_LOCAL_KEY = UUID.randomUUID().toString();

    public void set(Object v) {
        // 获取当前线程
        ThreadLocal_Thread tlt = (ThreadLocal_Thread) Thread.currentThread();
        //操作当前线程的线程局部变量
        tlt.data.put(THREAD_LOCAL_KEY, v);
    }

    public Object get() {
        ThreadLocal_Thread tlt = (ThreadLocal_Thread) Thread.currentThread();
        return tlt.data.get(THREAD_LOCAL_KEY);
    }


    public Object remove() {
        ThreadLocal_Thread tlt = (ThreadLocal_Thread) Thread.currentThread();
        return tlt.data.remove(THREAD_LOCAL_KEY);
    }
}

class ThreadLocal_Thread extends Thread {

    // 解决了 线程局部变量与线程相同的声明周期问题
    // API问题也解决了
    final Map<String, Object> data = new ConcurrentHashMap<>();//线程局部变量保存的容器

    public ThreadLocal_Thread() {
    }

    public ThreadLocal_Thread(Runnable target) {
        super(target);
    }

    public ThreadLocal_Thread(Runnable target, String name) {
        super(target, name);
    }
}

测试代码:

class ThreadLocal_Thread extends Thread {

    // 解决了 线程局部变量与线程相同的声明周期问题
    // API问题也解决了
    final Map<String, Object> data = new ConcurrentHashMap<>();//线程局部变量保存的容器

    public ThreadLocal_Thread() {
    }

    public ThreadLocal_Thread(Runnable target) {
        super(target);
    }

    public ThreadLocal_Thread(Runnable target, String name) {
        super(target, name);
    }
}

class CountTaskV2 implements Runnable {

    @Override
    public void run() {
        for (int i = 0; i < 100; i++) {
            Integer cnt = (Integer) ThreadSafeMapV2.accessorV2.get();
            if (cnt == null) {
                cnt = 0;
            }
            ThreadSafeMapV2.accessorV2.set(cnt + 1);
        }
        System.out.println(ThreadSafeMapV2.accessorV2.get());
    }
}

public class ThreadSafeMapV2 {

    public static final ThreadLocalAccessorV2 accessorV2 = new ThreadLocalAccessorV2();

    public static void main(String[] args) {
        ThreadLocal_Thread t1 = new ThreadLocal_Thread(new CountTaskV2(), "CountTaskV2_1");
        ThreadLocal_Thread t2 = new ThreadLocal_Thread(new CountTaskV2(), "CountTaskV2_2");
        t1.start();
        t2.start();
    }
}

方案2实现跟JDK的ThreadLocal实现一致了,看懂这个方案大概也就理解JDK的ThreadLocal是如何实现的;但是JDK实现的比较完善,考虑了GC回收等问题。

接下来看下JDK是如何实现的ThreadLocal?
  • 创建ThreadLocal
//  构造器DO NOTHING
    /**
     * Creates a thread local variable.
     * @see #withInitial(java.util.function.Supplier)
     */
    public ThreadLocal() {
    }
  • 设置值
     public void set(T value) {
       // 获取当前线程实例
        Thread t = Thread.currentThread();
       // 获取当前线程的局部变量存储容器,对应方案2中的ConcurrentHashMap
        ThreadLocalMap map = getMap(t);
        if (map != null)
            // map不等于空则设置值
            map.set(this, value);
        else
            // 初始化Map并设置值,属于懒加载方式
            createMap(t, value);
    }
  • 如何初始化Thread中的Map的?createMap方法源码:
    /**
     * Create the map associated with a ThreadLocal. Overridden in
     * InheritableThreadLocal.
     *
     * @param t the current thread
     * @param firstValue value for the initial entry of the map
     */
    void createMap(Thread t, T firstValue) {
        // 创建并将当前的值设置到ThreadLocalMap中
        t.threadLocals = new ThreadLocalMap(this, firstValue);
    }

  //其中ThreadLocalMap是ThreadLocal中的一个内部静态类:java.lang.ThreadLocal.ThreadLocalMap,其中Key是ThreadLocal
  //每一个ThreadLocal在创建之后会生成一个固定的Key,参见实例 :   
  // private static AtomicInteger nextHashCode = new AtomicInteger();


  //同时ThreadLocalMap中自定义实现了Entry,源码如下:
        /**
         * The entries in this hash map extend WeakReference, using
         * its main ref field as the key (which is always a
         * ThreadLocal object).  Note that null keys (i.e. entry.get()
         * == null) mean that the key is no longer referenced, so the
         * entry can be expunged from table.  Such entries are referred to
         * as "stale entries" in the code that follows.
         */
        static class Entry extends WeakReference<ThreadLocal<?>> {
            /** The value associated with this ThreadLocal. */
            Object value;
                        // 比较重要:Key实现了若引用,此处涉及到GC回收问题以及用错了会OOM
            Entry(ThreadLocal<?> k, Object v) {
                super(k);
                value = v;
            }
        }

  • 如何获取值得呢
    public T get() {
        // 获取到当前线程实例
        Thread t = Thread.currentThread();
        //拿到当前线程的局部变量容器ThreadLocalMap
        ThreadLocalMap map = getMap(t);
        if (map != null) {
            // Key为ThreadLocal
            ThreadLocalMap.Entry e = map.getEntry(this);
            if (e != null) {
                // 存在返回值
                @SuppressWarnings("unchecked")
                T result = (T)e.value;
                return result;
            }
        }
        //不存在返回初始默认值
        return setInitialValue();
    }
  • 小结

ThreadLocal可以理解为是一个线程变量的访问器,是一个外观模式,对外屏蔽内部细节,暴露统一访问接口,借助ThreadLocal访问线程局部变量。

其中ThreadLocal是线程局部变量的Key,同一个ThreadLocal局部变量,在多个线程的ThreadLocalMap里面Key都是相同的,就是threadLocalHashCode的值。内部存储结构简单如下:

使用相同的变量threadLocal在线程A与线程B中设置了变量值,此时ThreadLocalMap中的Key是相同的,为threadLocal的hashCode值;其中threadLocal是弱引用其Key,线程强引用Value,参见图。弱引用的设计的目是考虑了变量的生命周期。

只有当线程存活时局部变量才有意义,当线程执行完毕被销毁时线程局部变量也就没意义了,如果Key是强引用,当线程执行完毕之后的引用关系如下:

此时线程执行结束理论上是应该被GC回收掉的,但是由于跟ThreadLocal存在强引用Thread,所以无法回收Thread,存在内存泄露问题;因此Key被设计为弱引用。

网上经常讨论ThreadLocal的OOM问题,是怎么产生的OOM问题呢?

当线程的生命周期远大于ThreadLocal,ThreadLocal在被GC回收之前没有调用remove方法,除此之外频繁创建新的ThreadLocal实例向该线程设置线程局部变量时就会内存泄露,错误示例代码:

public class JdkThreadLocal {

    static int GB = 1024 * 1024 * 1024;
    // -Xms2048M -Xmx2048M
    public static void main(String[] args) throws Exception {
        Thread MAIN_THREAD = Thread.currentThread();
        for (int i = 0; i < 2; i++) {
            System.gc();
            ThreadLocal<byte[]> tl = new ThreadLocal<>();
            Field threadLocalHashCode = ThreadLocal.class.getDeclaredField("threadLocalHashCode");
            threadLocalHashCode.setAccessible(true);
            System.out.println("threadLocalHashCode = " + threadLocalHashCode.get(tl));
            tl.set(new byte[GB]);
        }
    }
}
// OOM:
//Exception in thread "main" java.lang.OutOfMemoryError: Java heap space
//  at JdkThreadLocal.main(JdkThreadLocal.java:8)
  • 分析: i = 0时:

    i =2 时,进行一个GC回收,此时会将i=1的threadLocal gc进行回收,此时内存分布:

    debug:

    此时key已经被回收,但是由于Thread还是存活状态,因此存在强引用执行value,value无法被回收释放。

  • 分析 i = 1时,再次设置1GB内存直接导致内存OOM

总结:如何避免ThreadLocal OOM
  • 同一业务使用静态变量,全局唯一使用。
  • 如果ThreadLocal的生命周期与Thread生命周期共存亡则可以不需要格外处理,完全有JVM负责回收。
  • 如果Thread的生命周期比Thread生命周期长,而且必须使用局部的ThreadLocal,则在ThreadLocal解引用(不存在强引用)之前必须调用remove方法显式释放value的内存空间。

转载请注明:Java工匠师 » Java里面ThreadLocal源码分析以及如何产生的OOM

喜欢 (7)
发表我的评论
取消评论

表情

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址
(1)个小伙伴参与了讨论
  1. 弱引用不是需引用;同时应该是线程Thread弱引用ThreadLocal,使用弱引用目的是当ThreadLocal被释放时,不会因为Thread存在引用无法释放
    javartisan2021-09-01 10:57 回复