欢迎访问 生活随笔!

凯发k8官方网

当前位置: 凯发k8官方网 > 前端技术 > javascript >内容正文

javascript

全网最详细springbatch批处理读取分区(paratition)文件讲解 -凯发k8官方网

发布时间:2025/1/21 javascript 21 豆豆
凯发k8官方网 收集整理的这篇文章主要介绍了 全网最详细springbatch批处理读取分区(paratition)文件讲解 小编觉得挺不错的,现在分享给大家,帮大家做个参考.

文章目录

      • 一、分区step
        • 1、数据分区
        • 2、分区处理
      • 二、实现分区关键接口
        • 1、partitioner
        • 2、stepexecutionsplitter
        • 3、partitionhandler
      • 三、基本配置和属性说明
        • 1、基本配置
        • 2、属性说明
      • 四、文件分区
        • 1、定义分区文件partitioner
        • 2、定义文件读
        • 3、定义分区job配置
        • 4、定义processor
        • 4、定义writer
        • 4、定义step监听器
        • 6、运行job

写在前面: 我是「境里婆娑」。我还是从前那个少年,没有一丝丝改变,时间只不过是考验,种在心中信念丝毫未减,眼前这个少年,还是最初那张脸,面前再多艰险不退却。
写博客的目的就是分享给大家一起学习交流,如果您对 java感兴趣,可以关注我,我们一起学习。

前言:为什么要写这篇文章,在网上很难找到一篇关于springbatch批处理读取分区文件基于javabean配置的文章,因此我决定写一篇关于springbatch读取分区文件基于javabean配置的文章,希望这篇文章可以帮助新手的你或者你有一定经验的可以加深印象。

一、分区step

何为分区step:

通过将任务进行分区,不同的step处理不同任务数据达到提高job效率功能。

分区作业可以分区两个处理阶段,数据分区、分区处理;

1、数据分区

数据分区:根据特殊的规则,将数据进行合理分片,为不同的数据切片生成数据执行上下文execution context、作业执行器step execution。可以通过接口partitioner生成自定义分区逻辑,springbatch批处理框架默认对多文件实现multiresourcepartititoner;也可以自行扩展接口partitioner实现自定义分区逻辑。

2、分区处理

分区处理:通过数据分区后,不同的数据已经被分配到不同的作业执行器中,接下来需要交给分区处理器进行作业,分区处理器可以在本地或远程执行被划分的作业。接口partitionhandler定义了分区处理逻辑,springbatch批处理框架默认实现了本地分区处理taskexecutorpartitionhandler;也可以自行扩展接口partitionhandler来实现自定义分区逻辑。

分区作业逻辑结构图:

二、实现分区关键接口

实现分区关键接口有如下:partitionhandler、stepexecutionsplitter、partitioner。

1、partitioner

partitoner接口定义了如何根据给定的分区规则进行创建作业执行分区的上下文。

partitioner接口定义如下:

public interface partitioner {map<string, executioncontext> partition(int gridsize); }

gridsize含义:根据给定的gridsize大小进行执行上下文划分。

2、stepexecutionsplitter

stepexecutionsplitter接口定义了如何根据给定的分区规则进行创建作业执行分区的执行器。

stepexecutionsplitter接口定义如下

public interface stepexecutionsplitter {string getstepname();set<stepexecution> split(stepexecution stepexecution, int gridsize) throws jobexecutionexception; }

getstepname:获取当前定义的分区作业的名称。
split:根据给定的分区规则为每个分区生成对应的分区执行器。

3、partitionhandler

partitionhandler接口定义了分区处理的逻辑,根据给定的stepexecutionsplitter进行分区并执行,最后将执行的结果进行收集,反馈给前端。

partitionhandler接口定义如下

public interface partitionhandler {collection<stepexecution> handle(stepexecutionsplitter stepsplitter, stepexecution stepexecution) throws exception; }

三、基本配置和属性说明

上面两节基本知识已经介绍完毕,下面我们将讲一个例子来巩固之前知识。

1、基本配置

一个典型分区job配置

@beanpublic step partitionmastermultifilestep() {return stepbuilderfactory.get("partitionmastermultifilestep").partitioner(partitionslavemultifilestep().getname(),multiresourcepartitioner()).partitionhandler(multifilepartitionhandler()).build();}

