Spring Batch是SpringSource和Accenture联合开发的,主要解决轻量级的大数据批处理。下面附上一个架构图,方便理解。
显然,在我们的开发过程中,我们主要关注ItemReader, ItemProcessor, ItemWriter, 数据的读写包括文件与数据库, 对于中间的环节ItemProcessor,也是至关重要的,也就是说:读来的数据的处理逻辑就在此,处理了之后再进入写的环节。当然我们可以重写ItemReader, ItemProcessor, ItemWriter.
好了,下面进入Spring Batch的示例环节。
1. Spring的相关配置文件:DB相关的属性文件database.properties,DB相关的配置文件applicationDatabase.xml(包括ProxoolDataSource, DataSourceTransactionManager), Spring Batch的基本配置文件applicationContext.xml, 业务相关的配置文件migrationSimpleJob.xml
database.properties
# JDBC connection settings for the Oracle database.
# Replace the <username>, <password> and <SID> placeholders before use.
jdbc.connection.driverClassName=oracle.jdbc.driver.OracleDriver
jdbc.connection.url=jdbc:oracle:thin:<username>/<password>@100.111.86.250:1521:<SID>
jdbc.connection.username=<username>
jdbc.connection.password=<password>

# Proxool connection-pool settings.
# Oracle rejects a SELECT without a FROM clause, so the keep-alive probe must
# read from DUAL; the bare "SELECT CURRENT_DATE" would fail on every probe.
proxool.houseKeepingTestSql=SELECT CURRENT_DATE FROM DUAL
proxool.prototypeCount=0
# NOTE: the key below is spelled "hourseKeeping..." and must stay that way
# for as long as the Spring placeholder that reads it uses the same spelling.
proxool.hourseKeepingSleepTime=30000
proxool.maximumActiveTime=300000
proxool.maximumConnectionLifetime=14400000
proxool.minimumConnectionCount=5
proxool.maximumConnectionCount=15
proxool.statistics=15s,10m,1d
proxool.alias=pool_dbname
proxool.simultaneous-build-throttle=50
applicationDatabase.xml
<?xml version="1.0" encoding="UTF-8"?>
<!--
  Database layer: loads conf/database.properties, defines the Proxool-backed
  DataSource, the JDBC transaction manager and a transaction advice.
-->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:aop="http://www.springframework.org/schema/aop"
       xmlns:mvc="http://www.springframework.org/schema/mvc"
       xmlns:tx="http://www.springframework.org/schema/tx"
       xmlns:batch="http://www.springframework.org/schema/batch"
       xmlns:p="http://www.springframework.org/schema/p"
       xsi:schemaLocation="
         http://www.springframework.org/schema/beans   http://www.springframework.org/schema/beans/spring-beans.xsd
         http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
         http://www.springframework.org/schema/aop     http://www.springframework.org/schema/aop/spring-aop.xsd
         http://www.springframework.org/schema/mvc     http://www.springframework.org/schema/mvc/spring-mvc.xsd
         http://www.springframework.org/schema/tx      http://www.springframework.org/schema/tx/spring-tx.xsd
         http://www.springframework.org/schema/batch   http://www.springframework.org/schema/batch/spring-batch.xsd">

  <!-- Resolves the ${...} placeholders used below from the properties file. -->
  <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
    <property name="locations">
      <list>
        <value>conf/database.properties</value>
      </list>
    </property>
  </bean>

  <!-- Pooled DataSource; every value comes from database.properties. -->
  <bean id="dataSource" class="org.logicalcobwebs.proxool.ProxoolDataSource">
    <!-- Connection settings -->
    <property name="driver" value="${jdbc.connection.driverClassName}" />
    <property name="driverUrl" value="${jdbc.connection.url}" />
    <property name="user" value="${jdbc.connection.username}" />
    <property name="password" value="${jdbc.connection.password}" />

    <!-- Pool housekeeping; the placeholder key really is spelled "hourseKeeping". -->
    <property name="houseKeepingTestSql" value="${proxool.houseKeepingTestSql}" />
    <property name="prototypeCount" value="${proxool.prototypeCount}" />
    <property name="houseKeepingSleepTime" value="${proxool.hourseKeepingSleepTime}" />

    <!-- Pool sizing and connection lifetime -->
    <property name="maximumActiveTime" value="${proxool.maximumActiveTime}" />
    <property name="maximumConnectionLifetime" value="${proxool.maximumConnectionLifetime}" />
    <property name="minimumConnectionCount" value="${proxool.minimumConnectionCount}" />
    <property name="maximumConnectionCount" value="${proxool.maximumConnectionCount}" />

    <!-- Pool identity and diagnostics -->
    <property name="statistics" value="${proxool.statistics}" />
    <property name="alias" value="${proxool.alias}" />
    <property name="simultaneousBuildThrottle" value="${proxool.simultaneous-build-throttle}" />
  </bean>

  <!-- Plain JDBC transaction manager bound to the pooled DataSource. -->
  <bean id="transactionManager"
        class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
    <property name="dataSource" ref="dataSource" />
  </bean>

  <!--
    Transaction rules keyed by method-name prefix.
    NOTE(review): nothing in this file attaches the advice to a pointcut;
    confirm it is wired elsewhere, otherwise it has no effect.
    NOTE(review): the catch-all rule never rolls back (no-rollback-for
    Throwable); confirm that is intended.
  -->
  <tx:advice id="transactionAdvice" transaction-manager="transactionManager">
    <tx:attributes>
      <tx:method name="add*" propagation="REQUIRED" />
      <tx:method name="get*" propagation="REQUIRED" />
      <tx:method name="edit*" propagation="REQUIRED" />
      <tx:method name="delete*" propagation="REQUIRED" />
      <tx:method name="*" no-rollback-for="Throwable" read-only="true" />
    </tx:attributes>
  </tx:advice>

