千家信息网

TransmittableThreadLocal怎么解决池化复用线程的传值问题

发表于:2024-11-12 作者:千家信息网编辑
千家信息网最后更新 2024年11月12日,这篇文章主要介绍"TransmittableThreadLocal怎么解决池化复用线程的传值问题",在日常操作中,相信很多人在TransmittableThreadLocal怎么解决池化复用线程的传值
千家信息网最后更新 2024年11月12日TransmittableThreadLocal怎么解决池化复用线程的传值问题

这篇文章主要介绍"TransmittableThreadLocal怎么解决池化复用线程的传值问题",在日常操作中,相信很多人在TransmittableThreadLocal怎么解决池化复用线程的传值问题问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答"TransmittableThreadLocal怎么解决池化复用线程的传值问题"的疑惑有所帮助!接下来,请跟着小编一起来学习吧!

TransmittableThreadLocal

一般情况下,ThreadLocal都可以满足我们的需求,当我们出现需要 在使用线程池等会池化复用线程的执行组件情况下传递ThreadLocal ,

这个场景就是TransmittableThreadLocal解决的问题。

Github地址:https://github.com/alibaba/transmittable-thread-local

感兴趣的可以去下载的玩一玩,接下来我们来介绍一下这个组件的神奇之处。

首先看个demo, 通过demo,我们先了解了解怎么用

