Spring Batch Read from DB and Write to File

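    Spring Batch was developed jointly by SpringSource and Accenture as a lightweight framework for batch processing of large data volumes.

    [Figure: Spring Batch architecture diagram]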

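    In day-to-day development the main concerns are ItemReader, ItemProcessor and ItemWriter. Reading and writing cover both files and databases. The intermediate stage, ItemProcessor, is just as important: it holds the logic applied to the data that was read, and only after that processing do items move on to the write stage. ItemReader, ItemProcessor and ItemWriter can of course all be replaced with custom implementations.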
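    The read, process, write flow described above can be sketched in plain Java. This is a simplified illustration, not Spring Batch's actual implementation: the interfaces SketchReader, SketchProcessor and SketchWriter and the class ChunkSketch are hypothetical stand-ins invented for this sketch, and the upper-casing processor is just a placeholder for real business logic. The chunk size of 2 mirrors the commit-interval used later in the job configuration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-ins for illustration only; these are NOT the Spring Batch interfaces.
interface SketchReader<T> { T read(); }                 // returns null when the input is exhausted
interface SketchProcessor<I, O> { O process(I item); }
interface SketchWriter<O> { void write(List<O> chunk); }

class ChunkSketch {

    // Reads items one at a time, processes each, and hands them to the writer
    // in groups of at most commitInterval items.
    static <I, O> void runStep(SketchReader<I> reader, SketchProcessor<I, O> processor,
                               SketchWriter<O> writer, int commitInterval) {
        List<O> chunk = new ArrayList<O>();
        I item;
        while ((item = reader.read()) != null) {
            chunk.add(processor.process(item));
            if (chunk.size() == commitInterval) {
                writer.write(chunk);            // a real step would commit its transaction here
                chunk = new ArrayList<O>();
            }
        }
        if (!chunk.isEmpty()) {
            writer.write(chunk);                // flush the final, possibly shorter chunk
        }
    }

    // Convenience wrapper: upper-cases every string and records each written chunk.
    static List<List<String>> run(List<String> input, int commitInterval) {
        final Iterator<String> it = input.iterator();
        final List<List<String>> written = new ArrayList<List<String>>();
        ChunkSketch.<String, String>runStep(
                () -> it.hasNext() ? it.next() : null,
                s -> s.toUpperCase(),
                chunk -> written.add(chunk),
                commitInterval);
        return written;
    }

    public static void main(String[] args) {
        // prints [[A, B], [C, D], [E]]
        System.out.println(run(Arrays.asList("a", "b", "c", "d", "e"), 2));
    }
}
```

    With five input items and a chunk size of 2, the writer is called three times, the last time with a single item.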

    With that, on to the Spring Batch example.

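1. Spring configuration files: the database properties file database.properties; the database configuration file applicationDatabase.xml (containing ProxoolDataSource and DataSourceTransactionManager); the basic Spring Batch configuration file applicationContext.xml; and the job-specific configuration file migrationSimpleJob.xml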

database.properties

jdbc.connection.driverClassName=oracle.jdbc.driver.OracleDriver
jdbc.connection.url=jdbc:oracle:thin:<username>/<password>@100.111.86.250:1521:<SID>
jdbc.connection.username=<username>
jdbc.connection.password=<password>
proxool.houseKeepingTestSql=SELECT CURRENT_DATE FROM DUAL
proxool.prototypeCount=0
proxool.hourseKeepingSleepTime=30000
proxool.maximumActiveTime=300000
proxool.maximumConnectionLifetime=14400000
proxool.minimumConnectionCount=5
proxool.maximumConnectionCount=15
proxool.statistics=15s,10m,1d
proxool.alias=pool_dbname
proxool.simultaneous-build-throttle=50

applicationDatabase.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:aop="http://www.springframework.org/schema/aop" xmlns:mvc="http://www.springframework.org/schema/mvc" xmlns:tx="http://www.springframework.org/schema/tx" xmlns:batch="http://www.springframework.org/schema/batch"
	xmlns:p="http://www.springframework.org/schema/p"
	xsi:schemaLocation="
			http://www.springframework.org/schema/beans
			http://www.springframework.org/schema/beans/spring-beans.xsd
			http://www.springframework.org/schema/context
			http://www.springframework.org/schema/context/spring-context.xsd
			http://www.springframework.org/schema/aop
	 		http://www.springframework.org/schema/aop/spring-aop.xsd
	 		http://www.springframework.org/schema/mvc
			http://www.springframework.org/schema/mvc/spring-mvc.xsd
			http://www.springframework.org/schema/tx
	 		http://www.springframework.org/schema/tx/spring-tx.xsd
	 		http://www.springframework.org/schema/batch
			http://www.springframework.org/schema/batch/spring-batch.xsd">

	<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
		<property name="locations">
			<list>
				<value>conf/database.properties</value>
			</list>
		</property>
	</bean>

	<bean id="dataSource" class="org.logicalcobwebs.proxool.ProxoolDataSource">
		<property name="driver" value="${jdbc.connection.driverClassName}" />
		<property name="driverUrl" value="${jdbc.connection.url}" />
		<property name="user" value="${jdbc.connection.username}" />
		<property name="password" value="${jdbc.connection.password}" />

		<property name="houseKeepingTestSql" value="${proxool.houseKeepingTestSql}" />
		<property name="prototypeCount" value="${proxool.prototypeCount}" />
		<property name="houseKeepingSleepTime" value="${proxool.hourseKeepingSleepTime}" />
		<property name="maximumActiveTime" value="${proxool.maximumActiveTime}" />
		<property name="maximumConnectionLifetime" value="${proxool.maximumConnectionLifetime}" />
		<property name="minimumConnectionCount" value="${proxool.minimumConnectionCount}" />
		<property name="maximumConnectionCount" value="${proxool.maximumConnectionCount}" />
		<property name="statistics" value="${proxool.statistics}" />
		<property name="alias" value="${proxool.alias}" />
		<property name="simultaneousBuildThrottle" value="${proxool.simultaneous-build-throttle}" />
	</bean>

	<bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
		<property name="dataSource" ref="dataSource"></property>
	</bean>

	<tx:advice id="transactionAdvice" transaction-manager="transactionManager">
		<tx:attributes>
			<tx:method name="add*" propagation="REQUIRED" />
			<tx:method name="get*" propagation="REQUIRED" />
			<tx:method name="edit*" propagation="REQUIRED" />
			<tx:method name="delete*" propagation="REQUIRED" />
			<tx:method name="*" no-rollback-for="Throwable" read-only="true" />
		</tx:attributes>
	</tx:advice>

</beans>

applicationContext.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:aop="http://www.springframework.org/schema/aop" xmlns:mvc="http://www.springframework.org/schema/mvc" xmlns:tx="http://www.springframework.org/schema/tx" xmlns:batch="http://www.springframework.org/schema/batch"
	xmlns:p="http://www.springframework.org/schema/p"
	xsi:schemaLocation="
			http://www.springframework.org/schema/beans
			http://www.springframework.org/schema/beans/spring-beans.xsd
			http://www.springframework.org/schema/context
			http://www.springframework.org/schema/context/spring-context.xsd
			http://www.springframework.org/schema/aop
	 		http://www.springframework.org/schema/aop/spring-aop.xsd
	 		http://www.springframework.org/schema/mvc
			http://www.springframework.org/schema/mvc/spring-mvc.xsd
			http://www.springframework.org/schema/tx
	 		http://www.springframework.org/schema/tx/spring-tx.xsd
	 		http://www.springframework.org/schema/batch
			http://www.springframework.org/schema/batch/spring-batch.xsd">

	<import resource="applicationDatabase.xml" />

	<bean id="jobRepository" class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean">
		<property name="transactionManager" ref="transactionManager" />
	</bean>

	<bean id="jobLauncher" class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
		<property name="jobRepository" ref="jobRepository" />
	</bean>

	<bean id="simpleJob" class="org.springframework.batch.core.job.SimpleJob" abstract="true">
		<property name="jobRepository" ref="jobRepository" />
	</bean>

</beans>

migrationSimpleJob.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:aop="http://www.springframework.org/schema/aop" xmlns:mvc="http://www.springframework.org/schema/mvc" xmlns:tx="http://www.springframework.org/schema/tx" xmlns:batch="http://www.springframework.org/schema/batch"
	xmlns:p="http://www.springframework.org/schema/p"
	xsi:schemaLocation="
			http://www.springframework.org/schema/beans
			http://www.springframework.org/schema/beans/spring-beans.xsd
			http://www.springframework.org/schema/context
			http://www.springframework.org/schema/context/spring-context.xsd
			http://www.springframework.org/schema/aop
	 		http://www.springframework.org/schema/aop/spring-aop.xsd
	 		http://www.springframework.org/schema/mvc
			http://www.springframework.org/schema/mvc/spring-mvc.xsd
			http://www.springframework.org/schema/tx
	 		http://www.springframework.org/schema/tx/spring-tx.xsd
	 		http://www.springframework.org/schema/batch
			http://www.springframework.org/schema/batch/spring-batch.xsd">

	<import resource="applicationContext.xml" />

	<bean id="managedObjectMapper" class="com.nokia.migration.nasda.mapper.ManagedObjectMapper" />

	<bean id="itemReader" class="org.springframework.batch.item.database.JdbcCursorItemReader" scope="step">
		<property name="dataSource" ref="dataSource" />
		<property name="sql" value="SELECT CO_GID, CO_OC_ID, CO_NAME, CO_DN FROM MANAGE_OBJECTS" />
		<property name="rowMapper" ref="managedObjectMapper" />
	</bean>

	<bean id="itemProcessor" scope="step" class="com.nokia.migration.nasda.processor.ManagedObjectProcessor" />

	<bean id="itemWriter" class="org.springframework.batch.item.xml.StaxEventItemWriter">
		<property name="marshaller" ref="managedObjectMarshaller" />
		<property name="resource" value="file:export/cmData.xml" />
		<property name="rootTagName" value="cmData" />
	</bean>

	<bean id="managedObjectMarshaller" class="org.springframework.oxm.jaxb.Jaxb2Marshaller">
		<property name="classesToBeBound" value="com.nokia.migration.nasda.bean.ManagedObject" />
	</bean>

	<batch:job id="migrationSimpleJob" job-repository="jobRepository" parent="simpleJob">
		<batch:step id="migrationStep">
			<batch:tasklet transaction-manager="transactionManager">
				<batch:chunk reader="itemReader" processor="itemProcessor" writer="itemWriter" commit-interval="2" />
			</batch:tasklet>
		</batch:step>
	</batch:job>
</beans>

2. Bean class, mapped to the columns of the DB table

package com.nokia.migration.nasda.bean;

import javax.xml.bind.annotation.XmlAccessOrder;
import javax.xml.bind.annotation.XmlAccessorOrder;
import javax.xml.bind.annotation.XmlRootElement;

@XmlRootElement(name = "managedObject")
@XmlAccessorOrder(XmlAccessOrder.UNDEFINED)
public class ManagedObject {
	private long gid;
	private long cid;
	private String name;
	private String dn;

	public long getGid() {
		return this.gid;
	}

	public long getCid() {
		return this.cid;
	}

	public String getName() {
		return this.name;
	}

	public String getDn() {
		return this.dn;
	}

	public void setGid(long gid) {
		this.gid = gid;
	}

	public void setCid(long cid) {
		this.cid = cid;
	}

	public void setName(String name) {
		this.name = name;
	}

	public void setDn(String dn) {
		this.dn = dn;
	}

	@Override
	public String toString() {
		return this.getDn();
	}
}

3. Mapper class, the link between the DB table and the bean class

package com.nokia.migration.nasda.mapper;

import java.sql.ResultSet;
import java.sql.SQLException;

import org.springframework.jdbc.core.RowMapper;

import com.nokia.migration.nasda.bean.ManagedObject;

public class ManagedObjectMapper implements RowMapper<ManagedObject> {

	@Override
	public ManagedObject mapRow(ResultSet resultSet, int rowNum) throws SQLException {
		ManagedObject managedObject = new ManagedObject();

		managedObject.setGid(resultSet.getLong("CO_GID"));
		managedObject.setCid(resultSet.getLong("CO_OC_ID"));
		managedObject.setName(resultSet.getString("CO_NAME"));
		managedObject.setDn(resultSet.getString("CO_DN"));

		return managedObject;
	}

}

4. Custom ItemReader, ItemProcessor and ItemWriter implementations; only ItemProcessor is customized here as an example

package com.nokia.migration.nasda.processor;

import org.springframework.batch.item.ItemProcessor;

import com.nokia.migration.nasda.bean.ManagedObject;

/**
 * @author shengshu
 *
 */
public class ManagedObjectProcessor implements ItemProcessor<ManagedObject, ManagedObject> {

	@Override
	public ManagedObject process(ManagedObject mo) throws Exception {
		System.out.println(mo.toString());

		return mo;
	}

}
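
The processor above only prints each item and passes it through. In Spring Batch, an ItemProcessor that returns null tells the framework to filter that item out, so it never reaches the writer. The following is a minimal sketch of that idea in plain Java, under stated assumptions: MoSketch is a hypothetical, reduced stand-in for the ManagedObject bean, FilteringProcessorSketch is an invented name, and the rule "drop records without a DN" is an assumed example rather than anything the original job does.

```java
// Hypothetical stand-in for the article's ManagedObject bean, reduced to two fields.
class MoSketch {
    final String name;
    final String dn;

    MoSketch(String name, String dn) {
        this.name = name;
        this.dn = dn;
    }
}

class FilteringProcessorSketch {

    // Mirrors the shape of ItemProcessor.process: returns the (possibly transformed)
    // item, or null to signal that the item should be dropped before the write phase.
    static MoSketch process(MoSketch mo) {
        if (mo.dn == null || mo.dn.trim().isEmpty()) {
            return null;                        // assumed rule: skip records without a DN
        }
        String name = mo.name == null ? "" : mo.name.trim();
        return new MoSketch(name, mo.dn.trim());
    }

    public static void main(String[] args) {
        MoSketch kept = process(new MoSketch(" ABCD ", "PLMN-MIG/ABCD-1/EFGH-2"));
        MoSketch dropped = process(new MoSketch("WXYZ", "  "));
        System.out.println(kept.name + " " + kept.dn);   // prints ABCD PLMN-MIG/ABCD-1/EFGH-2
        System.out.println(dropped == null);              // prints true
    }
}
```

In a real job the same body would live in the process method of a class implementing ItemProcessor<ManagedObject, ManagedObject>, registered as the itemProcessor bean.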

5. Application launcher class

package com.nokia.migration.nasda;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.FileSystemXmlApplicationContext;

public class MigrationJobLauncher {

	public static void main(String[] args) {
		@SuppressWarnings("resource")
		ApplicationContext context = new FileSystemXmlApplicationContext("conf/migrationSimpleJob.xml");

		JobLauncher launcher = (JobLauncher) context.getBean("jobLauncher");
		Job job = (Job) context.getBean("migrationSimpleJob");

		try {
			JobExecution result = launcher.run(job, new JobParameters());
			System.out.println(result.toString());
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
}

Eclipse environment

  • pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>

	<groupId>com.nokia.migration</groupId>
	<artifactId>Migration-OSS5-NetAct8</artifactId>
	<version>1.0</version>
	<packaging>jar</packaging>

	<name>Migration-OSS5-NetAct8</name>
	<url>http://maven.apache.org</url>

	<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<spring.batch.version>3.0.3.RELEASE</spring.batch.version>
		<spring.version>4.1.4.RELEASE</spring.version>
		<log4j.version>1.2.17</log4j.version>
		<junit.version>4.11</junit.version>
		<maven.compiler.source>1.7</maven.compiler.source>
		<maven.compiler.target>1.7</maven.compiler.target>
	</properties>

	<dependencies>
		<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-beans</artifactId>
			<version>${spring.version}</version>
		</dependency>
		<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-context</artifactId>
			<version>${spring.version}</version>
		</dependency>
		<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-core</artifactId>
			<version>${spring.version}</version>
		</dependency>
		<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-aop</artifactId>
			<version>${spring.version}</version>
		</dependency>
		<dependency>
			<groupId>org.mybatis</groupId>
			<artifactId>mybatis-spring</artifactId>
			<version>1.2.2</version>
		</dependency>
		<dependency>
			<groupId>org.mybatis</groupId>
			<artifactId>mybatis</artifactId>
			<version>3.2.8</version>
		</dependency>
		<dependency>
			<groupId>org.springframework.batch</groupId>
			<artifactId>spring-batch-core</artifactId>
			<version>${spring.batch.version}</version>
		</dependency>
		<dependency>
			<groupId>org.springframework.batch</groupId>
			<artifactId>spring-batch-infrastructure</artifactId>
			<version>${spring.batch.version}</version>
		</dependency>
		<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-jdbc</artifactId>
			<version>${spring.version}</version>
		</dependency>
		<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-oxm</artifactId>
			<version>${spring.version}</version>
		</dependency>

		<dependency>
			<groupId>com.oracle</groupId>
			<artifactId>ojdbc5</artifactId>
			<version>11.1.0.1</version>
		</dependency>

		<dependency>
			<groupId>org.logicalcobwebs</groupId>
			<artifactId>proxool</artifactId>
			<version>0.9.0RC3</version>
		</dependency>

		<dependency>
			<groupId>dom4j</groupId>
			<artifactId>dom4j</artifactId>
			<version>1.6.1</version>
		</dependency>

		<dependency>
			<groupId>org.apache.commons</groupId>
			<artifactId>commons-lang3</artifactId>
			<version>3.1</version>
		</dependency>
		<dependency>
			<groupId>commons-io</groupId>
			<artifactId>commons-io</artifactId>
			<version>2.4</version>
		</dependency>

		<dependency>
			<groupId>junit</groupId>
			<artifactId>junit</artifactId>
			<scope>test</scope>
			<version>${junit.version}</version>
		</dependency>
	</dependencies>

	<build>
		<plugins>
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-shade-plugin</artifactId>
				<executions>
					<execution>
						<phase>package</phase>
						<goals>
							<goal>shade</goal>
						</goals>
					</execution>
				</executions>
				<configuration>
					<finalName>Migration-OSS5-NetAct8</finalName>
					<shadedArtifactAttached>true</shadedArtifactAttached>
					<transformers>
						<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
							<mainClass>com.nokia.migration.nasda.MigrationJobLauncher</mainClass>
						</transformer>
					</transformers>
				</configuration>
			</plugin>
		</plugins>
	</build>
</project>

  • create.sql
DECLARE TABLE_COUNT NUMBER;

BEGIN
  SELECT COUNT(1) INTO TABLE_COUNT FROM USER_TABLES WHERE TABLE_NAME='MANAGE_OBJECTS';

  IF TABLE_COUNT>0 THEN
    EXECUTE IMMEDIATE 'DROP TABLE MANAGE_OBJECTS';
  END IF;

  EXECUTE IMMEDIATE 'CREATE TABLE MANAGE_OBJECTS(
  CO_GID NUMBER NOT NULL,
  CO_OC_ID NUMBER NOT NULL,
  CO_NAME VARCHAR2(80),
  CO_DN VARCHAR2(256),
  CONSTRAINT CO_GID_PK PRIMARY KEY (CO_GID)
  )';
END;
/

COMMIT;

--==============================================================================

DECLARE
  MAXRECORD CONSTANT INT:=1000000;
  IRECORD         INT         :=1;
BEGIN
  FOR IRECORD IN 1..MAXRECORD
  LOOP
    INSERT
    INTO MANAGE_OBJECTS
      (
        CO_GID,
        CO_OC_ID,
        CO_NAME,
        CO_DN
      )
      VALUES
      (
        ROUND(DBMS_RANDOM.VALUE(520355000000000000,520355999999999999)),
        ROUND(DBMS_RANDOM.VALUE(16,9999)),
        DBMS_RANDOM.STRING('U', 4),
        'PLMN-MIG/'||DBMS_RANDOM.STRING('U', 4)||'-'||ROUND(DBMS_RANDOM.VALUE(1,9999))||'/'||DBMS_RANDOM.STRING('U', 4)||'-'||ROUND(DBMS_RANDOM.VALUE(1,9999))
      );
  END LOOP;
  DBMS_OUTPUT.PUT_LINE('Insert Successfully');
  COMMIT;
END;
/

  • Directory structure

Time: 2024-12-05 05:08:39