</beans>
applicationContext.xml
<?xml version="1.0" encoding="UTF-8"?>
<!--
  Spring Batch infrastructure: job repository, job launcher and an abstract
  parent job definition. Database beans are pulled in from
  applicationDatabase.xml.
-->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:aop="http://www.springframework.org/schema/aop"
       xmlns:mvc="http://www.springframework.org/schema/mvc"
       xmlns:tx="http://www.springframework.org/schema/tx"
       xmlns:batch="http://www.springframework.org/schema/batch"
       xmlns:p="http://www.springframework.org/schema/p"
       xsi:schemaLocation="
         http://www.springframework.org/schema/beans   http://www.springframework.org/schema/beans/spring-beans.xsd
         http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
         http://www.springframework.org/schema/aop     http://www.springframework.org/schema/aop/spring-aop.xsd
         http://www.springframework.org/schema/mvc     http://www.springframework.org/schema/mvc/spring-mvc.xsd
         http://www.springframework.org/schema/tx      http://www.springframework.org/schema/tx/spring-tx.xsd
         http://www.springframework.org/schema/batch   http://www.springframework.org/schema/batch/spring-batch.xsd">

  <!-- Provides the dataSource and transactionManager beans. -->
  <import resource="applicationDatabase.xml" />

  <!-- Job repository built by the Map-based factory bean. -->
  <bean id="jobRepository"
        class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean">
    <property name="transactionManager" ref="transactionManager" />
  </bean>

  <!-- Entry point used by the launcher class to start jobs. -->
  <bean id="jobLauncher"
        class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
    <property name="jobRepository" ref="jobRepository" />
  </bean>

  <!-- Abstract template; concrete jobs inherit the repository via parent="simpleJob". -->
  <bean id="simpleJob"
        class="org.springframework.batch.core.job.SimpleJob"
        abstract="true">
    <property name="jobRepository" ref="jobRepository" />
  </bean>

</beans>
migrationSimpleJob.xml
<?xml version="1.0" encoding="UTF-8"?>
<!--
  Job definition: reads rows from MANAGE_OBJECTS, passes each one through
  the item processor and writes the results to export/cmData.xml.
