Springboot項目redisTemplate實現輕量級消息隊列

来源:https://www.cnblogs.com/wangzaiplus/archive/2019/04/05/10660520.html
-Advertisement-
Play Games

背景 公司項目有個需求, 前端上傳excel文件, 後端讀取數據、處理數據、返回錯誤數據, 最簡單的方式同步處理, 客戶端上傳文件後一直阻塞等待響應, 但用戶體驗無疑很差, 處理數據可能十分耗時, 沒人願意傻等, 由於項目暫未使用ActiveMQ等消息隊列中間件, 而redis的lpush和rpop ...


背景
公司項目有個需求, 前端上傳excel文件, 後端讀取數據、處理數據、返回錯誤數據, 最簡單的方式同步處理, 客戶端上傳文件後一直阻塞等待響應, 但用戶體驗無疑很差, 處理數據可能十分耗時, 沒人願意傻等, 由於項目暫未使用ActiveMQ等消息隊列中間件, 而redis的lpush和rpop很適合作為一種輕量級的消息隊列實現, 所以用它完成此次功能開發

一、本文涉及知識點

  1. excel文件讀寫--阿裡easyexcel sdk
  2. 文件上傳、下載--騰訊雲對象存儲
  3. 遠程服務調用--restTemplate
  4. 生產者、消費者--redisTemplate leftPush和rightPop操作
  5. 非同步處理數據--Executors線程池
  6. 讀取網路文件流--HttpClient
  7. 自定義註解實現用戶身份認證--JWT token認證, 攔截器攔截標註有@LoginRequired註解的請求入口

當然, Java實現咯
涉及的知識點比較多, 每一個知識點都可以作為專題進行學習分析, 本文將完整實現呈現出來, 後期拆分與小伙伴分享學習

二、項目目錄結構

項目結構

說明: 資料庫DAO層放到另一個模塊了, 不是本文重點

三、主要maven依賴

  1. easyexcel
<easyexcel-latestVersion>1.1.2-beta4</easyexcel-latestVersion>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>easyexcel</artifactId>
            <version>${easyexcel-latestVersion}</version>
        </dependency>
  1. JWT
        <dependency>
            <groupId>io.jsonwebtoken</groupId>
            <artifactId>jjwt</artifactId>
            <version>0.7.0</version>
        </dependency>
  1. redis
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-redis</artifactId>
            <version>1.3.5.RELEASE</version>
        </dependency>
  1. 騰訊cos
        <dependency>
            <groupId>com.qcloud</groupId>
            <artifactId>cos_api</artifactId>
            <version>5.4.5</version>
        </dependency>

四、流程

  1. 用戶上傳文件
  2. 將文件存儲到騰訊cos
  3. 將上傳後的文件id及上傳記錄保存到資料庫
  4. redis生產一條導入消息, 即保存文件id到redis
  5. 請求結束, 返回"處理中"狀態
  6. redis消費消息
  7. 讀取cos文件, 非同步處理數據
  8. 將錯誤數據以excel形式上傳至cos, 以供用戶下載, 並更新處理狀態為"處理完成"
  9. 客戶端輪詢查詢處理狀態, 並可以下載錯誤文件
  10. 結束

五、實現效果

  1. 上傳文件
    上傳文件

  2. 資料庫導入記錄
    資料庫導入記錄

  3. 導入的數據
    導入的數據

  4. 下載錯誤文件
    下載錯誤文件

  5. 錯誤數據提示
    錯誤數據提示

  6. 查詢導入記錄
    查詢導入記錄

六、代碼實現

1、導入excel控制層

    @LoginRequired
    @RequestMapping(value = "doImport", method = RequestMethod.POST)
    public JsonResponse doImport(@RequestParam("file") MultipartFile file, HttpServletRequest request) {
        PLUser user = getUser(request);
        return orderImportService.doImport(file, user.getId());
    }