2、属性说明

在配置分区step之前,我们先看下分区step的主要属性定义和元素定义

属性说明
step用于指定分区step名称
handler(属性)属性handler指定分区执行器,需要实现接口partitionhandler
handler(子元素)用于定义默认实现:taskexecutorpartitionhandler
task-executor生命使用的线程池
grid-size声明分区的hashmap的初始值大小

四、文件分区

springbatch框架提供了对文件分区的支持,实现类:multiresourcepartitioner提供了对文件分区的默认支持,根据文件名将不同文件处理进行分区,提升处理速度和效率。本文将按照此例子给出如何配置多文件分区实现。

读取文件如下:

本节实例由于文件多,我们对文件进行分区,然后将文件的内容写入db,逻辑示意图如下:

1、定义分区文件partitioner

定义文件分区,将不同的文件分配到不同的作业中,使用自定义mymultiresourcepartitioner分区。

自定义分区mymultiresourcepartitioner如下:

/*** @author shuliangzhao* @date 2020/12/4 23:14*/ public class mymultiresourcepartitioner implements partitioner {private static final string default_key_name = "filename";private static final string partition_key = "partition";private resource[] resources = new resource[0];private string keyname = default_key_name;public void setresources(resource[] resources) {this.resources = resources;}public void setkeyname(string keyname) {this.keyname = keyname;}@overridepublic map<string, executioncontext> partition(int gridsize) {map<string, executioncontext> map = new hashmap<string, executioncontext>(gridsize);int i = 0;for (resource resource : resources) {executioncontext context = new executioncontext();assert.state(resource.exists(), "resource does not exist: "resource);try {context.putstring(keyname, resource.geturi().getpath());}catch (ioexception e) {throw new illegalargumentexception("file could not be located for: "resource, e);}map.put(partition_key i, context);i;}return map;} }