-->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:aop="http://www.springframework.org/schema/aop"
       xmlns:mvc="http://www.springframework.org/schema/mvc"
       xmlns:tx="http://www.springframework.org/schema/tx"
       xmlns:batch="http://www.springframework.org/schema/batch"
       xmlns:p="http://www.springframework.org/schema/p"
       xsi:schemaLocation="
         http://www.springframework.org/schema/beans   http://www.springframework.org/schema/beans/spring-beans.xsd
         http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
         http://www.springframework.org/schema/aop     http://www.springframework.org/schema/aop/spring-aop.xsd
         http://www.springframework.org/schema/mvc     http://www.springframework.org/schema/mvc/spring-mvc.xsd
         http://www.springframework.org/schema/tx      http://www.springframework.org/schema/tx/spring-tx.xsd
         http://www.springframework.org/schema/batch   http://www.springframework.org/schema/batch/spring-batch.xsd">

  <!-- Provides jobRepository, jobLauncher, simpleJob and the database beans. -->
  <import resource="applicationContext.xml" />

  <!--
    Row mapper for the reader. The class name uses the project's
    com.nokia.migration.nasda base package, matching the processor bean below.
  -->
  <bean id="managedObjectMapper"
        class="com.nokia.migration.nasda.mapper.ManagedObjectMapper" />

  <!-- Reader: runs the query and maps each row with managedObjectMapper. -->
  <bean id="itemReader"
        class="org.springframework.batch.item.database.JdbcCursorItemReader"
        scope="step">
    <property name="dataSource" ref="dataSource" />
    <property name="sql" value="SELECT CO_GID, CO_OC_ID, CO_NAME, CO_DN FROM MANAGE_OBJECTS" />
    <property name="rowMapper" ref="managedObjectMapper" />
  </bean>

  <!-- Processor: the custom per-item logic sits between reader and writer. -->
  <bean id="itemProcessor"
        class="com.nokia.migration.nasda.processor.ManagedObjectProcessor"
        scope="step" />

  <!-- Writer: marshals each item under the cmData root tag. -->
  <bean id="itemWriter" class="org.springframework.batch.item.xml.StaxEventItemWriter">
    <property name="marshaller" ref="managedObjectMarshaller" />
    <property name="resource" value="file:export/cmData.xml" />
    <property name="rootTagName" value="cmData" />
  </bean>

  <!-- The JAXB marshaller must be bound to the same bean class the row mapper produces. -->
  <bean id="managedObjectMarshaller" class="org.springframework.oxm.jaxb.Jaxb2Marshaller">
    <property name="classesToBeBound" value="com.nokia.migration.nasda.bean.ManagedObject" />
  </bean>

  <!-- Single-step, chunk-oriented job; the commit interval is 2 items. -->
  <batch:job id="migrationSimpleJob" job-repository="jobRepository" parent="simpleJob">
    <batch:step id="migrationStep">
      <batch:tasklet transaction-manager="transactionManager">
        <batch:chunk reader="itemReader"
                     processor="itemProcessor"
                     writer="itemWriter"
                     commit-interval="2" />
      </batch:tasklet>
    </batch:step>
  </batch:job>

</beans>
2. Bean类,关联DB Table中的字段
package com.bean; import javax.xml.bind.annotation.XmlAccessOrder; import javax.xml.bind.annotation.XmlAccessorOrder; import javax.xml.bind.annotation.XmlRootElement; @XmlRootElement(name = "managedObject") @XmlAccessorOrder(XmlAccessOrder.UNDEFINED) public class ManagedObject { private long gid; private long cid; private String name; private String dn; public long getGid() { return this.gid; } public long getCid() { return this.cid; } public String getName() { return this.name; } public String getDn() { return this.dn; } public void setGid(long gid) { this.gid = gid; } public void setCid(long cid) { this.cid = cid; } public void setName(String name) { this.name = name; } public void setDn(String dn) { this.dn = dn; } @Override public String toString() { return this.getDn(); } }
3. Mapper类,是关联DB Table和Bean类的纽带
package com.nokia.migration.nasda.mapper;

import java.sql.ResultSet;
import java.sql.SQLException;

import org.springframework.jdbc.core.RowMapper;

import com.nokia.migration.nasda.bean.ManagedObject;

/**
 * Maps one row of the MANAGE_OBJECTS query result to a {@link ManagedObject}.
 *
 * <p>Declared under the same {@code com.nokia.migration.nasda} base package
 * as the bean it imports.
 */
public class ManagedObjectMapper implements RowMapper<ManagedObject> {

    /**
     * Builds a {@link ManagedObject} from the current row.
     *
     * @param resultSet result set positioned on the row to map; must expose
     *                  the columns CO_GID, CO_OC_ID, CO_NAME and CO_DN
     * @param rowNum    index of the current row (unused)
     * @return a new bean populated from the four columns
     * @throws SQLException if a column cannot be read
     */
    @Override
    public ManagedObject mapRow(ResultSet resultSet, int rowNum) throws SQLException {
        ManagedObject managedObject = new ManagedObject();

        managedObject.setGid(resultSet.getLong("CO_GID"));
        managedObject.setCid(resultSet.getLong("CO_OC_ID"));
        managedObject.setName(resultSet.getString("CO_NAME"));
        managedObject.setDn(resultSet.getString("CO_DN"));

        return managedObject;
    }
}
4. 重写ItemReader, ItemProcessor, ItemWriter, 在此只重写了ItemProcessor作为示例
package com.nokia.migration.nasda.processor;

