Spring Boot + Spring Batch 實現批處理任務,保姆級教程!(場景實戰)

来源:https://www.cnblogs.com/javastack/archive/2023/08/24/17653257.html
-Advertisement-
Play Games

來源:blog.csdn.net/qq_35387940/article/details/108193473 ## **前言** 概念詞就不多說了,我簡單地介紹下 , spring batch 是一個 方便使用的 較健全的 批處理 框架。 為什麼說是方便使用的,因為這是 基於spring的一個框架, ...


來源:blog.csdn.net/qq_35387940/article/details/108193473

前言

概念詞就不多說了,我簡單地介紹下 , spring batch 是一個 方便使用的 較健全的 批處理 框架。

為什麼說是方便使用的,因為這是 基於spring的一個框架,接入簡單、易理解、流程分明。

為什麼說是較健全的, 因為它提供了往常我們在對大批量數據進行處理時需要考慮到的 日誌跟蹤、事務粒度調配、可控執行、失敗機制、重試機制、數據讀寫等。

正文

那麼回到文章,我們該篇文章將會帶來給大家的是什麼?(結合實例講解那是當然的)

從實現的業務場景來說,有以下兩個:

  1. 從 csv文件 讀取數據,進行業務處理再存儲
  2. 從 資料庫 讀取數據,進行業務處理再存儲

也就是平時經常遇到的數據清理或者數據過濾,又或者是數據遷移備份等等。大批量的數據,自己實現分批處理需要考慮的東西太多了,又不放心,那麼使用 Spring Batch 框架 是一個很好的選擇。

首先,在進入實例教程前,我們看看這次的實例里,我們使用springboot 整合spring batch 框架,要編碼的東西有什麼?

通過一張簡單的圖來瞭解:

可能大家看到這個圖,是不是多多少少想起來定時任務框架?確實有那麼點像,但是我必須在這告訴大家,這是一個批處理框架,不是一個schuedling 框架。但是前面提到它提供了可執行控制,也就是說,啥時候執行是可控的,那麼顯然就是自己可以進行擴展結合定時任務框架,實現你心中所想。

ok,回到主題,相信大家能從圖中簡單明瞭地看到我們這次實例,需要實現的東西有什麼了。所以我就不在對各個小組件進行大批量文字的描述了。

那麼我們事不宜遲,開始我們的實例教程。

首先準備一個資料庫,裡面建一張簡單的表,用於實例數據的寫入存儲或者說是讀取等等。

bloginfo表

相關建表sql語句:

