欢迎访问 生活随笔!

凯发k8官方网

当前位置: 凯发k8官方网 > 前端技术 > javascript >内容正文

javascript

springbatch 自定义itemreader和可重新启动reader(十五) -凯发k8官方网

发布时间:2025/1/21 javascript 13 豆豆
凯发k8官方网 收集整理的这篇文章主要介绍了 springbatch 自定义itemreader和可重新启动reader(十五) 小编觉得挺不错的,现在分享给大家,帮大家做个参考.

文章目录

    • 一、自定义customitemreader
    • 二、job 监听器
    • 三、配置job
    • 四、改造customitemreader,发生异常批处理作业从停止的地方重新启动

前言:在一些业务场景中,可能现有的reader不符合我们的要求,springbatch提供自定义reader,实现itemreader接口,满足我们业务场景。

springbatch其它文章直通车:

  • springbatch读单个文件(flatfileitemreader)和写单个文件(flatfileitemwriter)(一)
  • springbatch顺序读取多文件(multiresourceitemreader)和顺序写文件(multiresourceitemwriter)(二)
  • springbatch读数据库(mybatispagingitemreader)(三)
  • springbatch读文件(flatfileitemreader)写据库(mybatisbatchitemwriter)(四)
  • springbatch 监听器之job监听器(jobexecutionlistener)和step监听器(stepexecutionlistener)(五)
  • springbatch 监听器之chunk监听器(chunklistener)和skip监听器(skiplistener)(六)
  • springbatch 多线程(taskexecutor)启动job详解 (七)
  • springbatch 配置并行启动job详解 (八)
  • springbatch 批处理分区(partitioner )分片(九)
  • springbatch tasklet实现和用法(十)
  • springbatch 读取json(jsonitemreader)用法(十一)
  • springbatch 写文件json(jsonfileitemwriter)用法(十二)
  • springbatch 读取xml文件(staxeventitemreader)用法(十三)
  • springbatch 写xml文件(staxeventitemwriter)用法(十四)

代码已上传github上面地址:git源码地址

一、自定义customitemreader

创建了一个简单的itemreader实现,它从提供的列表中读取数据。我们首先实现itemreader最基本的read方法

package com.sl.common;import org.springframework.batch.item.itemreader; import org.springframework.batch.item.nontransientresourceexception; import org.springframework.batch.item.parseexception; import org.springframework.batch.item.unexpectedinputexception;import java.util.list;/*** 自定义reader* @author shuliangzhao* @title: customitemreader* @projectname spring-boot-learn* @description: todo* @date 2019/9/21 14:49*/ public class customitemreader implements itemreader {private list list;public customitemreader(list list) {this.list = list;}@overridepublic t read() throws exception, unexpectedinputexception, parseexception, nontransientresourceexception {if (list.size() > 0 && !list.isempty()) {return (t)list.remove(0);}return null;} }

二、job 监听器

job执行之前加载数据供reader使用

package com.sl.listener;import com.sl.common.commonconstants; import org.springframework.batch.core.jobexecution; import org.springframework.batch.core.jobexecutionlistener; import org.springframework.stereotype.component;/*** @author shuliangzhao* @title: customjoblistener* @projectname spring-boot-learn* @description: todo* @date 2019/9/21 14:59*/ @component public class customjoblistener implements jobexecutionlistener {@overridepublic void beforejob(jobexecution jobexecution) {commonconstants.getlist().add("hello");commonconstants.getlist().add("springbatch");}@overridepublic void afterjob(jobexecution jobexecution) {} }

三、配置job

package com.sl.config;import com.sl.common.commonconstants; import com.sl.common.customitemreader; import com.sl.listener.customjoblistener; import com.sl.writer.customitemwriter; import org.springframework.batch.core.job; import org.springframework.batch.core.step; import org.springframework.batch.core.configuration.annotation.enablebatchprocessing; import org.springframework.batch.core.configuration.annotation.jobbuilderfactory; import org.springframework.batch.core.configuration.annotation.stepbuilderfactory; import org.springframework.batch.core.configuration.annotation.stepscope; import org.springframework.beans.factory.annotation.autowired; import org.springframework.context.annotation.bean; import org.springframework.context.annotation.configuration;/*** 自定义重启reader* @author shuliangzhao* @title: customconfiguration* @projectname spring-boot-learn* @description: todo* @date 2019/9/21 14:55*/ @configuration @enablebatchprocessing public class customconfiguration {@autowiredprivate jobbuilderfactory jobbuilderfactory;@autowiredprivate stepbuilderfactory stepbuilderfactory;@autowiredprivate customjoblistener customjoblistener;@autowiredprivate customitemwriter customitemwriter;@beanpublic job customjob() {return jobbuilderfactory.get("customjob").listener(customjoblistener).start(customstep()).build();}@beanpublic step customstep() {return stepbuilderfactory.get("customstep").chunk(10).reader(customitemreader()).writer(customitemwriter).build();}@bean@stepscopepublic customitemreader customitemreader() {return new customitemreader(commonconstants.getlist());} }

敲黑板:如果这个job在执行过程中发生错误,是不能重启。接下来我们讲解怎么改进reader可以让我们的job在发生异常,批处理作业从停止的地方重新启动。

四、改造customitemreader,发生异常批处理作业从停止的地方重新启动

目前,如果处理被中断并重新开始,itemreader必须从头开始。在许多场景中,这实际上是有效的,但有时更可取的做法是,批处理作业从停止的地方重新启动。关键的区别通常是阅读器是有状态的还是无状态的。无状态读取器不需要担心可重启性,但是有状态读取器必须尝试在重启时重新构建其最后一个已知状态。出于这个原因,我们建议尽可能保持自定义读取器处于无状态,因此不必担心可重启性。

customitemreader还需要实现接口itemstream

package com.sl.common;import org.springframework.batch.item.*;import java.util.list;/*** 自定义reader* @author shuliangzhao* @title: customitemreader* @projectname spring-boot-learn* @description: todo* @date 2019/9/21 14:49*/ public class customrestaitemreader implements itemreader , itemstream {private list list;int currentindex = 0;private static final string current_index = "current.index";public customrestaitemreader(list list) {this.list = list;}@overridepublic t read() throws exception, unexpectedinputexception, parseexception, nontransientresourceexception {if (currentindex < list.size()) {return list.get(currentindex );}return null;}@overridepublic void open(executioncontext executioncontext) throws itemstreamexception {if(executioncontext.containskey(current_index)){currentindex = new long(executioncontext.getlong(current_index)).intvalue();}else{currentindex = 0;}}@overridepublic void update(executioncontext executioncontext) throws itemstreamexception {executioncontext.putlong(current_index, new long(currentindex).longvalue());}@overridepublic void close() throws itemstreamexception {} }

实现reader重启关键方法是open方法和update方法

总结

以上是凯发k8官方网为你收集整理的springbatch 自定义itemreader和可重新启动reader(十五)的全部内容,希望文章能够帮你解决所遇到的问题。

如果觉得凯发k8官方网网站内容还不错,欢迎将凯发k8官方网推荐给好友。

网站地图