import org.springframework.batch.item.ItemProcessor;

import com.nokia.migration.nasda.bean.ManagedObject;

/**
 * Sample item processor that prints each {@link ManagedObject} and passes it
 * through unchanged.
 *
 * <p>The package matches the fully qualified class name that the job
 * configuration uses for the {@code itemProcessor} bean.
 *
 * @author shengshu
 */
public class ManagedObjectProcessor implements ItemProcessor<ManagedObject, ManagedObject> {

    /**
     * Prints the item's textual form to standard output.
     *
     * @param mo the item handed over by the reader
     * @return the same instance, unmodified
     * @throws Exception declared by the {@link ItemProcessor} contract; this
     *                   implementation does not throw it explicitly
     */
    @Override
    public ManagedObject process(ManagedObject mo) throws Exception {
        System.out.println(mo.toString());

        return mo;
    }
}
5. 程序启动类
package com.nokia.migration.nasda;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.FileSystemXmlApplicationContext;

/**
 * Command-line entry point that loads the job configuration and runs the
 * {@code migrationSimpleJob} once with empty job parameters.
 */
public class MigrationJobLauncher {

    /** Location of the job configuration, resolved against the file system. */
    private static final String JOB_CONFIG = "conf/migrationSimpleJob.xml";

    /**
     * Starts the Spring context, launches the job and prints the resulting
     * {@link JobExecution}.
     *
     * <p>The context is closed when the run finishes, whether or not it
     * succeeded. The process exits with status 1 when the launch throws or
     * the job ends in an unsuccessful status, so that calling scripts can
     * detect the failure.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        int exitCode = 0;

        // try-with-resources closes the context instead of suppressing the leak warning.
        try (FileSystemXmlApplicationContext context = new FileSystemXmlApplicationContext(JOB_CONFIG)) {
            JobLauncher launcher = context.getBean("jobLauncher", JobLauncher.class);
            Job job = context.getBean("migrationSimpleJob", Job.class);

            JobExecution result = launcher.run(job, new JobParameters());
            System.out.println(result.toString());

            // A failed step does not necessarily surface as an exception here;
            // the outcome has to be read from the execution status.
            if (result.getStatus().isUnsuccessful()) {
                exitCode = 1;
            }
        } catch (Exception e) {
            e.printStackTrace();
            exitCode = 1;
        }

        // Exit only after the context has been closed by the try block above.
        if (exitCode != 0) {
            System.exit(exitCode);
        }
    }
}
Eclipse环境相关
- pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.nokia.migration</groupId>
  <artifactId>Migration-OSS5-NetAct8</artifactId>
  <version>1.0</version>
  <packaging>jar</packaging>

  <name>Migration-OSS5-NetAct8</name>
  <url>http://maven.apache.org</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <spring.batch.version>3.0.3.RELEASE</spring.batch.version>
    <spring.version>4.1.4.RELEASE</spring.version>
    <log4j.version>1.2.17</log4j.version>
    <junit.version>4.11</junit.version>
    <maven.compiler.source>1.7</maven.compiler.source>
    <maven.compiler.target>1.7</maven.compiler.target>
  </properties>

  <dependencies>
    <!-- Spring core -->
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-beans</artifactId>
      <version>${spring.version}</version>
    </dependency>
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-context</artifactId>
      <version>${spring.version}</version>
    </dependency>
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-core</artifactId>
      <version>${spring.version}</version>
    </dependency>
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-aop</artifactId>
      <version>${spring.version}</version>
    </dependency>

    <!-- MyBatis -->
    <dependency>
      <groupId>org.mybatis</groupId>
      <artifactId>mybatis-spring</artifactId>
      <version>1.2.2</version>
    </dependency>
    <dependency>
      <groupId>org.mybatis</groupId>
      <artifactId>mybatis</artifactId>
      <version>3.2.8</version>
    </dependency>

    <!-- Spring Batch -->
    <dependency>
      <groupId>org.springframework.batch</groupId>
      <artifactId>spring-batch-core</artifactId>
      <version>${spring.batch.version}</version>
    </dependency>
    <dependency>
      <groupId>org.springframework.batch</groupId>
      <artifactId>spring-batch-infrastructure</artifactId>
      <version>${spring.batch.version}</version>
    </dependency>

    <!-- JDBC and object/XML mapping -->
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-jdbc</artifactId>
      <version>${spring.version}</version>
    </dependency>
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-oxm</artifactId>
      <version>${spring.version}</version>
    </dependency>

    <!-- Oracle driver and Proxool connection pool -->
    <dependency>
      <groupId>com.oracle</groupId>
      <artifactId>ojdbc5</artifactId>
      <version>11.1.0.1</version>
    </dependency>
    <dependency>
      <groupId>org.logicalcobwebs</groupId>
      <artifactId>proxool</artifactId>
      <version>0.9.0RC3</version>
    </dependency>

    <!-- Utilities -->
    <dependency>
      <groupId>dom4j</groupId>
      <artifactId>dom4j</artifactId>
      <version>1.6.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.commons</groupId>
      <artifactId>commons-lang3</artifactId>
      <version>3.1</version>
    </dependency>
    <dependency>
      <groupId>commons-io</groupId>
      <artifactId>commons-io</artifactId>
      <version>2.4</version>
    </dependency>

    <!-- Test -->
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <scope>test</scope>
      <version>${junit.version}</version>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <!-- Pinned so the build does not depend on whichever version Maven resolves by default. -->
        <version>2.3</version>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
          </execution>
        </executions>
        <configuration>
          <finalName>Migration-OSS5-NetAct8</finalName>
          <shadedArtifactAttached>true</shadedArtifactAttached>
          <transformers>
            <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
              <mainClass>com.nokia.migration.nasda.MigrationJobLauncher</mainClass>
            </transformer>
            <!--
              Several Spring jars each ship META-INF/spring.handlers and
              META-INF/spring.schemas. Without merging them the shaded jar
              keeps only one copy and the tx:/batch: XML namespaces can no
              longer be resolved at startup.
            -->
            <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
              <resource>META-INF/spring.handlers</resource>
            </transformer>
            <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
              <resource>META-INF/spring.schemas</resource>
            </transformer>
          </transformers>
        </configuration>
      </plugin>
    </plugins>
  </build>
</project>
- create.sql
-- Recreates the MANAGE_OBJECTS table: drops it when it already exists in the
-- current schema, then creates it empty.
DECLARE
  TABLE_COUNT NUMBER;
BEGIN
  SELECT COUNT(1) INTO TABLE_COUNT FROM USER_TABLES WHERE TABLE_NAME='MANAGE_OBJECTS';

  IF TABLE_COUNT>0 THEN
    EXECUTE IMMEDIATE 'DROP TABLE MANAGE_OBJECTS';
  END IF;

  EXECUTE IMMEDIATE 'CREATE TABLE MANAGE_OBJECTS(
    CO_GID NUMBER NOT NULL,
    CO_OC_ID NUMBER NOT NULL,
    CO_NAME VARCHAR2(80),
    CO_DN VARCHAR2(256),
    CONSTRAINT CO_GID_PK PRIMARY KEY (CO_GID)
  )';
END;
/
-- The "/" above terminates and runs the anonymous block; without it the
-- following statement would be parsed as part of the block.
COMMIT;

--==============================================================================

-- Fills MANAGE_OBJECTS with MAXRECORD rows of test data.
DECLARE
  MAXRECORD CONSTANT INT:=1000000;
  -- Lower bound of the GID range. Keys are BASE_GID + loop index so they are
  -- guaranteed unique: drawing 1,000,000 random keys from a range of 10^12
  -- values has roughly a 40% chance of a duplicate, which would violate
  -- CO_GID_PK and abort the whole load.
  BASE_GID CONSTANT NUMBER:=520355000000000000;
BEGIN
  -- The FOR loop declares its own IRECORD index; no separate variable is needed.
  FOR IRECORD IN 1..MAXRECORD LOOP
    INSERT INTO MANAGE_OBJECTS
    (
      CO_GID,
      CO_OC_ID,
      CO_NAME,
      CO_DN
    )
    VALUES
    (
      BASE_GID + IRECORD,
      ROUND(DBMS_RANDOM.VALUE(16,9999)),
      DBMS_RANDOM.STRING('U', 4),
      'PLMN-MIG/'||DBMS_RANDOM.STRING('U', 4)||'-'||ROUND(DBMS_RANDOM.VALUE(1,9999))||'/'||DBMS_RANDOM.STRING('U', 4)||'-'||ROUND(DBMS_RANDOM.VALUE(1,9999))
    );
  END LOOP;

  DBMS_OUTPUT.PUT_LINE('Insert Successfully');
  COMMIT;
END;
/
- 目录结构
时间: 2024-12-05 05:08:39