线程间的同步

jdk提供了比较多的线程间同步工具,比如Lock,CountDownLatch,Condition来完成线程间的同步,但是这些高级工具在某些场景可能显得太重量级了,本文简单探讨下不同的实现方式的。

我们假设如下场景,线程A创建任务id=1发起远程调用,响应时,io线程解码id,并通知线程A继续执行。定义如下接口:

public interface FutureResult {

    Object get(long timeout, @NotNull TimeUnit unit) throws TimeoutException;

    void set(Object Result);

    boolean isDone();

    Integer getId();

    Map<Integer, FutureResult> FUTURE_RESULTs = Maps.newConcurrentHashMap();

    static void set(Integer id, Object response) {
        final FutureResult future = FUTURE_RESULTs.remove(id);
        if (future != null) {
            future.set(response);
        }
    }

    AtomicInteger idx = new AtomicInteger();

    static Integer getNextId() {
        return idx.incrementAndGet();
    }
}

1. 使用LockSupport

@Slf4j
public class LockSupportFutureResult implements FutureResult {
    private Integer id;
    private volatile Object result;
    private Thread runner;

    public LockSupportFutureResult() {
        id = FutureResult.getNextId();
        FUTURE_RESULTs.put(id, this);
    }

    public Object get(long timeout, @NotNull TimeUnit unit) throws TimeoutException {
        if (result != null) {
            return result;
        }
        long nanos = unit.toNanos(timeout);
        final long deadline = System.nanoTime() + nanos;
        runner = Thread.currentThread();
        try {
            for (; ; ) {
                LockSupport.parkNanos(this, nanos);
                nanos = deadline - System.nanoTime();
                if (nanos <= 0L) {
                    throw new TimeoutException();
                } else {
                    if (result != null) {
                        return result;
                    }
                }
            }
        } finally {
            FUTURE_RESULTs.remove(this.id);
        }
    }

    public void set(Object response) {
        LockSupport.unpark(this.runner);
        this.result = response;
    }

    public boolean isDone() {
        return this.result != null;
    }

    public Integer getId() {
        return id;
    }
}

2. 使用FutureTask

@Slf4j
public class FutureTaskResult extends FutureTask implements FutureResult {
    private static Callable DO_NOTHING = () -> null;
    private Integer id;

    public FutureTaskResult() {
        super(DO_NOTHING);
        id = FutureResult.getNextId();
        FUTURE_RESULTs.put(id, this);
    }

    public Integer getId() {
        return id;
    }

    public void set(Object o) {
        super.set(o);
    }

    @Override
    public Object get(long timeout, @NotNull TimeUnit unit) throws TimeoutException {
        try {
            return super.get(timeout, unit);
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        } finally {
            FUTURE_RESULTs.remove(this.id);
        }
    }
}

3. 简单总结

使用CountDownLatch,ReentrantLock,内部使用AbstractQueuedSynchronizer数据结构来处理多个等待者,当请求数量很大时,这种开销也不容小视。如果非常关注性能可以考虑直接用LockSupport,担心hold不住的话可以考虑用FutureTask.

给攻城狮一个小小的鼓励!