看了下Java Tutorials中的fork/join章節,整理下。 什麼是fork/join框架 fork/join框架是ExecutorService介面的一個實現,可以幫助開發人員充分利用多核處理器的優勢,編寫出並行執行的程式,提高應用程式的性能;設計的目的是為了處理那些可以被遞歸拆分的任務 ...
看了下Java Tutorials中的fork/join章節,整理下。
什麼是fork/join框架
fork/join框架是ExecutorService介面的一個實現,可以幫助開發人員充分利用多核處理器的優勢,編寫出並行執行的程式,提高應用程式的性能;設計的目的是為了處理那些可以被遞歸拆分的任務。
fork/join框架與其它ExecutorService
的實現類相似,會給線程池中的線程分發任務,不同之處在於它使用了工作竊取演算法,所謂工作竊取,指的是對那些處理完自身任務的線程,會從其它線程竊取任務執行。
fork/join框架的核心是ForkJoinPool
類,該類繼承了AbstractExecutorService類。ForkJoinPool
實現了工作竊取演算法並且能夠執行 ForkJoinTask
任務。
基本使用方法
在使用fork/join框架之前,我們需要先對任務進行分割,任務分割代碼應該跟下麵的偽代碼類似:
if (任務足夠小){ 直接執行該任務;
}else{ 將任務一分為二; 執行這兩個任務並等待結果;
}
首先,我們會在ForkJoinTask的子類中封裝以上代碼,不過一般我們會使用更加具體的ForkJoinTask類型,如 RecursiveTask
(可以返回一個結果)或RecursiveAction
。
當寫好ForkJoinTask的子類後,創建該對象,該對象代表了所有需要完成的任務;然後將這個任務對象傳給ForkJoinPool實例的invoke()去執行即可。
例子-圖像模糊
為了更加直觀的理解fork/join框架是如何工作的,可以看一下下麵這個例子。假定我們有一個圖像模糊的任務需要完成,原始圖像數據可以用一個整型數組表示,每一個整型元素包含了一個像素點的顏色值(RBG,存放在整型元素的不同位中)。目標圖像同樣是由一個整型數組構成,每個整型元素包含RBG顏色信息;
執行模糊操作需要遍歷原始圖像整型數組的每個元素,並對其周圍的像素點做均值操作(RGB均值),然後將結果存放到目標數組中。由於圖像是一個大數組,這個處理操作會花費一定的時間。但是有了fork/join框架,我們可以充分利用多核處理器進行並行計算。如下是一個可能的代碼實現(圖像做水平方向的模糊操作):
Tips:該例子僅僅是闡述fork/join框架的使用,並不推薦使用該方法做圖像模糊,圖像邊緣處理也沒做判斷
public class ForkBlur extends RecursiveAction { private static final long serialVersionUID = -8032915917030559798L; private int[] mSource; private int mStart; private int mLength; private int[] mDestination; private int mBlurWidth = 15; // Processing window size, should be odd. public ForkBlur(int[] src, int start, int length, int[] dst) { mSource = src; mStart = start; mLength = length; mDestination = dst; } // Average pixels from source, write results into destination. protected void computeDirectly() { int sidePixels = (mBlurWidth - 1) / 2; for (int index = mStart; index < mStart + mLength; index++) { // Calculate average. float rt = 0, gt = 0, bt = 0; for (int mi = -sidePixels; mi <= sidePixels; mi++) { int mindex = Math.min(Math.max(mi + index, 0), mSource.length - 1); int pixel = mSource[mindex]; rt += (float) ((pixel & 0x00ff0000) >> 16) / mBlurWidth; gt += (float) ((pixel & 0x0000ff00) >> 8) / mBlurWidth; bt += (float) ((pixel & 0x000000ff) >> 0) / mBlurWidth; } // Re-assemble destination pixel. int dpixel = (0xff000000) | (((int) rt) << 16) | (((int) gt) << 8) | (((int) bt) << 0); mDestination[index] = dpixel; } } ...
現在,我們開始編寫compute()的實現方法,該方法分成兩部分:直接執行模糊操作和任務的劃分;一個數組長度閾值sThreshold可以幫助我們決定任務是直接執行還是進行劃分;
@Override protected void compute() { if (mLength < sThreshold) { computeDirectly(); return; } int split = mLength / 2; invokeAll(new ForkBlur(mSource, mStart, split, mDestination), new ForkBlur(mSource, mStart + split, mLength - split, mDestination)); }
接下來按如下步驟即可完成圖像模糊任務啦:
1、創建圖像模糊任務
ForkBlur fb = new ForkBlur(src, 0, src.length, dst);
2、創建ForkJoinPool
ForkJoinPool pool = new ForkJoinPool();
3、執行圖像模糊任務
pool.invoke(fb);
完整代碼如下:
/* * Copyright (c) 2010, 2013, Oracle and/or its affiliates. All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions * are met: * * - Redistributions of source code must retain the above copyright * notice, this list of conditions and the following disclaimer. * * - Redistributions in binary form must reproduce the above copyright * notice, this list of conditions and the following disclaimer in the * documentation and/or other materials provided with the distribution. * * - Neither the name of Oracle or the names of its * contributors may be used to endorse or promote products derived * from this software without specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS * IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */ import java.awt.image.BufferedImage; import java.io.File; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.RecursiveAction; import javax.imageio.ImageIO; /** * ForkBlur implements a simple horizontal image blur. It averages pixels in the * source array and writes them to a destination array. The sThreshold value * determines whether the blurring will be performed directly or split into two * tasks. * * This is not the recommended way to blur images; it is only intended to * illustrate the use of the Fork/Join framework. */ public class ForkBlur extends RecursiveAction { private static final long serialVersionUID = -8032915917030559798L; private int[] mSource; private int mStart; private int mLength; private int[] mDestination; private int mBlurWidth = 15; // Processing window size, should be odd. public ForkBlur(int[] src, int start, int length, int[] dst) { mSource = src; mStart = start; mLength = length; mDestination = dst; } // Average pixels from source, write results into destination. protected void computeDirectly() { int sidePixels = (mBlurWidth - 1) / 2; for (int index = mStart; index < mStart + mLength; index++) { // Calculate average. float rt = 0, gt = 0, bt = 0; for (int mi = -sidePixels; mi <= sidePixels; mi++) { int mindex = Math.min(Math.max(mi + index, 0), mSource.length - 1); int pixel = mSource[mindex]; rt += (float) ((pixel & 0x00ff0000) >> 16) / mBlurWidth; gt += (float) ((pixel & 0x0000ff00) >> 8) / mBlurWidth; bt += (float) ((pixel & 0x000000ff) >> 0) / mBlurWidth; } // Re-assemble destination pixel. int dpixel = (0xff000000) | (((int) rt) << 16) | (((int) gt) << 8) | (((int) bt) << 0); mDestination[index] = dpixel; } } protected static int sThreshold = 10000; @Override protected void compute() { if (mLength < sThreshold) { computeDirectly(); return; } int split = mLength / 2; invokeAll(new ForkBlur(mSource, mStart, split, mDestination), new ForkBlur(mSource, mStart + split, mLength - split, mDestination)); } // Plumbing follows. public static void main(String[] args) throws Exception { String srcName = "C:\\test6.jpg"; File srcFile = new File(srcName); BufferedImage image = ImageIO.read(srcFile); System.out.println("Source image: " + srcName); BufferedImage blurredImage = blur(image); String dstName = "C:\\test6_out.jpg"; File dstFile = new File(dstName); ImageIO.write(blurredImage, "jpg", dstFile); System.out.println("Output image: " + dstName); } public static BufferedImage blur(BufferedImage srcImage) { int w = srcImage.getWidth(); int h = srcImage.getHeight(); int[] src = srcImage.getRGB(0, 0, w, h, null, 0, w); int[] dst = new int[src.length]; System.out.println("Array size is " + src.length); System.out.println("Threshold is " + sThreshold); int processors = Runtime.getRuntime().availableProcessors(); System.out.println(Integer.toString(processors) + " processor" + (processors != 1 ? "s are " : " is ") + "available"); ForkBlur fb = new ForkBlur(src, 0, src.length, dst); ForkJoinPool pool = new ForkJoinPool(); long startTime = System.currentTimeMillis(); pool.invoke(fb); long endTime = System.currentTimeMillis(); System.out.println("Image blur took " + (endTime - startTime) + " milliseconds."); BufferedImage dstImage = new BufferedImage(w, h, BufferedImage.TYPE_INT_ARGB); dstImage.setRGB(0, 0, w, h, dst, 0, w); return dstImage; } }View Code
測試了一下,執行效果如下:
Source image: C:\test6.jpg
Array size is 120000
Threshold is 10000
4 processors are available
Image blur took 10 milliseconds.
Output image: C:\test6_out.jpg
JDK中使用fork/join的例子
除了我們上面提到的使用fork/join框架並行執行圖像模糊任務之外,在JAVA SE中,也已經利用fork/join框架實現了一些非常有用的特性。其中一個實現是在JAVA SE8 中java.util.Arrays
類的parallelSort()方法。這些方法和sort()方法類似,但是可以通過fork/join框架並行執行。對於大數組排序,在多核處理器系統中,使用並行排序方法比順序排序更加高效。當然,關於這些排序方法是如何利用fork/join框架不在本篇文章討論範圍,更多信息可以查看JAVA API文檔。
另一個fork/join框架的實現是在JAVA SE8中的java.util.streams包內,與Lambda表達式相關,更多信息,可以查看https://docs.oracle.com/javase/tutorial/java/javaOO/lambdaexpressions.html鏈接。
參考鏈接:https://docs.oracle.com/javase/tutorial/essential/concurrency/forkjoin.html