2、service層

    @Override
    public JsonResponse doImport(MultipartFile file, Integer userId) {
        if (null == file || file.isEmpty()) {
            throw new ServiceException("文件不能為空");
        }

        String filename = file.getOriginalFilename();
        if (!checkFileSuffix(filename)) {
            throw new ServiceException("當前僅支持xlsx格式的excel");
        }

        // 存儲文件
        String fileId = saveToOss(file);
        if (StringUtils.isBlank(fileId)) {
            throw new ServiceException("文件上傳失敗, 請稍後重試");
        }

        // 保存記錄到資料庫
        saveRecordToDB(userId, fileId, filename);

        // 生產一條訂單導入消息
        redisProducer.produce(RedisKey.orderImportKey, fileId);

        return JsonResponse.ok("導入成功, 處理中...");
    }

    /**
     * 校驗文件格式
     * @param fileName
     * @return
     */
    private static boolean checkFileSuffix(String fileName) {
        if (StringUtils.isBlank(fileName) || fileName.lastIndexOf(".") <= 0) {
            return false;
        }

        int pointIndex = fileName.lastIndexOf(".");
        String suffix = fileName.substring(pointIndex, fileName.length()).toLowerCase();
        if (".xlsx".equals(suffix)) {
            return true;
        }

        return false;
    }

   /**
     * 將文件存儲到騰訊OSS
     * @param file
     * @return
     */
    private String saveToOss(MultipartFile file) {
        InputStream ins = null;
        try {
            ins = file.getInputStream();
        } catch (IOException e) {
            e.printStackTrace();
        }

        String fileId;
        try {
            String originalFilename = file.getOriginalFilename();
            File f = new File(originalFilename);
            inputStreamToFile(ins, f);
            FileSystemResource resource = new FileSystemResource(f);

            MultiValueMap<String, Object> param = new LinkedMultiValueMap<>();
            param.add("file", resource);

            ResponseResult responseResult = restTemplate.postForObject(txOssUploadUrl, param, ResponseResult.class);
            fileId = (String) responseResult.getData();
        } catch (Exception e) {
            fileId = null;
        }

        return fileId;
    }

3、redis生產者

@Service
public class RedisProducerImpl implements RedisProducer {

    @Autowired
    private RedisTemplate redisTemplate;

    @Override
    public JsonResponse produce(String key, String msg) {
        Map<String, String> map = Maps.newHashMap();
        map.put("fileId", msg);
        redisTemplate.opsForList().leftPush(key, map);
        return JsonResponse.ok();
    }

}

4、redis消費者

@Service
public class RedisConsumer {

    @Autowired
    public RedisTemplate redisTemplate;

    @Value("${txOssFileUrl}")
    private String txOssFileUrl;

    @Value("${txOssUploadUrl}")
    private String txOssUploadUrl;

    @PostConstruct
    public void init() {
        processOrderImport();
    }

    /**
     * 處理訂單導入
     */
    private void processOrderImport() {
        ExecutorService executorService = Executors.newCachedThreadPool();
        executorService.execute(() -> {
            while (true) {
                Object object = redisTemplate.opsForList().rightPop(RedisKey.orderImportKey, 1, TimeUnit.SECONDS);
                if (null == object) {
                    continue;
                }
                String msg = JSON.toJSONString(object);
                executorService.execute(new OrderImportTask(msg, txOssFileUrl, txOssUploadUrl));
            }
        });
    }

}

5、處理任務線程類