CREATE TABLE `bloginfo`  (
  `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '主鍵',
  `blogAuthor` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '博客作者標識',
  `blogUrl` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '博客鏈接',
  `blogTitle` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '博客標題',
  `blogItem` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '博客欄目',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 89031 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;

pom文件里的核心依賴:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
</dependency>

<!--  spring batch -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-batch</artifactId>
</dependency>

<!-- hibernate validator -->
<dependency>
    <groupId>org.hibernate</groupId>
    <artifactId>hibernate-validator</artifactId>
    <version>6.0.7.Final</version>
</dependency>
<!--  mybatis -->
<dependency>
    <groupId>org.mybatis.spring.boot</groupId>
    <artifactId>mybatis-spring-boot-starter</artifactId>
    <version>2.0.0</version>
</dependency>
<!--  mysql -->
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <scope>runtime</scope>
</dependency>

<!-- druid數據源驅動 1.1.10解決springboot從1.0——2.0版本問題-->
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>druid-spring-boot-starter</artifactId>
    <version>1.1.18</version>
</dependency>

yml文件:

Spring Boot 基礎就不介紹了,推薦看這個實戰項目:

https://github.com/javastacks/spring-boot-best-practice

spring:
  batch:
    job:
#設置為 false -需要jobLaucher.run執行
      enabled: false
    initialize-schema: always
#    table-prefix: my-batch

  datasource:
    druid:
      username: root
      password: root
      url: jdbc:mysql://localhost:3306/hellodemo?useSSL=false&useUnicode=true&characterEncoding=UTF-8&serverTimezone=GMT%2B8&zeroDateTimeBehavior=convertToNull
      driver-class-name: com.mysql.cj.jdbc.Driver
      initialSize: 5
      minIdle: 5
      maxActive: 20
      maxWait: 60000
      timeBetweenEvictionRunsMillis: 60000
      minEvictableIdleTimeMillis: 300000
      validationQuery: SELECT 1 FROM DUAL
      testWhileIdle: true
      testOnBorrow: false
      testOnReturn: false
      poolPreparedStatements: true
      maxPoolPreparedStatementPerConnectionSize: 20
      useGlobalDataSourceStat: true
      connectionProperties: druid.stat.mergeSql=true;druid.stat.slowSqlMillis=5000
server:
  port: 8665

ps:這裡我們用到了druid資料庫連接池,其實有個小坑,後面文章會講到。

因為我們這次的實例最終數據處理完之後,是寫入資料庫存儲(當然你也可以輸出到文件等等)。

所以我們前面也建了一張表,pom文件裡面我們也整合的mybatis,那麼我們在整合spring batch 主要編碼前,我們先把這些關於資料庫打通用到的簡單過一下。

pojo 層

BlogInfo.java :

/**
 * @Author : JCccc
 * @Description :
 **/
public class BlogInfo {

    private Integer id;
    private String blogAuthor;
    private String blogUrl;
    private String blogTitle;
    private String blogItem;

    @Override
    public String toString() {
        return "BlogInfo{" +
                "id=" + id +
                ", blogAuthor='" + blogAuthor + '\'' +
                ", blogUrl='" + blogUrl + '\'' +
                ", blogTitle='" + blogTitle + '\'' +
                ", blogItem='" + blogItem + '\'' +
                '}';
    }

    public Integer getId() {
        return id;
    }

    public void setId(Integer id) {
        this.id = id;
    }

    public String getBlogAuthor() {
        return blogAuthor;
    }

    public void setBlogAuthor(String blogAuthor) {
        this.blogAuthor = blogAuthor;
    }

    public String getBlogUrl() {
        return blogUrl;
    }

    public void setBlogUrl(String blogUrl) {
        this.blogUrl = blogUrl;
    }

    public String getBlogTitle() {
        return blogTitle;
    }

    public void setBlogTitle(String blogTitle) {
        this.blogTitle = blogTitle;
    }

    public String getBlogItem() {
        return blogItem;
    }

    public void setBlogItem(String blogItem) {
        this.blogItem = blogItem;
    }
}
mapper層

BlogMapper.java :

ps:可以看到這個實例我用的是註解的方式,哈哈為了省事,而且我還不寫servcie層和impl層,也是為了省事,因為該篇文章重點不在這些,所以這些不好的大家不要學。

import com.example.batchdemo.pojo.BlogInfo;
import org.apache.ibatis.annotations.*;
import java.util.List;
import java.util.Map;

/**
 * @Author : JCccc
 * @Description :
 **/
@Mapper
public interface BlogMapper {
    @Insert("INSERT INTO bloginfo ( blogAuthor, blogUrl, blogTitle, blogItem )   VALUES ( #{blogAuthor}, #{blogUrl},#{blogTitle},#{blogItem}) ")
    @Options(useGeneratedKeys = true, keyProperty = "id")
    int insert(BlogInfo bloginfo);

    @Select("select blogAuthor, blogUrl, blogTitle, blogItem from bloginfo where blogAuthor < #{authorId}")
     List<BlogInfo> queryInfoById(Map<String , Integer> map);

}

接下來 ,重頭戲,我們開始對前邊那張圖裡涉及到的各個小組件進行編碼。

首先創建一個 配置類, MyBatchConfig.java

從我起名來看,可以知道這基本就是咱們整合spring batch 涉及到的一些配置組件都會寫在這裡了。

首先我們按照咱們上面的圖來看,裡面包含內容有:

JobRepository job的註冊/存儲器
JobLauncher job的執行器
Job job任務,包含一個或多個Step
Step 包含(ItemReader、ItemProcessor和ItemWriter)
ItemReader 數據讀取器
ItemProcessor 數據處理器
ItemWriter 數據輸出器

首先,在MyBatchConfig類前加入註解:

@Configuration 用於告訴spring,咱們這個類是一個自定義配置類,裡面很多bean都需要載入到spring容器裡面

@EnableBatchProcessing 開啟批處理支持

然後開始往MyBatchConfig類里,編寫各個小組件。

JobRepository

寫在MyBatchConfig類里

/**
 * JobRepository定義:Job的註冊容器以及和資料庫打交道(事務管理等)
 * @param dataSource
 * @param transactionManager
 * @return
 * @throws Exception
 */
@Bean
public JobRepository myJobRepository(DataSource dataSource, PlatformTransactionManager transactionManager) throws Exception{
    JobRepositoryFactoryBean jobRepositoryFactoryBean = new JobRepositoryFactoryBean();
    jobRepositoryFactoryBean.setDatabaseType("mysql");
    jobRepositoryFactoryBean.setTransactionManager(transactionManager);
    jobRepositoryFactoryBean.setDataSource(dataSource);
    return jobRepositoryFactoryBean.getObject();
}
JobLauncher

寫在MyBatchConfig類里

/**
 * jobLauncher定義:job的啟動器,綁定相關的jobRepository
 * @param dataSource
 * @param transactionManager
 * @return
 * @throws Exception
 */
@Bean
public SimpleJobLauncher myJobLauncher(DataSource dataSource, PlatformTransactionManager transactionManager) throws Exception{
    SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
    // 設置jobRepository
    jobLauncher.setJobRepository(myJobRepository(dataSource, transactionManager));
    return jobLauncher;
}
Job

寫在MyBatchConfig類里

/**
 * 定義job
 * @param jobs
 * @param myStep
 * @return
 */
@Bean
public Job myJob(JobBuilderFactory jobs, Step myStep){
    return jobs.get("myJob")
            .incrementer(new RunIdIncrementer())
            .flow(myStep)
            .end()
            .listener(myJobListener())
            .build();
}

對於Job的運行,是可以配置監聽器的

JobListener

寫在MyBatchConfig類里

/**
 * 註冊job監聽器
 * @return
 */
@Bean
public MyJobListener myJobListener(){
    return new MyJobListener();
}

這是一個我們自己自定義的監聽器,所以是單獨創建的,MyJobListener.java

/**
 * @Author : JCccc
 * @Description :監聽Job執行情況,實現JobExecutorListener,且在batch配置類里,Job的Bean上綁定該監聽器
 **/

public class MyJobListener implements JobExecutionListener {

    private Logger logger = LoggerFactory.getLogger(MyJobListener.class);

    @Override
    public void beforeJob(JobExecution jobExecution) {
        logger.info("job 開始, id={}",jobExecution.getJobId());
    }

    @Override
    public void afterJob(JobExecution jobExecution) {
        logger.info("job 結束, id={}",jobExecution.getJobId());
    }
}
Step(ItemReader ItemProcessor ItemWriter)

step裡面包含數據讀取器,數據處理器,數據輸出器三個小組件的的實現。

我們也是一個個拆解來進行編寫。

文章前邊說到,該篇實現的場景包含兩種,一種是從csv文件讀入大量數據進行處理,另一種是從資料庫表讀入大量數據進行處理。

從CSV文件讀取數據
ItemReader

寫在MyBatchConfig類里

/**
 * ItemReader定義:讀取文件數據+entirty實體類映射
 * @return
 */
@Bean
public ItemReader<BlogInfo> reader(){
    // 使用FlatFileItemReader去讀cvs文件,一行即一條數據
    FlatFileItemReader<BlogInfo> reader = new FlatFileItemReader<>();
    // 設置文件處在路徑
    reader.setResource(new ClassPathResource("static/bloginfo.csv"));
    // entity與csv數據做映射
    reader.setLineMapper(new DefaultLineMapper<BlogInfo>() {
        {
            setLineTokenizer(new DelimitedLineTokenizer() {
                {
                    setNames(new String[]{"blogAuthor","blogUrl","blogTitle","blogItem"});
                }
            });
            setFieldSetMapper(new BeanWrapperFieldSetMapper<BlogInfo>() {
                {
                    setTargetType(BlogInfo.class);
                }
            });
        }
    });
    return reader;
}

簡單代碼解析:

對於數據讀取器 ItemReader ,我們給它安排了一個讀取監聽器,創建 MyReadListener.java

/**
 * @Author : JCccc
 * @Description :
 **/

public class MyReadListener implements ItemReadListener<BlogInfo> {

    private Logger logger = LoggerFactory.getLogger(MyReadListener.class);

    @Override
    public void beforeRead() {
    }

    @Override
    public void afterRead(BlogInfo item) {
    }

    @Override
    public void onReadError(Exception ex) {
        try {
            logger.info(format("%s%n", ex.getMessage()));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
ItemProcessor

寫在MyBatchConfig類里

/**
 * 註冊ItemProcessor: 處理數據+校驗數據
 * @return
 */
@Bean
public ItemProcessor<BlogInfo, BlogInfo> processor(){
    MyItemProcessor myItemProcessor = new MyItemProcessor();
    // 設置校驗器
    myItemProcessor.setValidator(myBeanValidator());
    return myItemProcessor;
}

數據處理器,是我們自定義的,裡面主要是包含我們對數據處理的業務邏輯,並且我們設置了一些數據校驗器,我們這裡使用 JSR-303的Validator來作為校驗器。

校驗器

寫在MyBatchConfig類里

/**
 * 註冊校驗器
 * @return
 */
@Bean
public MyBeanValidator myBeanValidator(){
    return new MyBeanValidator<BlogInfo>();
}

創建MyItemProcessor.java

ps:裡面我的數據處理邏輯是,獲取出讀取數據裡面的每條數據的blogItem欄位,如果是springboot,那就對title欄位值進行替換。

其實也就是模擬一個簡單地數據處理場景。

import com.example.batchdemo.pojo.BlogInfo;
import org.springframework.batch.item.validator.ValidatingItemProcessor;
import org.springframework.batch.item.validator.ValidationException;

/**
 * @Author : JCccc
 * @Description :
 **/
public class MyItemProcessor extends ValidatingItemProcessor<BlogInfo> {
    @Override
    public BlogInfo process(BlogInfo item) throws ValidationException {
        /**
         * 需要執行super.process(item)才會調用自定義校驗器
         */
        super.process(item);
        /**
         * 對數據進行簡單的處理
         */
        if (item.getBlogItem().equals("springboot")) {
            item.setBlogTitle("springboot 系列還請看看我Jc");
        } else {
            item.setBlogTitle("未知系列");
        }
        return item;
    }
}

創建MyBeanValidator.java:

import org.springframework.batch.item.validator.ValidationException;
import org.springframework.batch.item.validator.Validator;
import org.springframework.beans.factory.InitializingBean;
import javax.validation.ConstraintViolation;
import javax.validation.Validation;
import javax.validation.ValidatorFactory;
import java.util.Set;

/**
 * @Author : JCccc
 * @Description :
 **/
public class MyBeanValidator<T> implements Validator<T>, InitializingBean {

    private javax.validation.Validator validator;

    @Override
    public void validate(T value) throws ValidationException {
        /**
         * 使用Validator的validate方法校驗數據
         */
        Set<ConstraintViolation<T>> constraintViolations =
                validator.validate(value);
        if (constraintViolations.size() > 0) {
            StringBuilder message = new StringBuilder();
            for (ConstraintViolation<T> constraintViolation : constraintViolations) {
                message.append(constraintViolation.getMessage() + "\n");
            }
            throw new ValidationException(message.toString());
        }
    }

    /**
     * 使用JSR-303的Validator來校驗我們的數據,在此進行JSR-303的Validator的初始化
     * @throws Exception
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        ValidatorFactory validatorFactory =
                Validation.buildDefaultValidatorFactory();
        validator = validatorFactory.usingContext().getValidator();
    }

}

ps:其實該篇文章沒有使用這個數據校驗器,大家想使用的話,可以在實體類上添加一些校驗器的註解@NotNull @Max @Email等等。我偏向於直接在處理器裡面進行處理,想把關於數據處理的代碼都寫在一塊。

ItemWriter

寫在MyBatchConfig類里

/**
 * ItemWriter定義:指定datasource,設置批量插入sql語句,寫入資料庫
 * @param dataSource
 * @return
 */
@Bean
public ItemWriter<BlogInfo> writer(DataSource dataSource){
    // 使用jdbcBcatchItemWrite寫數據到資料庫中
    JdbcBatchItemWriter<BlogInfo> writer = new JdbcBatchItemWriter<>();
    // 設置有參數的sql語句
    writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<BlogInfo>());
    String sql = "insert into bloginfo "+" (blogAuthor,blogUrl,blogTitle,blogItem) "
            +" values(:blogAuthor,:blogUrl,:blogTitle,:blogItem)";
    writer.setSql(sql);
    writer.setDataSource(dataSource);
    return writer;
}

簡單代碼解析:

同樣 對於數據讀取器 ItemWriter ,我們給它也安排了一個輸出監聽器,創建 MyWriteListener.java

import com.example.batchdemo.pojo.BlogInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.ItemWriteListener;
import java.util.List;
import static java.lang.String.format;

/**
 * @Author : JCccc
 * @Description :
 **/
public class MyWriteListener implements ItemWriteListener<BlogInfo> {
    private Logger logger = LoggerFactory.getLogger(MyWriteListener.class);

    @Override
    public void beforeWrite(List<? extends BlogInfo> items) {
    }

    @Override
    public void afterWrite(List<? extends BlogInfo> items) {
    }

    @Override
    public void onWriteError(Exception exception, List<? extends BlogInfo> items) {
        try {
            logger.info(format("%s%n", exception.getMessage()));
            for (BlogInfo message : items) {
                logger.info(format("Failed writing BlogInfo : %s", message.toString()));
            }

        } catch (Exception e) {
            e.printStackTrace();
        }

    }
}

ItemReaderItemProcessorItemWriter,這三個小組件到這裡,我們都實現了,那麼接下來就是把這三個小組件跟我們的step去綁定起來。

寫在MyBatchConfig類里

/**
 * step定義:
 * 包括
 * ItemReader 讀取
 * ItemProcessor  處理
 * ItemWriter 輸出
 * @param stepBuilderFactory
 * @param reader
 * @param writer
 * @param processor
 * @return
 */

@Bean
public Step myStep(StepBuilderFactory stepBuilderFactory, ItemReader<BlogInfo> reader,
                 ItemWriter<BlogInfo> writer, ItemProcessor<BlogInfo, BlogInfo> processor){
    return stepBuilderFactory
            .get("myStep")
            .<BlogInfo, BlogInfo>chunk(65000) // Chunk的機制(即每次讀取一條數據,再處理一條數據,累積到一定數量後再一次性交給writer進行寫入操作)
            .reader(reader).faultTolerant().retryLimit(3).retry(Exception.class).skip(Exception.class).skipLimit(2)
            .listener(new MyReadListener())
            .processor(processor)
            .writer(writer).faultTolerant().skip(Exception.class).skipLimit(2)
            .listener(new MyWriteListener())
            .build();
}

這個Step,稍作講解。

前邊提到了,spring batch框架,提供了事務的控制,重啟,檢測跳過等等機制。

那麼,這些東西的實現,很多都在於這個step環節的設置。

首先看到我們代碼出現的第一個設置,chunk( 6500 ) ,Chunk的機制(即每次讀取一條數據,再處理一條數據,累積到一定數量後再一次性交給writer進行寫入操作。

沒錯,對於整個step環節,就是數據的讀取,處理最後到輸出。

這個chunk機制里,我們傳入的 6500,也就是是告訴它,讀取處理數據,累計達到 6500條進行一次批次處理,去執行寫入操作。

這個傳值,是根據具體業務而定,可以是500條一次,1000條一次,也可以是20條一次,50條一次。

通過一張簡單的小圖來幫助理解:

在我們大量數據處理,不管是讀取或者說是寫入,都肯定會涉及到一些未知或者已知因素導致某條數據失敗了。

那麼如果說咱們啥也不設置,失敗一條數據,那麼我們就當作整個失敗了?。顯然這個太不人性,所以spring batch 提供了 retry 和 skip 兩個設置(其實還有restart) ,通過這兩個設置來人性化地解決一些數據操作失敗場景。

retryLimit(3).retry(Exception.class)

沒錯,這個就是設置重試,當出現異常的時候,重試多少次。我們設置為3,也就是說當一條數據操作失敗,那我們會對這條數據進行重試3次,還是失敗就是 當做失敗了, 那麼我們如果有配置skip(推薦配置使用),那麼這個數據失敗記錄就會留到給 skip 來處理。

skip(Exception.class).skipLimit(2)

skip,跳過,也就是說我們如果設置3, 那麼就是可以容忍 3條數據的失敗。只有達到失敗數據達到3次,我們才中斷這個step。

對於失敗的數據,我們做了相關的監聽器以及異常信息記錄,供與後續手動補救。

那麼記下來我們開始去調用這個批處理job,我們通過介面去觸發這個批處理事件,新建一個Controller,TestController.java

/**
 * @Author : JCccc
 * @Description :
 **/
@RestController
public class TestController {
    @Autowired
    SimpleJobLauncher jobLauncher;

    @Autowired
    Job myJob;

    @GetMapping("testJob")
    public  void testJob() throws JobParametersInvalidException, JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException {
     //    後置參數:使用JobParameters中綁定參數 addLong  addString 等方法
        JobParameters jobParameters = new JobParametersBuilder().toJobParameters();
        jobLauncher.run(myJob, jobParameters);

    }
}

對了,我準備了一個csv文件 bloginfo.csv,裡面大概8萬多條數據,用來進行批處理測試:

這個文件的路徑跟我們的數據讀取器裡面讀取的路徑要一直,

目前我們資料庫是這個樣子,

接下來我們把我們的項目啟動起來,再看一眼資料庫,生成了一些batch用來跟蹤記錄job的一些數據表:

我們來調用一下testJob介面,

然後看下資料庫,可以看的數據全部都進行了相關的邏輯處理並插入到了資料庫:

到這裡,我們對Springboot 整合 spring batch 其實已經操作完畢了,也實現了從csv文件讀取數據處理存儲的業務場景。

從資料庫讀取數據

ps:前排提示使用druid有坑。後面會講到。

那麼接下來實現場景,從資料庫表內讀取數據進行處理輸出到新的表裡面。

那麼基於我們上邊的整合,我們已經實現了

JobRepository job的註冊/存儲器
JobLauncher job的執行器
Job job任務,包含一個或多個Step
Step 包含(ItemReader、ItemProcessor和ItemWriter)
ItemReader 數據讀取器
ItemProcessor 數據處理器
ItemWriter 數據輸出器
job 監聽器
reader 監聽器
writer 監聽器
process 數據校驗器

那麼對於我們新寫一個job完成 一個新的場景,我們需要全部重寫麽?

顯然沒必要,當然完全新寫一套也是可以的。

那麼該篇,對於一個新的也出場景,從csv文件讀取數據轉換到資料庫表讀取數據,我們重新新建的有:

  1. 數據讀取器: 原先使用的是 FlatFileItemReader ,我們現在改為使用 MyBatisCursorItemReader
  2. 數據處理器: 新的場景,業務為了好擴展,所以我們處理器最好也新建一個
  3. 數據輸出器: 新的場景,業務為了好擴展,所以我們數據輸出器最好也新建一個
  4. step的綁定設置: 新的場景,業務為了好擴展,所以我們step最好也新建一個
  5. Job: 當然是要重新寫一個了

其他我們照用原先的就行,JobRepository,JobLauncher以及各種監聽器啥的,暫且不重新建了。

新建MyItemProcessorNew.java

import org.springframework.batch.item.validator.ValidatingItemProcessor;
import org.springframework.batch.item.validator.ValidationException;

/**
 * @Author : JCccc
 * @Description :
 **/
public class MyItemProcessorNew extends ValidatingItemProcessor<BlogInfo> {
    @Override
    public BlogInfo process(BlogInfo item) throws ValidationException {
        /**
         * 需要執行super.process(item)才會調用自定義校驗器
         */
        super.process(item);
        /**
         * 對數據進行簡單的處理
         */
        Integer authorId= Integer.valueOf(item.getBlogAuthor());
        if (authorId<20000) {
            item.setBlogTitle("這是都是小於20000的數據");
        } else if (authorId>20000 && authorId<30000){
            item.setBlogTitle("這是都是小於30000但是大於20000的數據");
        }else {
            item.setBlogTitle("舊書不厭百回讀");
        }
        return item;
    }
}

然後其他重新定義的小組件,寫在MyBatchConfig類里:

/**
 * 定義job
 * @param jobs
 * @param stepNew
 * @return
 */
@Bean
public Job myJobNew(JobBuilderFactory jobs, Step stepNew){
    return jobs.get("myJobNew")
            .incrementer(new RunIdIncrementer())
            .flow(stepNew)
            .end()
            .listener(myJobListener())
            .build();

}

@Bean
public Step stepNew(StepBuilderFactory stepBuilderFactory, MyBatisCursorItemReader<BlogInfo> itemReaderNew,
                    ItemWriter<BlogInfo> writerNew, ItemProcessor<BlogInfo, BlogInfo> processorNew){
    return stepBuilderFactory
            .get("stepNew")
            .<BlogInfo, BlogInfo>chunk(65000) // Chunk的機制(即每次讀取一條數據,再處理一條數據,累積到一定數量後再一次性交給writer進行寫入操作)
            .reader(itemReaderNew).faultTolerant().retryLimit(3).retry(Exception.class).skip(Exception.class).skipLimit(10)
            .listener(new MyReadListener())
            .processor(processorNew)
            .writer(writerNew).faultTolerant().skip(Exception.class).skipLimit(2)
            .listener(new MyWriteListener())
            .build();

}

@Bean
public ItemProcessor<BlogInfo, BlogInfo> processorNew(){
    MyItemProcessorNew csvItemProcessor = new MyItemProcessorNew();
    // 設置校驗器
    csvItemProcessor.setValidator(myBeanValidator());
    return csvItemProcessor;
}

@Autowired
private SqlSessionFactory sqlSessionFactory;

@Bean
@StepScope
//Spring Batch提供了一個特殊的bean scope類(StepScope:作為一個自定義的Spring bean scope)。這個step scope的作用是連接batches的各個steps。這個機制允許配置在Spring的beans當steps開始時才實例化並且允許你為這個step指定配置和參數。
public MyBatisCursorItemReader<BlogInfo> itemReaderNew(@Value("#{jobParameters[authorId]}") String authorId) {

        System.out.println("開始查詢資料庫");

        MyBatisCursorItemReader<BlogInfo> reader = new MyBatisCursorItemReader<>();

        reader.setQueryId("com.example.batchdemo.mapper.BlogMapper.queryInfoById");

        reader.setSqlSessionFactory(sqlSessionFactory);
         Map<String , Object> map = new HashMap<>();

          map.put("authorId" , Integer.valueOf(authorId));
         reader.setParameterValues(map);
        return reader;
}

/**
 * ItemWriter定義:指定datasource,設置批量插入sql語句,寫入資料庫
 * @param dataSource
 * @return
 */
@Bean
public ItemWriter<BlogInfo> writerNew(DataSource dataSource){
    // 使用jdbcBcatchItemWrite寫數據到資料庫中
    JdbcBatchItemWriter<BlogInfo> writer = new JdbcBatchItemWriter<>();
    // 設置有參數的sql語句
    writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<BlogInfo>());
    String sql = "insert into bloginfonew "+" (blogAuthor,blogUrl,blogTitle,blogItem) "
            +" values(:blogAuthor,:blogUrl,:blogTitle,:blogItem)";
    writer.setSql(sql);
    writer.setDataSource(dataSource);
    return writer;
}

代碼需要註意的點

數據讀取器 MyBatisCursorItemReader

對應的mapper方法:

數據處理器 MyItemProcessorNew:

數據輸出器,新插入到別的資料庫表去,特意這樣為了測試:

當然我們的資料庫為了測試這個場景,也是新建了一張表,bloginfonew 表。

接下來,我們新寫一個介面來執行新的這個job:

@Autowired
SimpleJobLauncher jobLauncher;

@Autowired
Job myJobNew;

@GetMapping("testJobNew")
public  void testJobNew(@RequestParam("authorId") String authorId) throws JobParametersInvalidException, JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException {

    JobParameters jobParametersNew = new JobParametersBuilder().addLong("timeNew", System.currentTimeMillis())
            .addString("authorId",authorId)
            .toJobParameters();
    jobLauncher.run(myJobNew,jobParametersNew);

}

ok,我們來調用一些這個介面:

看下控制台:

沒錯,這就是失敗的,原因是因為跟druid有關,報了一個資料庫功能不支持。這是在數據讀取的時候報的錯。

我初步測試認為是MyBatisCursorItemReader ,druid 資料庫連接池不支持。

那麼,我們只需要:

註釋掉druid連接池 jar依賴

yml里替換連接池配置

其實我們不配置其他連接池,springboot 2.X 版本已經為我們整合了預設的連接池 HikariCP 。

在Springboot2.X版本,資料庫的連接池官方推薦使用HikariCP

如果不是為了druid的那些後臺監控數據,sql分析等等,完全是優先使用HikariCP的。

官方的原話:

We preferHikariCPfor its performance and concurrency. If HikariCP is available, we always choose it.

翻譯:

我們更喜歡hikaricpf的性能和併發性。如果有HikariCP,我們總是選擇它。

所以我們就啥連接池也不配了,使用預設的HikariCP 連接池。

推薦一個開源免費的 Spring Boot 實戰項目:

https://github.com/javastacks/spring-boot-best-practice

當然你想配,也是可以的:

所以我們剔除掉druid鏈接池後,我們再來調用一下新介面:

可以看到,從資料庫獲取數據併進行批次處理寫入job是成功的:

新的表裡面插入的數據都進行了自己寫的邏輯處理:

好了,springboot 整合 spring batch 批處理框架, 就到此吧。

近期熱文推薦:

1.1,000+ 道 Java面試題及答案整理(2022最新版)

2.勁爆!Java 協程要來了。。。

3.Spring Boot 2.x 教程,太全了!

4.別再寫滿屏的爆爆爆炸類了,試試裝飾器模式,這才是優雅的方式!!

5.《Java開發手冊(嵩山版)》最新發佈,速速下載!

覺得不錯,別忘了隨手點贊+轉發哦!


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 方法重載在Python中起著關鍵作用。方法有時接受零參數,有時接受一個或多個參數。 當我們以不同的方式調用同一個方法時,這就被稱為方法重載。Python不像其他語言那樣預設支持重載方法。 在Python中,兩個或多個方法不能有相同的名字,因為方法重載允許我們使同一個操作符具有不同的含義。讓我們詳細討 ...
  • # RUST 和 GO 如何管理它們的記憶體 ## Go 中的記憶體管理 Go 中的記憶體不會在緩存鍵被驅逐時立即釋放。 相反,垃圾收集器會經常運行以發現任何沒有引用的記憶體並釋放它。 換句話說,記憶體會一直掛起,直到垃圾收集器可以評估它是否真正不再使用,而不是在不再需要時立即釋放。 Go 必須付出的努力來找 ...
  • 安裝rabbitMQ的前提是安裝上erlang,所以從erlang安裝開始。 安裝erlang 1,先升級一下 $:sudo apt-get update 如果軟體源有問題 修改etc/apt/sources.list內為如下: # 國內源deb http://mirrors.aliyun.com/ ...
  • 給我一個CPU,給我一塊記憶體,我來執行一段代碼。 我要如何分配呢? ![](https://img2023.cnblogs.com/blog/3256961/202308/3256961-20230824111951962-1088592200.jpg) ` ` `new User();` 這裡有一 ...
  • 因為平時有多台設備要用,所以遠程桌面是我經常要使用的工具。 最近,正好看到一款不錯的遠程桌面軟體,馬上拿出來推薦給大家,如果有需要的可以看看。 ![](https://img2023.cnblogs.com/other/626506/202308/626506-20230824152205856-1 ...
  • 來源:blog.csdn.net/lvoelife/article/details/128092586 ## **1. 基本概念** 我們都使用過連接池,比如`C3P0,DBCP,hikari, Druid`,雖然HikariCP的速度稍快,但Druid能夠提供強大的監控和擴展功能,也是阿裡巴巴的開 ...
  • 在每個代碼範圍內使用足夠短和足夠長的名稱:例如迴圈計算器用一個字元就可以了,如i;條件和迴圈變數用一個單詞,方法名1-2個單詞,類名2-3個單詞,全局變數3-4個單片語成 為變數指定一些專門名稱,不要使用例如 "value", "equals", "data" 這樣的變數名 變數名要使用有意義的名稱 ...
  • 過程的實現離不開堆棧的應用,堆棧是一種後進先出`(LIFO)`的數據結構,最後壓入棧的值總是最先被彈出,而新數值在執行壓棧時總是被壓入到棧的最頂端,棧主要功能是暫時存放數據和地址,通常用來保護斷點和現場。棧是由`CPU`管理的線性記憶體數組,它使用兩個寄存器`(SS和ESP)`來保存棧的狀態,SS寄存... ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...