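Reposted from: http://blog.csdn.net/aitangyong/article/details/46472643
JDK 7 enhanced the java.util.concurrent (J.U.C) utilities introduced in JDK 5, and one of the additions is TransferQueue. The JSR specifications related to Java concurrency can be found on the blog maintained by Doug Lea. This post briefly shows how to use the interface.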
```java
public interface TransferQueue<E> extends BlockingQueue<E> {
    /**
     * Transfers the element to a waiting consumer immediately, if possible.
     *
     * <p>More precisely, transfers the specified element immediately
     * if there exists a consumer already waiting to receive it (in
     * {@link #take} or timed {@link #poll(long,TimeUnit) poll}),
     * otherwise returning {@code false} without enqueuing the element.
     *
     * @param e the element to transfer
     * @return {@code true} if the element was transferred, else
     *         {@code false}
     * @throws ClassCastException if the class of the specified element
     *         prevents it from being added to this queue
     * @throws NullPointerException if the specified element is null
     * @throws IllegalArgumentException if some property of the specified
     *         element prevents it from being added to this queue
     */
    boolean tryTransfer(E e);

    /**
     * Transfers the element to a consumer, waiting if necessary to do so.
     *
     * <p>More precisely, transfers the specified element immediately
     * if there exists a consumer already waiting to receive it (in
     * {@link #take} or timed {@link #poll(long,TimeUnit) poll}),
     * else waits until the element is received by a consumer.
     *
     * @param e the element to transfer
     * @throws InterruptedException if interrupted while waiting,
     *         in which case the element is not left enqueued
     * @throws ClassCastException if the class of the specified element
     *         prevents it from being added to this queue
     * @throws NullPointerException if the specified element is null
     * @throws IllegalArgumentException if some property of the specified
     *         element prevents it from being added to this queue
     */
    void transfer(E e) throws InterruptedException;

    /**
     * Transfers the element to a consumer if it is possible to do so
     * before the timeout elapses.
     *
     * <p>More precisely, transfers the specified element immediately
     * if there exists a consumer already waiting to receive it (in
     * {@link #take} or timed {@link #poll(long,TimeUnit) poll}),
     * else waits until the element is received by a consumer,
     * returning {@code false} if the specified wait time elapses
     * before the element can be transferred.
     *
     * @param e the element to transfer
     * @param timeout how long to wait before giving up, in units of
     *        {@code unit}
     * @param unit a {@code TimeUnit} determining how to interpret the
     *        {@code timeout} parameter
     * @return {@code true} if successful, or {@code false} if
     *         the specified waiting time elapses before completion,
     *         in which case the element is not left enqueued
     * @throws InterruptedException if interrupted while waiting,
     *         in which case the element is not left enqueued
     * @throws ClassCastException if the class of the specified element
     *         prevents it from being added to this queue
     * @throws NullPointerException if the specified element is null
     * @throws IllegalArgumentException if some property of the specified
     *         element prevents it from being added to this queue
     */
    boolean tryTransfer(E e, long timeout, TimeUnit unit)
        throws InterruptedException;

    /**
     * Returns {@code true} if there is at least one consumer waiting
     * to receive an element via {@link #take} or
     * timed {@link #poll(long,TimeUnit) poll}.
     * The return value represents a momentary state of affairs.
     *
     * @return {@code true} if there is at least one waiting consumer
     */
    boolean hasWaitingConsumer();

    /**
     * Returns an estimate of the number of consumers waiting to
     * receive elements via {@link #take} or timed
     * {@link #poll(long,TimeUnit) poll}. The return value is an
     * approximation of a momentary state of affairs, that may be
     * inaccurate if consumers have completed or given up waiting.
     * The value may be useful for monitoring and heuristics, but
     * not for synchronization control. Implementations of this
     * method are likely to be noticeably slower than those for
     * {@link #hasWaitingConsumer}.
     *
     * @return the number of consumers waiting to receive elements
     */
    int getWaitingConsumerCount();
}
```
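As the declaration shows, TransferQueue is also a blocking queue and has every feature of one. The rest of this post covers the five methods it adds.
1. transfer(E e): if a consumer thread is already waiting to receive an element, e is handed to it immediately. Otherwise e is appended to the tail of the queue and the calling thread blocks until a consumer thread takes it.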
```java
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TransferQueue;

public class TransferQueueDemo {
    private static final TransferQueue<String> queue = new LinkedTransferQueue<>();

    public static void main(String[] args) throws Exception {
        new Productor(1).start();
        Thread.sleep(100); // give the producer time to reach transfer()
        System.out.println("over.size=" + queue.size());
        // The JVM does not exit here: with no consumer, the producer stays blocked in transfer().
    }

    static class Productor extends Thread {
        private final int id;

        public Productor(int id) {
            this.id = id;
        }

        @Override
        public void run() {
            try {
                String result = "id=" + this.id;
                System.out.println("begin to produce." + result);
                queue.transfer(result); // blocks until a consumer takes the element
                System.out.println("success to produce." + result);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
```
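Running this shows that the producer thread blocks, because no consumer is waiting when transfer() is called. The queue size becomes 1, which shows that an element that cannot be handed over immediately is appended to the tail of the queue.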
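For contrast, here is a minimal sketch of the other case, where a consumer does receive the element. The class name `HandOffDemo` and the helper `handOff` are made up for illustration and are not part of the original post. Once a consumer has taken the element, transfer() returns and nothing is left in the queue.

```java
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TransferQueue;

final class HandOffDemo {

    // Hypothetical helper: starts one consumer that calls take(), then
    // transfers a single element to it and returns what the consumer received.
    static String handOff(TransferQueue<String> queue, String element) throws InterruptedException {
        String[] received = new String[1];
        Thread consumer = new Thread(() -> {
            try {
                received[0] = queue.take(); // blocks until a producer supplies an element
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        // transfer() returns only once a consumer has received the element,
        // whether or not the consumer was already waiting when it was called.
        queue.transfer(element);

        consumer.join(); // join() makes the consumer's write to received[0] visible here
        return received[0];
    }

    public static void main(String[] args) throws InterruptedException {
        TransferQueue<String> queue = new LinkedTransferQueue<>();
        System.out.println("consumer received " + handOff(queue, "id=1"));
        System.out.println("size after transfer=" + queue.size());
    }
}
```

Unlike the demo above, this program terminates, because the producer is released as soon as the consumer takes the element.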
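2. tryTransfer(E e): if a consumer thread is already waiting to receive an element, e is handed to it immediately and the method returns true. If not, the method returns false and e is not inserted into the queue. The calling thread never blocks: the method quickly returns either true or false.
3. hasWaitingConsumer() and getWaitingConsumerCount() report on the consumer threads currently waiting to receive elements. The former returns whether there is at least one, the latter an estimate of how many. Both describe a momentary state, so they suit monitoring and heuristics, not synchronization control.
4. tryTransfer(E e, long timeout, TimeUnit unit): if a consumer thread is already waiting to receive an element, e is handed to it immediately. Otherwise e is appended to the tail of the queue and waits for a consumer thread to take it. If no consumer takes e within the specified time, the method returns false and e is removed from the queue.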
```java
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TransferQueue;

public class TransferQueueDemo {
    private static final TransferQueue<String> queue = new LinkedTransferQueue<>();

    public static void main(String[] args) throws Exception {
        new Productor(1).start();
        Thread.sleep(100);
        System.out.println("over.size=" + queue.size()); // 1
        Thread.sleep(1500);
        System.out.println("over.size=" + queue.size()); // 0
    }

    static class Productor extends Thread {
        private final int id;

        public Productor(int id) {
            this.id = id;
        }

        @Override
        public void run() {
            try {
                String result = "id=" + this.id;
                System.out.println("begin to produce." + result);
                // Returns false if no consumer takes the element within 1 second.
                boolean transferred = queue.tryTransfer(result, 1, TimeUnit.SECONDS);
                System.out.println((transferred ? "success" : "failed") + " to produce." + result);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
```
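At the first check the timeout has not yet elapsed and the element is in the queue, so the size is 1. At the second check the timeout has expired and the element has been removed, so the size is 0.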
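The non-blocking tryTransfer(E e) from item 2 and the two monitoring methods from item 3 can be sketched together as follows. The class name `TryTransferDemo` and both helper methods are made up for illustration. Polling hasWaitingConsumer() is acceptable here only because the sketch has exactly one producer and one consumer; in general the value is a snapshot and should not drive synchronization.

```java
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TransferQueue;

final class TryTransferDemo {

    // Without a waiting consumer, tryTransfer(e) returns false at once
    // and leaves nothing in the queue.
    static boolean offerWithoutConsumer(TransferQueue<String> queue) {
        return queue.tryTransfer("no-consumer");
    }

    // With a consumer already parked in take(), tryTransfer(e) hands the element over.
    static boolean offerWithConsumer(TransferQueue<String> queue) throws InterruptedException {
        Thread consumer = new Thread(() -> {
            try {
                queue.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        // Wait until the single consumer of this demo is registered as waiting.
        while (!queue.hasWaitingConsumer()) {
            Thread.sleep(1);
        }
        System.out.println("waiting consumers (estimate)=" + queue.getWaitingConsumerCount());

        boolean transferred = queue.tryTransfer("with-consumer");
        consumer.join();
        return transferred;
    }

    public static void main(String[] args) throws InterruptedException {
        TransferQueue<String> queue = new LinkedTransferQueue<>();
        System.out.println("without consumer: " + offerWithoutConsumer(queue));
        System.out.println("size=" + queue.size());
        System.out.println("with consumer: " + offerWithConsumer(queue));
    }
}
```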
Another article covers how SynchronousQueue is used. TransferQueue can do everything SynchronousQueue can, and more. A further article describes the difference between the two APIs:
"TransferQueue is more generic and useful than SynchronousQueue however as it allows you to flexibly decide whether to use normal BlockingQueue semantics or a guaranteed hand-off. In the case where items are already in the queue, calling transfer will guarantee that all existing queue items will be processed before the transferred item.

SynchronousQueue implementation uses dual queues (for waiting producers and waiting consumers) and protects both queues with a single lock. The LinkedTransferQueue implementation uses CAS operations to form a nonblocking implementation and that is at the heart of avoiding serialization bottlenecks."

The single-lock description matches the Java 5 SynchronousQueue; since Java 6, SynchronousQueue has also been built on nonblocking algorithms.
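The two behavioural points in the quote can be checked with a short sketch. The class name `QueueComparisonDemo` and its helpers are made up for illustration. It shows that offer() on a SynchronousQueue fails when no consumer is waiting while a LinkedTransferQueue simply buffers the element, and that an element passed to transfer() is received after the elements already in the queue.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TransferQueue;

final class QueueComparisonDemo {

    // offer() never blocks; it reports whether the queue accepted the element.
    static boolean offerWithoutConsumer(BlockingQueue<String> queue) {
        return queue.offer("x");
    }

    // Buffers two elements with put(), then calls transfer() for a third,
    // and returns the order in which a single consumer receives them.
    static List<String> consumeOrder() throws InterruptedException {
        TransferQueue<String> queue = new LinkedTransferQueue<>();
        queue.put("a"); // normal BlockingQueue semantics: enqueue and return
        queue.put("b");

        List<String> order = new ArrayList<>();
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < 3; i++) {
                    order.add(queue.take());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        queue.transfer("c"); // guaranteed hand-off: returns once "c" has been received
        consumer.join();     // after join() the list is safe to read from this thread
        return order;
    }

    public static void main(String[] args) throws InterruptedException {
        // SynchronousQueue has no capacity, so offer() fails without a waiting consumer.
        System.out.println("SynchronousQueue.offer: "
                + offerWithoutConsumer(new SynchronousQueue<String>()));    // false
        // LinkedTransferQueue is unbounded, so offer() buffers the element.
        System.out.println("LinkedTransferQueue.offer: "
                + offerWithoutConsumer(new LinkedTransferQueue<String>())); // true
        System.out.println("consume order: " + consumeOrder());            // [a, b, c]
    }
}
```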