public class OrderImportTask implements Runnable {
    public OrderImportTask(String msg, String txOssFileUrl, String txOssUploadUrl) {
        this.msg = msg;
        this.txOssFileUrl = txOssFileUrl;
        this.txOssUploadUrl = txOssUploadUrl;
    }
}

    /**
     * 註入bean
     */
    private void autowireBean() {
        this.restTemplate = BeanContext.getApplicationContext().getBean(RestTemplate.class);
        this.transactionTemplate = BeanContext.getApplicationContext().getBean(TransactionTemplate.class);
        this.orderImportService = BeanContext.getApplicationContext().getBean(OrderImportService.class);
    }

    @Override
    public void run() {
        // 註入bean
        autowireBean();

        JSONObject jsonObject = JSON.parseObject(msg);
        String fileId = jsonObject.getString("fileId");

        MultiValueMap<String, Object> param = new LinkedMultiValueMap<>();
        param.add("id", fileId);

        ResponseResult responseResult = restTemplate.postForObject(txOssFileUrl, param, ResponseResult.class);
        String fileUrl = (String) responseResult.getData();
        if (StringUtils.isBlank(fileUrl)) {
            return;
        }

        InputStream inputStream = HttpClientUtil.readFileFromURL(fileUrl);
        List<Object> list = ExcelUtil.read(inputStream);
        process(list, fileId);
    }

    /**
     * 將文件上傳至oss
     * @param file
     * @return
     */
    private String saveToOss(File file) {
        String fileId;
        try {
            FileSystemResource resource = new FileSystemResource(file);
            MultiValueMap<String, Object> param = new LinkedMultiValueMap<>();
            param.add("file", resource);

            ResponseResult responseResult = restTemplate.postForObject(txOssUploadUrl, param, ResponseResult.class);
            fileId = (String) responseResult.getData();
        } catch (Exception e) {
            fileId = null;
        }
        return fileId;
    }

說明: 處理數據的業務邏輯代碼就不用貼了

