天天看點

Spring Integration 同步資料庫資料

解決方案:因為客戶的業務表不能變動,我方在客戶資料庫中建立一張狀態表,記錄哪些資料被更新過。

當客戶業務表有新資料插入時,用觸發器將新資料的 id 插入到狀態表。

為方便舉例:業務表為 pp,狀態表為 status

結構為:

pp:

-- Business table used in this example; `id` is the auto-increment primary key.
CREATE TABLE `pp` (
  `name`    VARCHAR(255) DEFAULT NULL,
  `address` VARCHAR(255) DEFAULT NULL,
  `id`      INT(11)      NOT NULL AUTO_INCREMENT,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=9 DEFAULT CHARSET=utf8;

status:

-- State table: one row per `pp` row, holding the id of that row (`ppid`)
-- and a processing flag (`status`) that defaults to 'new'.
create table `status` (
  `status` varchar(255) default 'new',
  -- No comma after the last column definition: a trailing comma before the
  -- closing parenthesis is a syntax error.
  `ppid` int(11) not null
) engine=innodb auto_increment=12 default charset=utf8;

觸發器:

-- Recreate the trigger that records every row inserted into `pp` in `status`.
-- The drop statement needs its own terminator so it is not merged with the
-- create statement that follows.
drop trigger if exists mytrigger;

-- The body is a single statement, so no begin/end block is used; this lets the
-- whole definition be sent with the default ';' delimiter.
create trigger mytrigger after insert on pp
for each row
  insert into `status`(ppid) values (new.id);

核心配置:jdbc-inbound-context.xml

spring integration同步資料庫資料

<?xml version="1.0" encoding="UTF-8"?>
<!--
  Polls the database for pp rows whose status row is 'new', publishes the
  result set on the "target" channel and marks the polled rows as 'old'.
  XML namespace URIs, schemaLocation, bean class names and bean property
  names are case-sensitive and are written here in their canonical casing.
-->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:int-jdbc="http://www.springframework.org/schema/integration/jdbc"
       xmlns:int-jms="http://www.springframework.org/schema/integration/jms"
       xmlns:jdbc="http://www.springframework.org/schema/jdbc"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
    http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
    http://www.springframework.org/schema/context
    http://www.springframework.org/schema/context/spring-context-3.0.xsd
    http://www.springframework.org/schema/integration
    http://www.springframework.org/schema/integration/spring-integration-2.0.xsd
    http://www.springframework.org/schema/integration/jdbc
    http://www.springframework.org/schema/integration/jdbc/spring-integration-jdbc-2.0.xsd
    http://www.springframework.org/schema/jdbc
    http://www.springframework.org/schema/jdbc/spring-jdbc-3.0.xsd
    http://www.springframework.org/schema/integration/jms
    http://www.springframework.org/schema/integration/jms/spring-integration-jms-2.0.xsd">

    <!-- Picks up the annotated message handler under this package. -->
    <context:component-scan base-package="com.wisely.inbound"/>

    <!-- Channel that carries each polled result set to the service activator. -->
    <int:channel id="target"/>

    <!--
      "query" selects the pp rows whose status row is 'new';
      "update" then flags the polled ids as 'old' via the :ppid parameter,
      which is taken from the ppid column of the query result.
    -->
    <int-jdbc:inbound-channel-adapter channel="target"
                    data-source="datasource"
                    query="select p.id as ppid,p.name as ppname from pp p,status s where p.id=s.ppid and s.status='new'"
                    update="update status as st set st.status='old' where ppid in (:ppid)">

        <!-- Poll interval in milliseconds; each poll runs in a transaction. -->
        <int:poller fixed-rate="5000">
            <int:transactional/>
        </int:poller>

        <!-- Alternative: poll at a fixed time of day instead of a fixed rate.
        <int:poller max-messages-per-poll="1">
            <int:cron-trigger expression="0 0 3 * * ?"/>
        </int:poller>
        -->
    </int-jdbc:inbound-channel-adapter>

    <int:service-activator input-channel="target" ref="jdbcmessagehandler"/>

    <!-- NOTE(review): the placeholder keys below must match the keys in the
         properties file exactly; confirm their casing against that file. -->
    <context:property-placeholder location="classpath*:META-INF/spring/*.properties"/>

    <bean class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close" id="datasource">
        <property name="driverClassName" value="${database.driverclassname}"/>
        <property name="url" value="${database.url}"/>
        <property name="username" value="${database.username}"/>
        <property name="password" value="${database.password}"/>
    </bean>

    <!-- The id must be "transactionManager": that is the bean name the
         int:transactional element looks up when no transaction-manager
         attribute is given. -->
    <bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
        <property name="dataSource" ref="datasource"/>
    </bean>

</beans>

jdbcmessagehandler:

spring integration同步資料庫資料

package com.wisely.inbound.jdbc;  

import java.util.List;
import java.util.Map;

import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.stereotype.Component;

@component  

public class jdbcmessagehandler {  

    @serviceactivator  

    public void handlejdbcmessage(list<map<string ,object>> message){  

        for(map<string,object> resultmap:message){  

            system.out.println("組:");  

            for(string column:resultmap.keyset()){  

                system.out.println("字段:"+column+" 值:"+resultmap.get(column));  

            }  

        }  

    }  

}  

測試類:

spring integration同步資料庫資料

import org.springframework.context.applicationcontext;  

import org.springframework.context.support.classpathxmlapplicationcontext;  

public class jdbcinbound {  

    /** 

     * @param args 

     */  

    public static void main(string[] args) {  

          applicationcontext context =   

                    new classpathxmlapplicationcontext("/meta-inf/spring/jdbc-inbound-context.xml");  

若將 channel 改為 JMS 的通道,配置檔案做以下修改:

spring integration同步資料庫資料

    <int-jms:channel id="target"  queue-name="jdbc.queue" connection-factory="connectionfactory"/>  

                                      data-source="datasource"  

                                      query="select p.id as ppid,p.name as ppname from pp p,status s where p.id=s.ppid and s.status='new'"  

                                      update="update status as st set st.status='old' where ppid in (:ppid)"  

    <!--   

    <int-jms:message-driven-channel-adapter id="queinbound" destination-name="jmsqueue" channel="target"/> 

    -->  

    <int:service-activator input-channel="target" ref="jdbcmessagehandler"/>  

    </bean>  

    <bean id="activemqconnectionfactory" class="org.apache.activemq.spring.activemqconnectionfactory">  

        <property name="brokerurl" value="vm://localhost" />  

    <bean id="connectionfactory" class="org.springframework.jms.connection.cachingconnectionfactory">  

        <property name="sessioncachesize" value="10" />  

        <property name="cacheproducers" value="false"/>  

        <property name="targetconnectionfactory" ref="activemqconnectionfactory"/>  

    <bean id="jmstemplate" class="org.springframework.jms.core.jmstemplate">  

        <property name="connectionfactory" ref="connectionfactory"/>  

        <property name="defaultdestinationname" value="jmsqueue" />  

</beans>