1、需求系统每日从某个固定的目录中读取csv文件,并在控制台上打印 。
2、解决方案要解决上述需求,可以使用的方法有很多,此处选择使用Spring Batch来实现 。
3、注意事项1、文件路径的获取此处简单处理,读取 JobParameters 中的日期,然后构建一个文件路径,并将文件路径放入到 ExecutionContext中 。此处为了简单,文件路径会在程序中写死,但是同时也会将文件路径存入到 ExecutionContext 中,并且在具体的某个Step中从ExecutionContext中获取路径 。
注意:
ExecutionContext中存入的数据虽然在各个Step中都可以获取到,但是不推荐存入比较大的数据到ExecutionContext中,因为这个对象的数据需要存入到数据库中 。
2、各个Step如果获取到ExecutionContext中的值
- 类上加入 @StepScope 注解
- 通过 @Value("#{jobExecutionContext['importPath']}") 来获取
@Bean@StepScopepublic FlatFileItemReader<Person> readCsvItemReader(@Value("#{jobExecutionContext['importPath']}") String importPath) {// 读取数据return new FlatFileItemReaderBuilder<Person>().name("read-csv-file").resource(new ClassPathResource(importPath)).delimited().delimiter(",").names("username", "age", "sex").fieldSetMApper(new RecordFieldSetMapper<>(Person.class)).build();}
解释:在程序实例化FlatFileItemReader的时候,此时是没有jobExecutionContext的,那么就会报错,如果加上@StepScope,此时就没有问题了 。@StepScope表示到达Step阶段才实例化这个Bean3、FlatFileItemReader使用注意当我们使用FlatFileItemReader来读取我们的csv文件时,此处需要返回 FlatFileItemReader类型,而不能直接返回ItemReader,否则可能出现如下错误 Reader must be open before it can be read
4、实现步骤1、导入依赖,配置1、导入依赖
<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-batch</artifactId></dependency></dependencies>
2、初始化SpringBatch数据库spring.datasource.username=rootspring.datasource.password=root@1993spring.datasource.url=jdbc:MySQL://127.0.0.1:3306/spring-batch?useUnicode=true&characterEncoding=utf8&autoReconnectForPools=true&useSSL=falsespring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver# 程序启动时,默认不执行jobspring.batch.job.enabled=falsespring.batch.jdbc.initialize-schema=always# 初始化spring-batch数据库脚本spring.batch.jdbc.schema=classpath:org/springframework/batch/core/schema-mysql.sql
2、构建文件读取路径此处我的想法是,在JobExecutionListener中完成文件路径的获取,并将之放入到ExecutionContext,然后在各个Step中就可以获取到文件路径的值了 。/** * 在此监听器中,获取到具体的需要读取的文件路径,并保存到 ExecutionContext * * @author huan.fu * @date 2022/8/30 - 22:22 */@Slf4jpublic class AssemblyReadCsvPathListener implements JobExecutionListener {@Overridepublic void beforeJob(JobExecution jobExecution) {ExecutionContext executionContext = jobExecution.getExecutionContext();JobParameters jobParameters = jobExecution.getJobParameters();String importDate = jobParameters.getString("importDate");log.info("从 job parameter 中获取的 importDate 参数的值为:[{}]", importDate);String readCsvPath = "data/person.csv";log.info("根据日期组装需要读取的csv路径为:[{}],此处排除日期,直接写一个死的路径", readCsvPath);executionContext.putString("importPath", readCsvPath);}@Overridepublic void afterJob(JobExecution jobExecution) {}}
3、构建Tasklet,输出文件路径@Slf4j@Component@StepScopepublic class PrintImportFilePathTaskLet implements Tasklet {@Value("#{jobExecutionContext['importPath']}")private String importFilePath;@Value("#{jobParameters['importDate']}")private String importDate;@Overridepublic RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {log.info("从job parameter 中获取到的 importDate:[{}],从 jobExecutionContext 中获取的 importPath:[{}]",importDate, importFilePath);return RepeatStatus.FINISHED;}}
需要注意的是,此类上加入了 @StepScope注解4、编写实体类
@AllArgsConstructor@Getter@ToStringpublic class Person {/*** 用户名*/private String username;/*** 年龄*/private Integer age;/*** 性别*/private String sex;}
5、编写Job配置@Configuration@AllArgsConstructor@Slf4jpublic class ImportPersonJobConfig {private final JobBuilderFactory jobBuilderFactory;private final StepBuilderFactory stepBuilderFactory;private final PrintImportFilePathTaskLet printImportFilePathTaskLet;private final ItemReader<Person> readCsvItemReader;@Beanpublic Job importPersonJob() {// 获取一个job builder, jobName可以是不存在的return jobBuilderFactory.get("import-person-job")// 添加job execution 监听器.listener(new AssemblyReadCsvPathListener())// 打印 job parameters 和 ExecutionContext 中的值.start(printParametersAndContextVariables())// 读取csv的数据并处理.next(handleCsvFileStep()).build();}/*** 读取数据* 注意:此处需要返回 FlatFileItemReader类型,而不要返回ItemReader* 否则可能报如下异常 Reader must be open before it can be read** @param importPath 文件路径* @return reader*/@Bean@StepScopepublic FlatFileItemReader<Person> readCsvItemReader(@Value("#{jobExecutionContext['importPath']}") String importPath) {// 读取数据return new FlatFileItemReaderBuilder<Person>().name("read-csv-file").resource(new ClassPathResource(importPath)).delimited().delimiter(",").names("username", "age", "sex").fieldSetMapper(new RecordFieldSetMapper<>(Person.class)).build();}@Beanpublic Step handleCsvFileStep() {// 每读取一条数据,交给这个处理ItemProcessor<Person, Person> processor = item -> {if (item.getAge() > 25) {log.info("用户[{}]的年龄:[{}>25]不处理", item.getUsername(), item.getAge());return null;}return item;};// 读取到了 chunk 大小的数据后,开始执行写入ItemWriter<Person> itemWriter = items -> {log.info("开始写入数据");for (Person item : items) {log.info("{}", item);}};return stepBuilderFactory.get("handle-csv-file")// 每读取2条数据,执行一次write,当每read一条数据后,都会执行process.<Person, Person>chunk(2)// 读取数据.reader(readCsvItemReader)// 读取一条数据就开始处理.processor(processor)// 当读取的数据的数量到达 chunk 时,调用该方法进行处理.writer(itemWriter).build();}/*** 打印 job parameters 和 ExecutionContext 中的值* <p>* TaskletStep是一个非常简单的接口,仅有一个方法——execute 。* TaskletStep会反复的调用这个方法直到获取一个RepeatStatus.FINISHED返回或者抛出一个异常 。* 所有的Tasklet调用都会包装在一个事物中 。** @return Step*/private Step printParametersAndContextVariables() {return stepBuilderFactory.get("print-context-params").tasklet(printImportFilePathTaskLet)// 当job重启时,如果达到了3此,则该step不在执行.startLimit(3)// 当job重启时,如果该step的是已经处理完成即COMPLETED状态时,下方给false表示该step不在重启,即不在执行.allowStartIfComplete(false)// 添加 step 监听.listener(new CustomStepExecutionListener()).build();}}
推荐阅读
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
- 使用 JavaScript 实现无限滚动
- 蒸鱼豉油可以炒菜吗
- 怎样制作红油辣椒
- 如何使用健身器材
- 分享微信使用技巧,快来涨姿势啦
- 海外版抖音TikTok使用教程详解
- 减肥方法使用新的软呼啦圈
- 如何在 Vuejs 中使用 Supertokens 的预构建 UI
- 在Java 8及更高版本中使用Java流
- 热熔胶我们常用的几种实用的使用方法 热熔胶怎么用