6、上傳文件到cos

    @RequestMapping("/txOssUpload")
    @ResponseBody
    public ResponseResult txOssUpload(@RequestParam("file") MultipartFile file) throws UnsupportedEncodingException {
        if (null == file || file.isEmpty()) {
            return ResponseResult.fail("文件不能為空");
        }

        String originalFilename = file.getOriginalFilename();
        originalFilename = MimeUtility.decodeText(originalFilename);// 解決中文亂碼問題
        String contentType = getContentType(originalFilename);
        String key;

        InputStream ins = null;
        File f = null;

        try {
            ins = file.getInputStream();
            f = new File(originalFilename);
            inputStreamToFile(ins, f);
            key = iFileStorageClient.txOssUpload(new FileInputStream(f), originalFilename, contentType);
        } catch (Exception e) {
            return ResponseResult.fail(e.getMessage());
        } finally {
            if (null != ins) {
                try {
                    ins.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            if (f.exists()) {// 刪除臨時文件
                f.delete();
            }
        }

        return ResponseResult.ok(key);
    }

    public static void inputStreamToFile(InputStream ins,File file) {
        try {
            OutputStream os = new FileOutputStream(file);
            int bytesRead = 0;
            byte[] buffer = new byte[8192];
            while ((bytesRead = ins.read(buffer, 0, 8192)) != -1) {
                os.write(buffer, 0, bytesRead);
            }
            os.close();
            ins.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public String txOssUpload(FileInputStream inputStream, String key, String contentType) {
        key = Uuid.getUuid() + "-" + key;
        OSSUtil.txOssUpload(inputStream, key, contentType);
        try {
            if (null != inputStream) {
                inputStream.close();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        return key;
    }

    public static void txOssUpload(FileInputStream inputStream, String key, String contentType) {
        ObjectMetadata objectMetadata = new ObjectMetadata();
        try{
            int length = inputStream.available();
            objectMetadata.setContentLength(length);
        }catch (Exception e){
            logger.info(e.getMessage());
        }
        objectMetadata.setContentType(contentType);
        cosclient.putObject(txbucketName, key, inputStream, objectMetadata);
    }

7、下載文件

    /**
     * 騰訊雲文件下載
     * @param response
     * @param id
     * @return
     */
    @RequestMapping("/txOssDownload")
    public Object txOssDownload(HttpServletResponse response, String id) {
        COSObjectInputStream cosObjectInputStream = iFileStorageClient.txOssDownload(id, response);
        String contentType = getContentType(id);
        FileUtil.txOssDownload(response, contentType, cosObjectInputStream, id);
        return null;
    }

    public static void txOssDownload(HttpServletResponse response, String contentType, InputStream fileStream, String fileName) {
        FileOutputStream fos = null;
        response.reset();
        OutputStream os = null;
        try {
            response.setContentType(contentType + "; charset=utf-8");
            if(!contentType.equals(PlConstans.FileContentType.image)){
                try {
                    response.setHeader("Content-Disposition", "attachment; filename=" + new String(fileName.getBytes("UTF-8"), "ISO8859-1"));
                } catch (UnsupportedEncodingException e) {
                    response.setHeader("Content-Disposition", "attachment; filename=" + fileName);
                    logger.error("encoding file name failed", e);
                }
            }

            os = response.getOutputStream();

            byte[] b = new byte[1024 * 1024];
            int len;
            while ((len = fileStream.read(b)) > 0) {
                os.write(b, 0, len);
                os.flush();
                try {
                    if(fos != null) {
                        fos.write(b, 0, len);
                        fos.flush();
                    }
                } catch (Exception e) {
                    logger.error(e.getMessage());
                }
            }
        } catch (IOException e) {
            IOUtils.closeQuietly(fos);
            fos = null;
        } finally {
            IOUtils.closeQuietly(os);
            IOUtils.closeQuietly(fileStream);
            if(fos != null) {
                IOUtils.closeQuietly(fos);
            }
        }
    }

8、讀取網路文件流

    /**
     * 讀取網路文件流
     * @param url
     * @return
     */
    public static InputStream readFileFromURL(String url) {
        if (StringUtils.isBlank(url)) {
            return null;
        }

        HttpClient httpClient = new DefaultHttpClient();
        HttpGet methodGet = new HttpGet(url);
        try {
            HttpResponse response = httpClient.execute(methodGet);
            if (response.getStatusLine().getStatusCode() == 200) {
                HttpEntity entity = response.getEntity();
                return entity.getContent();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }

9、ExcelUtil

    /**
     * 讀excel
     * @param inputStream 文件輸入流
     * @return list集合
     */
    public static List<Object> read(InputStream inputStream) {
        return EasyExcelFactory.read(inputStream, new Sheet(1, 1));
    }

    /**
     * 寫excel
     * @param data list數據
     * @param clazz
     * @param saveFilePath 文件保存路徑
     * @throws IOException
     */
    public static void write(List<? extends BaseRowModel> data, Class<? extends BaseRowModel> clazz, String saveFilePath) throws IOException {
        File tempFile = new File(saveFilePath);
        OutputStream out = new FileOutputStream(tempFile);
        ExcelWriter writer = EasyExcelFactory.getWriter(out);
        Sheet sheet = new Sheet(1, 3, clazz, "Sheet1", null);
        writer.write(data, sheet);
        writer.finish();
        out.close();
    }

說明: 至此, 整個流程算是完整了, 下麵將其他知識點代碼也貼出來參考

七、其他

1、@LoginRequired註解

/**
 * 在需要登錄驗證的Controller的方法上使用此註解
 */
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface LoginRequired {
}

2、MyControllerAdvice

@ControllerAdvice
public class MyControllerAdvice {

    @ResponseBody
    @ExceptionHandler(TokenValidationException.class)
    public JsonResponse tokenValidationExceptionHandler() {
        return JsonResponse.loginInvalid();
    }

    @ResponseBody
    @ExceptionHandler(ServiceException.class)
    public JsonResponse serviceExceptionHandler(ServiceException se) {
        return JsonResponse.fail(se.getMsg());
    }

    @ResponseBody
    @ExceptionHandler(Exception.class)
    public JsonResponse exceptionHandler(Exception e) {
        e.printStackTrace();
        return JsonResponse.fail(e.getMessage());
    }

}

3、AuthenticationInterceptor

public class AuthenticationInterceptor implements HandlerInterceptor {

    private static final String CURRENT_USER = "user";

    @Autowired
    private UserService userService;

    @Override
    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) {
        // 如果不是映射到方法直接通過
        if (!(handler instanceof HandlerMethod)) {
            return true;
        }
        HandlerMethod handlerMethod = (HandlerMethod) handler;
        Method method = handlerMethod.getMethod();

        // 判斷介面是否有@LoginRequired註解, 有則需要登錄
        LoginRequired methodAnnotation = method.getAnnotation(LoginRequired.class);
        if (methodAnnotation != null) {
            // 驗證token
            Integer userId = JwtUtil.verifyToken(request);
            PLUser plUser = userService.selectByPrimaryKey(userId);
            if (null == plUser) {
                throw new RuntimeException("用戶不存在,請重新登錄");
            }
            request.setAttribute(CURRENT_USER, plUser);
            return true;
        }
        return true;
    }

    @Override
    public void postHandle(HttpServletRequest httpServletRequest, HttpServletResponse httpServletResponse, Object o, ModelAndView modelAndView) throws Exception {
    }

    @Override
    public void afterCompletion(HttpServletRequest httpServletRequest, HttpServletResponse httpServletResponse, Object o, Exception e) throws Exception {
    }
}

4、JwtUtil

    public static final long EXPIRATION_TIME = 2592_000_000L; // 有效期30天
    public static final String SECRET = "pl_token_secret";
    public static final String HEADER = "token";
    public static final String USER_ID = "userId";

    /**
     * 根據userId生成token
     * @param userId
     * @return
     */
    public static String generateToken(String userId) {
        HashMap<String, Object> map = new HashMap<>();
        map.put(USER_ID, userId);
        String jwt = Jwts.builder()
                .setClaims(map)
                .setExpiration(new Date(System.currentTimeMillis() + EXPIRATION_TIME))
                .signWith(SignatureAlgorithm.HS512, SECRET)
                .compact();
        return jwt;
    }

    /**
     * 驗證token
     * @param request
     * @return 驗證通過返回userId
     */
    public static Integer verifyToken(HttpServletRequest request) {
        String token = request.getHeader(HEADER);
        if (token != null) {
            try {
                Map<String, Object> body = Jwts.parser()
                        .setSigningKey(SECRET)
                        .parseClaimsJws(token)
                        .getBody();

                for (Map.Entry entry : body.entrySet()) {
                    Object key = entry.getKey();
                    Object value = entry.getValue();
                    if (key.toString().equals(USER_ID)) {
                        return Integer.valueOf(value.toString());// userId
                    }
                }
                return null;
            } catch (Exception e) {
                logger.error(e.getMessage());
                throw new TokenValidationException("unauthorized");
            }
        } else {
            throw new TokenValidationException("missing token");
        }
    }

結語: OK, 搞定,睡了, 好睏


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 一. 提前準備工作 1.Node.js環境 2.Windows10 3.npm(前端包管理工具) 4.webpack(前端資源載入/打包工具) 二. 開始安裝 1.。下載並安裝Node.js 下載地址:https://nodejs.org/en/download/ 2.在cmd中查看node.js是 ...
  • 基本實現: 解析GET參數: 解析POST: ...
  • 原生 JavaScript 實現掃雷 完整思路分析,代碼實現。 ...
  • 這是我第一次寫博客,請多指教! vector是一種向量容器,說白了就是可以改變大小的數組。 vector是一個模板類,如果直接這樣會報錯: 1 vector a; //報錯,因為要指定模板。 需要像這樣: 那麼,什麼是 模板 呢? 模板是C++支持參數化多態的工具,使用模板可以使用戶為類或者函數聲明 ...
  • 定義 在不改變原有對象的基礎之上,將功能附加到對象上 適用場景 詳解 在看到定義的時候,可能很多人會想,這不就是繼承嗎?的確很像,不過是比繼承更加有彈性的替代方案。就像原型模式和new之間的關係一樣,有區別,但是區別又不是特別大。裝飾者一個很重要的詞就是動態,他可以靈活的選擇要這個功能還是不要。在裝 ...
  • 策略模式 一 開發模擬鴨子游戲 已經有一個很成功的鴨子模擬游戲,裡面有各種鴨子,一邊游泳,一邊呱呱叫。該系統採用的標準OO技術,設計了一個鴨子超類,並讓各種鴨子繼承此超類。 實現如下:超類中定義了鴨子的各種行為,包括呱呱叫,游泳,外觀等。由於各種鴨子的外觀是不一樣的,display方法抽象出來,各個 ...
  • 一、概念: 變數是指記憶體中的一個存儲區域,該區域要有自己的名稱(變數名)、類型(數據類型),該區域的數據可以在同一數據類型的範圍內不斷變化值; 二、變數的使用註意事項: 1、Java中的變數必須聲明後才能進行使用。 2、變數的作用域:在一對{}中為有效區間。 3、需要進行初始化後才能使用變數。 三、 ...
  • 安裝rabbit後,啟動服務,瀏覽器打開控制台找不到。查百度說是要裝插件。翻了好幾篇都是互相抄,沒有能用到。 多翻了幾篇終於找到一個靠譜的。可以打開控制台了。記錄下: 首先要安裝Erlang語言支持,我用的是 安裝完Erlang後,需要配置環境變數 再配置path變數 安裝rabbit。安裝路徑不要 ...
一周排行
    -Advertisement-
    Play Games
  • 前言 本文介紹一款使用 C# 與 WPF 開發的音頻播放器,其界面簡潔大方,操作體驗流暢。該播放器支持多種音頻格式(如 MP4、WMA、OGG、FLAC 等),並具備標記、實時歌詞顯示等功能。 另外,還支持換膚及多語言(中英文)切換。核心音頻處理採用 FFmpeg 組件,獲得了廣泛認可,目前 Git ...
  • OAuth2.0授權驗證-gitee授權碼模式 本文主要介紹如何筆者自己是如何使用gitee提供的OAuth2.0協議完成授權驗證並登錄到自己的系統,完整模式如圖 1、創建應用 打開gitee個人中心->第三方應用->創建應用 創建應用後在我的應用界面,查看已創建應用的Client ID和Clien ...
  • 解決了這個問題:《winForm下,fastReport.net 從.net framework 升級到.net5遇到的錯誤“Operation is not supported on this platform.”》 本文內容轉載自:https://www.fcnsoft.com/Home/Sho ...
  • 國內文章 WPF 從裸 Win 32 的 WM_Pointer 消息獲取觸摸點繪製筆跡 https://www.cnblogs.com/lindexi/p/18390983 本文將告訴大家如何在 WPF 裡面,接收裸 Win 32 的 WM_Pointer 消息,從消息裡面獲取觸摸點信息,使用觸摸點 ...
  • 前言 給大家推薦一個專為新零售快消行業打造了一套高效的進銷存管理系統。 系統不僅具備強大的庫存管理功能,還集成了高性能的輕量級 POS 解決方案,確保頁面載入速度極快,提供良好的用戶體驗。 項目介紹 Dorisoy.POS 是一款基於 .NET 7 和 Angular 4 開發的新零售快消進銷存管理 ...
  • ABP CLI常用的代碼分享 一、確保環境配置正確 安裝.NET CLI: ABP CLI是基於.NET Core或.NET 5/6/7等更高版本構建的,因此首先需要在你的開發環境中安裝.NET CLI。這可以通過訪問Microsoft官網下載並安裝相應版本的.NET SDK來實現。 安裝ABP ...
  • 問題 問題是這樣的:第三方的webapi,需要先調用登陸介面獲取Cookie,訪問其它介面時攜帶Cookie信息。 但使用HttpClient類調用登陸介面,返回的Headers中沒有找到Cookie信息。 分析 首先,使用Postman測試該登陸介面,正常返回Cookie信息,說明是HttpCli ...
  • 國內文章 關於.NET在中國為什麼工資低的分析 https://www.cnblogs.com/thinkingmore/p/18406244 .NET在中國開發者的薪資偏低,主要因市場需求、技術棧選擇和企業文化等因素所致。歷史上,.NET曾因微軟的閉源策略發展受限,儘管後來推出了跨平臺的.NET ...
  • 在WPF開發應用中,動畫不僅可以引起用戶的註意與興趣,而且還使軟體更加便於使用。前面幾篇文章講解了畫筆(Brush),形狀(Shape),幾何圖形(Geometry),變換(Transform)等相關內容,今天繼續講解動畫相關內容和知識點,僅供學習分享使用,如有不足之處,還請指正。 ...
  • 什麼是委托? 委托可以說是把一個方法代入另一個方法執行,相當於指向函數的指針;事件就相當於保存委托的數組; 1.實例化委托的方式: 方式1:通過new創建實例: public delegate void ShowDelegate(); 或者 public delegate string ShowDe ...