spring integration同步数据库数据

 需求为:当客户已有系统的数据被同步到我方数据库后,若再有新数据,只同步新数据到我方数据库。

解决:因为客户的业务表是不能变动的,我方在客户数据库中新建一状态表,记录哪些数据被更新过。

当客户业务表有新数据插入时,用触发器将新数据id插入到状态表。

 

为方便实例:业务表pp,状态表status

结构为:

pp:

CREATE TABLE `pp` (
  `name` varchar(255) default NULL,
  `address` varchar(255) default NULL,
  `id` int(11) NOT NULL auto_increment,
  PRIMARY KEY  (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=9 DEFAULT CHARSET=utf8;

status:

CREATE TABLE `status` (
  `id` int(11) NOT NULL auto_increment,
  `status` varchar(255) default 'new',
  `ppid` int(11) NOT NULL,
  PRIMARY KEY  (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=12 DEFAULT CHARSET=utf8;

触发器:

DROP TRIGGER if EXISTS mytrigger
CREATE TRIGGER mytrigger after INSERT on pp
for EACH ROW
BEGIN
 INSERT into `status`(ppid) values(new.id);
END;

 

核心配置:jdbc-inbound-context.xml

Xml代码  

  1. <?xml version="1.0" encoding="UTF-8"?>  
  2. <beans xmlns="http://www.springframework.org/schema/beans"   
  3.        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"   
  4.        xmlns:context="http://www.springframework.org/schema/context"   
  5.        xmlns:int="http://www.springframework.org/schema/integration"   
  6.        xmlns:int-jdbc="http://www.springframework.org/schema/integration/jdbc"      
  7.        xmlns:int-jms="http://www.springframework.org/schema/integration/jms"   
  8.        xmlns:jdbc="http://www.springframework.org/schema/jdbc"   
  9.        xsi:schemaLocation="http://www.springframework.org/schema/beans   
  10.     http://www.springframework.org/schema/beans/spring-beans-3.0.xsd   
  11.     http://www.springframework.org/schema/context   
  12.     http://www.springframework.org/schema/context/spring-context-3.0.xsd   
  13.     http://www.springframework.org/schema/integration   
  14.     http://www.springframework.org/schema/integration/spring-integration-2.0.xsd   
  15.     http://www.springframework.org/schema/integration/jdbc   
  16.     http://www.springframework.org/schema/integration/jdbc/spring-integration-jdbc-2.0.xsd   
  17.     http://www.springframework.org/schema/jdbc   
  18.     http://www.springframework.org/schema/jdbc/spring-jdbc-3.0.xsd  
  19.      http://www.springframework.org/schema/integration/jms   
  20.     http://www.springframework.org/schema/integration/jms/spring-integration-jms-2.0.xsd">  
  21.     <context:component-scan base-package="com.wisely.inbound"/>  
  22.        
  23.     <int:channel id="target"/>  
  24.       
  25.     <int-jdbc:inbound-channel-adapter channel="target"   
  26.                     data-source="dataSource"  
  27.                     query="select p.id as ppid,p.name as ppname from pp p,status s where p.id=s.ppid and s.status='new'"  
  28.                     update="update status as st set st.status='old' where ppid in (:ppid)"  
  29.                                        >  
  30.         <!-- 每隔多少毫秒去抓取 -->  
  31.         <int:poller fixed-rate="5000" >  
  32.             <int:transactional/>  
  33.         </int:poller>  
  34.         <!--  指定时刻抓取  
  35.         <int:poller max-messages-per-poll="1">  
  36.             <int:transactional/>  
  37.             <int:cron-trigger expression="0 0 3 * * ?"/>  
  38.         </int:poller>  
  39.         -->  
  40.     </int-jdbc:inbound-channel-adapter>  
  41.     <int:service-activator input-channel="target" ref="jdbcMessageHandler"/>     
  42.      <context:property-placeholder location="classpath*:META-INF/spring/*.properties"/>  
  43.      <bean class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close" id="dataSource">  
  44.         <property name="driverClassName" value="${database.driverClassName}"/>  
  45.         <property name="url" value="${database.url}"/>  
  46.         <property name="username" value="${database.username}"/>  
  47.         <property name="password" value="${database.password}"/>  
  48.     </bean>     
  49.     <bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">  
  50.         <property name="dataSource" ref="dataSource"/>  
  51.     </bean>      
  52.    </beans>  

 

JdbcMessageHandler:

 

Java代码  

  1. package com.wisely.inbound.jdbc;  
  2.   
  3. import java.util.List;  
  4. import java.util.Map;  
  5.   
  6. import org.springframework.integration.annotation.ServiceActivator;  
  7. import org.springframework.stereotype.Component;  
  8.   
  9. @Component  
  10. public class JdbcMessageHandler {  
  11.     @ServiceActivator  
  12.     public void handleJdbcMessage(List<Map<String ,Object>> message){  
  13.         for(Map<String,Object> resultMap:message){  
  14.             System.out.println("组:");  
  15.             for(String column:resultMap.keySet()){  
  16.                 System.out.println("字段:"+column+" 值:"+resultMap.get(column));  
  17.             }  
  18.         }  
  19.     }  
  20. }  

 

测试类:

Java代码  

  1. package com.wisely.inbound.jdbc;  
  2.   
  3. import org.springframework.context.ApplicationContext;  
  4. import org.springframework.context.support.ClassPathXmlApplicationContext;  
  5.   
  6. public class JdbcInbound {  
  7.   
  8.     /** 
  9.      * @param args 
  10.      */  
  11.     public static void main(String[] args) {  
  12.           ApplicationContext context =   
  13.                     new ClassPathXmlApplicationContext("/META-INF/spring/jdbc-inbound-context.xml");  
  14.     }  
  15.   
  16. }  

 

 

若将channel改为jms的通道。配置文件做以下修改:

 

Xml代码  

  1. <?xml version="1.0" encoding="UTF-8"?>  
  2. <beans xmlns="http://www.springframework.org/schema/beans"   
  3.        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"   
  4.        xmlns:context="http://www.springframework.org/schema/context"   
  5.        xmlns:int="http://www.springframework.org/schema/integration"   
  6.        xmlns:int-jdbc="http://www.springframework.org/schema/integration/jdbc"      
  7.        xmlns:int-jms="http://www.springframework.org/schema/integration/jms"   
  8.        xmlns:jdbc="http://www.springframework.org/schema/jdbc"   
  9.        xsi:schemaLocation="http://www.springframework.org/schema/beans   
  10.     http://www.springframework.org/schema/beans/spring-beans-3.0.xsd   
  11.     http://www.springframework.org/schema/context   
  12.     http://www.springframework.org/schema/context/spring-context-3.0.xsd   
  13.     http://www.springframework.org/schema/integration   
  14.     http://www.springframework.org/schema/integration/spring-integration-2.0.xsd   
  15.     http://www.springframework.org/schema/integration/jdbc   
  16.     http://www.springframework.org/schema/integration/jdbc/spring-integration-jdbc-2.0.xsd   
  17.     http://www.springframework.org/schema/jdbc   
  18.     http://www.springframework.org/schema/jdbc/spring-jdbc-3.0.xsd  
  19.      http://www.springframework.org/schema/integration/jms   
  20.     http://www.springframework.org/schema/integration/jms/spring-integration-jms-2.0.xsd">  
  21.     <context:component-scan base-package="com.wisely.inbound"/>  
  22.        
  23.     <int-jms:channel id="target"  queue-name="jdbc.queue" connection-factory="connectionFactory"/>  
  24.       
  25.     <int-jdbc:inbound-channel-adapter channel="target"   
  26.                                       data-source="dataSource"  
  27.                                       query="select p.id as ppid,p.name as ppname from pp p,status s where p.id=s.ppid and s.status='new'"  
  28.                                       update="update status as st set st.status='old' where ppid in (:ppid)"  
  29.                                        >  
  30.         <!-- 每隔多少毫秒去抓取 -->  
  31.         <int:poller fixed-rate="5000" >  
  32.             <int:transactional/>  
  33.         </int:poller>  
  34.         <!--  指定时刻抓取  
  35.         <int:poller max-messages-per-poll="1">  
  36.             <int:transactional/>  
  37.             <int:cron-trigger expression="0 0 3 * * ?"/>  
  38.         </int:poller>  
  39.         -->  
  40.     </int-jdbc:inbound-channel-adapter>  
  41.     <!--   
  42.     <int-jms:message-driven-channel-adapter id="queInbound" destination-name="jmsQueue" channel="target"/> 
  43.     -->  
  44.     <int:service-activator input-channel="target" ref="jdbcMessageHandler"/>  
  45.        
  46.      <context:property-placeholder location="classpath*:META-INF/spring/*.properties"/>  
  47.      <bean class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close" id="dataSource">  
  48.         <property name="driverClassName" value="${database.driverClassName}"/>  
  49.         <property name="url" value="${database.url}"/>  
  50.         <property name="username" value="${database.username}"/>  
  51.         <property name="password" value="${database.password}"/>  
  52.     </bean>  
  53.       
  54.     <bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">  
  55.         <property name="dataSource" ref="dataSource"/>  
  56.     </bean>  
  57.       
  58.       
  59.     <bean id="activeMqConnectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory">  
  60.         <property name="brokerURL" value="vm://localhost" />  
  61.     </bean>  
  62.       
  63.     <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">  
  64.         <property name="sessionCacheSize" value="10" />  
  65.         <property name="cacheProducers" value="false"/>  
  66.         <property name="targetConnectionFactory" ref="activeMqConnectionFactory"/>  
  67.     </bean>  
  68.     <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">  
  69.         <property name="connectionFactory" ref="connectionFactory"/>  
  70.         <property name="defaultDestinationName" value="jmsQueue" />  
  71.     </bean>  
  72. </beans>  

 

时间: 2024-08-29 12:14:19

spring integration同步数据库数据的相关文章

使用SSIS创建同步数据库数据任务

原文:使用SSIS创建同步数据库数据任务 SSIS(SQL Server Integration Services)是用于生成企业级数据集成和数据转换解决方案的平台.使用 Integration Services 可解决复杂的业务问题,具体表现为:复制或下载文件,发送电子邮件以响应事件,更新数据仓库,清除和挖掘数据以及管理 SQL Server 对象和数据.这些包可以独立使用,也可以与其他包一起使用以满足复杂的业务需求.Integration Services 可以提取和转换来自多种源(如 XM

使用SSIS创建同步数据库数据任务的方法_MsSql

这些包可以独立使用,也可以与其他包一起使用以满足复杂的业务需求.Integration Services 可以提取和转换来自多种源(如 XML 数据文件.平面文件和关系数据源)的数据,然后将这些数据加载到一个或多个目标.(摘自MSDN,更多详细信息可参考:http://technet.microsoft.com/zh-cn/library/ms141026(v=sql.105).aspx) 下面我使用SSIS来演示一个实际例子.比如我有一个数据库,出于备份数据或者其它的目的,会定期的对这个数据库

同步远程数据到本地数据库后的删除操作

问题描述 同步远程数据到本地数据库后的删除操作 同步远程数据到本地数据库后,远程数据发生变动,比如删除,那么怎么删除本地的那条数据呢. 情景: 同步淘宝api商品后保持在本地,如果卖家删除了某个商品后,本地怎么去实现同样的操作,删除那条已经在远程端被删除的数据呢! 知道思路的大侠请告知,在此小弟谢过.(j2ee开发) 解决方案 问题的关键是你怎么知道远程的数据被删除了?是淘宝api提供这个功能还是需要定期访问远程来判断商品是否还存在?知道数据被删除,那本地删除应该不是什么问题 解决方案二: ht

springmvc-请教:Spring + Hibernate 无法将数据写入数据库?

问题描述 请教:Spring + Hibernate 无法将数据写入数据库? Spring + Hibernate 无法将数据写入数据 请教: 通过Junit单元测试Service可以将数据写入数据库:但部署访问却无法向数据库写入数据. 1 环境: Spring 3.1.2 Hibernate 4.1.4 Jdk1.6 2 配置: 2.1 Web.xml <!-- Spring ApplicationContext配置文件的路径,可使用通配符,多个路径用,号分隔 此参数用于后面的Spring C

mysql-MySql 与其他数据库 数据同步

问题描述 MySql 与其他数据库 数据同步 各位大哥请教一下:问一下,MySQL怎样同步其他数据数据库(Oracle||sqlserver)的数据呢?比如dblink之类,谢谢!

c#如何修改和删除DGV中的数据,不需要同步数据库

问题描述 c#如何修改和删除DGV中的数据,不需要同步数据库 只需要简单的在DGV上显示出修改后的东西,删除可能比较简单,就是修改应该怎么做 当然了 我希望能有一段参考的代码 或者一个参考的链接 重新描述一下 我的想法是在界面上设置几个TEXTBOX然后写文本,然后再将这些文本信息放到窗口界面的DGV对应选中的行当中 当然其中还要对比列的名称才能修改对应的列 比如说我写完了要修改的textbox文本 然后点击一个按钮(如保存)在DGV上就能显示出修改后的信息了 这个东西不需要经过数据库 只要在D

使用E-MapReduce集群sqoop组件同步云外Oracle数据库数据到集群hive

E-MapReduce集群sqoop组件可以同步数据库的数据到集群里,不同的数据库源网络配置有一些差异网络配置.最常用的场景是从rds mysql同步数据,最近也有用户询问如何同步云外专有Oracle数据库数据到hive.云外专有数据库需要集群所有节点通过公网访问,要创建VPC网络,使用VPC网络创建集群,给集群各节点绑定动态ip,检查网络链路,Oracle数据库还上传oracle jdbc jar.本文会详细介绍具体的操作步骤. 创建vpc专有网络集群 如果没创建过VPC专有网络和子网交换机,

spring data jpa 第一行代码我save了某一个表的数据,看到数据库数据变了。

问题描述 spring data jpa 第一行代码我save了某一个表的数据,看到数据库数据变了. spring data jpa 第一行代码我save了某一个表的数据,看到数据库数据变了. 我第二行去查询这个表的数据,得到的却是没保存数据之前的数据. 再刷新页面就得到了现在数据库的数据是咋回事呢 解决方案 可能是事务的原因,,这次的事务还没提交,, 解决方案二: 需要commit呀,不然是数据是查询不出来的

实现两台MySQL数据库数据的同步的方法_Linux

1. 安装配置  两台服务器,分别安装好Mysql,都安装在 /usr/local/mysql 目录下(安装步骤省略,请参考相关文档),两台服务器的IP分别是192.168.0.1和192.168.0.2,我们把192.168.0.1作为master数据库,把192.168.0.2作为slave服务器,我们采用单向同步的方式,就是master的数据是主的数据,然后slave主动去master哪儿同步数据回来.  两台服务器的配置一样,我们把关键的配置文件拷贝一下,默认的配置文件是在 /usr/l