demo
/** * ttl测试 * * @author zhangyunhe * @date 2020-04-23 12:47 */public class Test {    // 1. 初始化一个TransmittableThreadLocal,这个是继承了InheritableThreadLocal的    static TransmittableThreadLocal local = new TransmittableThreadLocal<>();    // 初始化一个长度为1的线程池    static ExecutorService poolExecutor = Executors.newFixedThreadPool(1);    public static void main(String[] args) throws ExecutionException, InterruptedException {        Test test = new Test();        test.test();    }    private void test() throws ExecutionException, InterruptedException {        // 设置初始值        local.set("天王老子");        //!!!! 注意:这个地方的Task是使用了TtlRunnable包装的        Future future = poolExecutor.submit(TtlRunnable.get(new Task("任务1")));        future.get();        Future future2 = poolExecutor.submit(TtlRunnable.get(new Task("任务2")));        future2.get();              System.out.println("父线程的值:"+local.get());        poolExecutor.shutdown();    }    class Task implements Runnable{        String str;        Task(String str){            this.str = str;        }        @Override        public void run() {            // 获取值            System.out.println(Thread.currentThread().getName()+":"+local.get());            // 重新设置一波            local.set(str);        }    }}

输出结果:

pool-1-thread-1:天王老子pool-1-thread-1:天王老子父线程的值:天王老子

原理分析

我们首先看一下TransmittableThreadLocal的源码,

public class TransmittableThreadLocal extends InheritableThreadLocal implements TtlCopier {    // 1. 此处的holder是他的主要设计点,后续在构建TtlRunnable  private static InheritableThreadLocal, ?>> holder =            new InheritableThreadLocal, ?>>() {                @Override                protected WeakHashMap, ?> initialValue() {                    return new WeakHashMap, Object>();                }                @Override                protected WeakHashMap, ?> childValue(WeakHashMap, ?> parentValue) {                    return new WeakHashMap, Object>(parentValue);                }            };    @SuppressWarnings("unchecked")    private void addThisToHolder() {        if (!holder.get().containsKey(this)) {            holder.get().put((TransmittableThreadLocal) this, null); // WeakHashMap supports null value.        }    }    @Override    public final T get() {        T value = super.get();        if (disableIgnoreNullValueSemantics || null != value) addThisToHolder();        return value;    }    /**     * see {@link InheritableThreadLocal#set}     */    @Override    public final void set(T value) {        if (!disableIgnoreNullValueSemantics && null == value) {            // may set null to remove value            remove();        } else {            super.set(value);            addThisToHolder();        }    }    /**     * see {@link InheritableThreadLocal#remove()}     */    @Override    public final void remove() {        removeThisFromHolder();        super.remove();    }    private void superRemove() {        super.remove();    }}

步骤说明:

  1. 在代码中,作者构建了一个holder对象,这个对象是一个InheritableThreadLocal , 里面的类型是一个弱引用的WeakHashMap , 这个map的va lu就是TransmittableThreadLocal , 至于value永远都是空的

holder里面存储的是这个应用里面,所有关于TransmittableThreadLocal的引用。

  1. 从上面可以看到,每次get, set ,remove都会操作holder对象,这样做的目的是为了保持TransmittableThreadLocal所有的这个引用都在holder里面存一份。

TtlRunnable

回到我们上面的代码

Future future = poolExecutor.submit(TtlRunnable.get(new Task("任务1")));

细心的朋友可能已经发现了,我们调用了TtlRunnable 对象的get方法,下面看一下这个方法有啥作用吧

public static TtlRunnable get(@Nullable Runnable runnable, boolean releaseTtlValueReferenceAfterRun, boolean idempotent) {        if (null == runnable) return null;        if (runnable instanceof TtlEnhanced) {            // avoid redundant decoration, and ensure idempotency            if (idempotent) return (TtlRunnable) runnable;            else throw new IllegalStateException("Already TtlRunnable!");        }                      // 重点在这里        return new TtlRunnable(runnable, releaseTtlValueReferenceAfterRun);    }

看上面的代码,细节上我们不看,我们看大致的思路, 这个地方主要就是根据传入的runnable构建了一个TtlRunnable对象。

private TtlRunnable(@NonNull Runnable runnable, boolean releaseTtlValueReferenceAfterRun) {                      //重点在这里        this.capturedRef = new AtomicReference(capture());        this.runnable = runnable;        this.releaseTtlValueReferenceAfterRun = releaseTtlValueReferenceAfterRun;}

下面这行代码,运行到这里的时候,还是在主线程里面,调用了capture方法

this.capturedRef = new AtomicReference(capture());
capture
public static Object capture() {    // 构建一个临时对象,主要看captureTtlValues方法    return new Snapshot(captureTtlValues(), captureThreadLocalValues());}private static WeakHashMap, Object> captureTtlValues() {  // 构建一个WeakHashMap方法,  WeakHashMap, Object> ttl2Value = new WeakHashMap, Object>();  // 在主线程里面,调用holder变量,循环获取里面所有的key和value  for (TransmittableThreadLocal threadLocal : holder.get().keySet()) {    ttl2Value.put(threadLocal, threadLocal.copyValue());  }  // 返回出去  return ttl2Value;}

步骤说明:

1.调用静态变量holder, 循环获取里面所有的key和value, value的获取就比较巧妙一点。

private T copyValue() {      // 这里的get方法,调用的是父类的方法,可以在父类里面最终获取到当前TransmittableThreadLocal所对应的value      return copy(get());}

2.组装好一个WeakHashMap出去,最终就会到了我们上面的构造方法里面,针对capturedRef 的赋值操作。

run
@Overridepublic void run() {  //1. 获取到刚刚构造TtlRunnable对象的时候初始化的capturedRef对象。包含了从submit丢任务进来的时候父线程的数据  Object captured = capturedRef.get();  if (captured == null || releaseTtlValueReferenceAfterRun && !capturedRef.compareAndSet(captured, null)) {    throw new IllegalStateException("TTL value reference is released after run!");  }        // 清除不在captured里面的key,同时在这个子线程中,对所有的ThreadLocal进行重新设置值  Object backup = replay(captured);  try {    // 执行实际的线程方法    runnable.run();  } finally {    // 做好还原工作,根据backup    restore(backup);  }}private static WeakHashMap, Object> replayTtlValues(@NonNull WeakHashMap, Object> captured) {            WeakHashMap, Object> backup = new WeakHashMap, Object>();    for (final Iterator> iterator = holder.get().keySet().iterator(); iterator.hasNext(); ) {      TransmittableThreadLocal threadLocal = iterator.next();      // 做好当前线程的local备份      backup.put(threadLocal, threadLocal.get());      // 清除数据,不在captured里面的。      if (!captured.containsKey(threadLocal)) {        iterator.remove();        threadLocal.superRemove();      }    }    // 这里就是把值设置到当前线程的TransmittableThreadLocal里面。    setTtlValuesTo(captured);    // 一个钩子    doExecuteCallback(true);    return backup;}

到此,关于"TransmittableThreadLocal怎么解决池化复用线程的传值问题"的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注网站,小编会继续努力为大家带来更多实用的文章!

0