属性keyname:用于指定作业上文中属性名字,作用是在不同的作业上下文中可以获取设置的对于属性值。可以在读写阶段通过@value("#{stepexecutioncontext[filename]}"方式获取。

2、定义文件读

配置好分区实现,需要在每个分区作业中读入不同文件,进而提供文件处理效率。

partitionmultifilereader 实现

public class partitionmultifilereader extends flatfileitemreader {public partitionmultifilereader(class clz,string filename) {setresource(new filesystemresource(filename.substring(1)));field[] declaredfields = clz.getdeclaredfields();list<string> list = new arraylist<>();for (field field:declaredfields) {list.add(field.getname());}string[] names = new string[list.size()];delimitedlinetokenizer delimitedlinetokenizer = new delimitedlinetokenizer();delimitedlinetokenizer.setdelimiter(",");delimitedlinetokenizer.setnames(list.toarray(names));defaultlinemapper defaultlinemapper = new defaultlinemapper();defaultlinemapper.setlinetokenizer(delimitedlinetokenizer);commonfieldsetmapper commonfieldsetmapper = new commonfieldsetmapper();commonfieldsetmapper.settargettype(clz);defaultlinemapper.setfieldsetmapper(commonfieldsetmapper);setlinemapper(defaultlinemapper);setname(clz.getsimplename());} }

3、定义分区job配置

基于javabean方式实现job配置

package com.sl.config; //包导入省略/*** @author shuliangzhao* @title: partitionfileconfiguration* @projectname spring-boot-learn* @description: todo* @date 2020/12/4 21:09*/ @configuration @enablebatchprocessing public class partitionmultifileconfiguration {@autowiredprivate jobbuilderfactory jobbuilderfactory;@autowiredprivate stepbuilderfactory stepbuilderfactory;@autowiredprivate partitonmultifileprocessor partitonmultifileprocessor;@autowiredprivate partitionmultifilewriter partitionmultifilewriter;@beanpublic job partitionmultifilejob() {return jobbuilderfactory.get("partitionmultifilejob").start(partitionmastermultifilestep()).build();}@beanpublic step partitionmastermultifilestep() {return stepbuilderfactory.get("partitionmastermultifilestep").partitioner(partitionslavemultifilestep().getname(),multiresourcepartitioner()).partitionhandler(multifilepartitionhandler()).build();}@beanpublic partitionhandler multifilepartitionhandler() {taskexecutorpartitionhandler handler = new taskexecutorpartitionhandler();handler.setgridsize(2);handler.setstep(partitionslavemultifilestep());handler.settaskexecutor(new simpleasynctaskexecutor());return handler;}@beanpublic step partitionslavemultifilestep() {return stepbuilderfactory.get("partitionslavemultifilestep").<creditbill,creditbill>chunk(1).reader(partitionmultifilereader(null)).processor(partitonmultifileprocessor).writer(partitionmultifilewriter).build();}@bean@stepscopepublic partitionmultifilereader partitionmultifilereader(@value("#{stepexecutioncontext[filename]}")string filename) {return new partitionmultifilereader(creditbill.class,filename);}@beanpublic mymultiresourcepartitioner multiresourcepartitioner() {mymultiresourcepartitioner multiresourcepartitioner = new mymultiresourcepartitioner();multiresourcepartitioner.setkeyname("filename");multiresourcepartitioner.setresources(getresource());return multiresourcepartitioner;}private resource[] getresource() {string filepath = "d:\\aplus\\bill\\";file file = new file(filepath);list<resource> resourcelist = new arraylist<>();if (file.isdirectory()) {string[] list = file.list();if (list != null) {for (string str : list) {string resource = file.getpath() "\\" str;filesystemresource filesystemresource = new filesystemresource(resource);resourcelist.add(filesystemresource);}}}resource[] resources = new resource[resourcelist.size()];return resourcelist.toarray(resources);}}

4、定义processor

定义processor

/*** @author shuliangzhao* @date 2020/12/4 22:11*/ @component @stepscope public class partitonmultifileprocessor implements itemprocessor<creditbill,creditbill> {@overridepublic creditbill process(creditbill item) throws exception {creditbill creditbill = new creditbill();creditbill.setacctid(item.getacctid());creditbill.setaddress(item.getaddress());creditbill.setamout(item.getamout());creditbill.setdate(item.getdate());creditbill.setname(item.getname());return creditbill;} }

4、定义writer

/*** @author shuliangzhao* @date 2020/12/4 22:29*/ @component @stepscope public class partitionmultifilewriter implements itemwriter<creditbill> {@autowiredprivate creditbillmapper creditbillmapper;@overridepublic void write(list<? extends creditbill> items) throws exception {if (items != null && items.size() > 0) {items.stream().foreach(item -> {creditbillmapper.insert(item);});}} }

4、定义step监听器

定义step监听器目的是在处理作业之前打印线程名字和读取文件名字

@component public class partitionsteplistener implements stepexecutionlistener {private static final logger logger = loggerfactory.getlogger(partitionsteplistener.class);@overridepublic void beforestep(stepexecution stepexecution) {logger.info("threadname={},stename={},filename={}",thread.currentthread().getname(),stepexecution.getstepname(),stepexecution.getexecutioncontext().getstring("filename"));}@overridepublic exitstatus afterstep(stepexecution stepexecution) {return null;} }

6、运行job

执行job查看结果,可以看出不同的文件有不同的线程来处理,并且被分配到不同的分区作业步中执行

2020-12-05 15:58:34.100 info 13208 --- [ctaskexecutor-1] com.sl.listener.partitionsteplistener : threadname=simpleasynctaskexecutor-1,stename=partitionslavemultifilestep:partition1,filename=/d:/aplus/bill/bill2.csv 2020-12-05 15:58:34.114 info 13208 --- [ctaskexecutor-3] com.sl.listener.partitionsteplistener : threadname=simpleasynctaskexecutor-3,stename=partitionslavemultifilestep:partition0,filename=/d:/aplus/bill/bill1.csv 2020-12-05 15:58:34.122 info 13208 --- [ctaskexecutor-2] com.sl.listener.partitionsteplistener : threadname=simpleasynctaskexecutor-2,stename=partitionslavemultifilestep:partition2,filename=/d:/aplus/bill/bill3.csv

至此,我们完成了对文件分区的处理。
如果想更详细查看以上所有代码请移步到github:文件分区详细代码

总结

以上是凯发k8官方网为你收集整理的全网最详细springbatch批处理读取分区(paratition)文件讲解的全部内容,希望文章能够帮你解决所遇到的问题。

如果觉得凯发k8官方网网站内容还不错,欢迎将凯发k8官方网推荐给好友。

网站地图