
James: Adding a Custom Service

Why?

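    POP3 lets you retrieve all the mail a given James user has received, but it offers no way to query the delivery status of sent mail or a user's recent contacts. The two features are similar on the server side, so this post covers the simpler one: retrieving recent contacts.

    The same approach can be extended further. Querying delivery status would be a good next feature, and I may cover it in detail when time allows.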

What?

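    We can stand on the shoulders of giants to build this quickly. James already wraps the whole "workflow" for a network service, so adding a few classes, an interface, and some configuration is enough, as the figure shows:

[Figure: the classes, interface, and configuration files to add]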

How?

    The work has two parts: adding the service logic and adding the configuration.

1. Service logic

1.1 RecentContactsServer:

/****************************************************************
 * Licensed to the Apache Software Foundation (ASF) under one   *
 * or more contributor license agreements.  See the NOTICE file *
 * distributed with this work for additional information        *
 * regarding copyright ownership.  The ASF licenses this file   *
 * to you under the Apache License, Version 2.0 (the            *
 * "License"); you may not use this file except in compliance   *
 * with the License.  You may obtain a copy of the License at   *
 *                                                              *
 *   http://www.apache.org/licenses/LICENSE-2.0                 *
 *                                                              *
 * Unless required by applicable law or agreed to in writing,   *
 * software distributed under the License is distributed on an  *
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
 * KIND, either express or implied.  See the License for the    *
 * specific language governing permissions and limitations      *
 * under the License.                                           *
 ****************************************************************/

package org.apache.james.recentcontacts;

import org.apache.avalon.cornerstone.services.connection.ConnectionHandler;
import org.apache.james.core.AbstractJamesService;

public class RecentContactsServer extends AbstractJamesService implements RecentContactsHandlerMBean{

    /**
     * Creates the handler that serves a connection.
     */
    @Override
    protected ConnectionHandler newHandler() throws Exception {
        return new RecentContactsHandler();
    }

    /**
     * Service name, shown at startup.
     */
    @Override
    public String getServiceType() {
        return "SearchMail Service";
    }
}
           

1.2 RecentContactsHandlerMBean:

/****************************************************************
 * Licensed to the Apache Software Foundation (ASF) under one   *
 * or more contributor license agreements.  See the NOTICE file *
 * distributed with this work for additional information        *
 * regarding copyright ownership.  The ASF licenses this file   *
 * to you under the Apache License, Version 2.0 (the            *
 * "License"); you may not use this file except in compliance   *
 * with the License.  You may obtain a copy of the License at   *
 *                                                              *
 *   http://www.apache.org/licenses/LICENSE-2.0                 *
 *                                                              *
 * Unless required by applicable law or agreed to in writing,   *
 * software distributed under the License is distributed on an  *
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
 * KIND, either express or implied.  See the License for the    *
 * specific language governing permissions and limitations      *
 * under the License.                                           *
 ****************************************************************/

package org.apache.james.recentcontacts;

/**
 * An interface to expose James management functionality through JMX.
 * 
 * @phoenix:mx-topic name="RecentContactsServer"
 */
public interface RecentContactsHandlerMBean {

    /**
    * @phoenix:mx-attribute
    * @phoenix:mx-description Returns flag indicating if this service is enabled 
    * @phoenix:mx-isWriteable no
    * 
    * @return boolean The enabled flag     
    */  
    public boolean isEnabled();

    /**
    * @phoenix:mx-attribute
    * @phoenix:mx-description Returns the port that the service is bound to 
    * @phoenix:mx-isWriteable no
    * 
    * @return int The port number     
    */  
    public int  getPort();
    
    /**
    * @phoenix:mx-attribute
    * @phoenix:mx-description Returns the address of the network interface the socket is bound to 
    * @phoenix:mx-isWriteable no
    * 
    * @return String The network interface name     
    */  
    public String  getNetworkInterface();
    
    /**
    * @phoenix:mx-attribute
    * @phoenix:mx-description Returns the server socket type, plain or SSL 
    * @phoenix:mx-isWriteable no
    * 
    * @return String The socket type, plain or SSL     
    */  
    public String  getSocketType();
}
           

1.3 RecentContactsHandler:

/****************************************************************
 * Licensed to the Apache Software Foundation (ASF) under one   *
 * or more contributor license agreements.  See the NOTICE file *
 * distributed with this work for additional information        *
 * regarding copyright ownership.  The ASF licenses this file   *
 * to you under the Apache License, Version 2.0 (the            *
 * "License"); you may not use this file except in compliance   *
 * with the License.  You may obtain a copy of the License at   *
 *                                                              *
 *   http://www.apache.org/licenses/LICENSE-2.0                 *
 *                                                              *
 * Unless required by applicable law or agreed to in writing,   *
 * software distributed under the License is distributed on an  *
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
 * KIND, either express or implied.  See the License for the    *
 * specific language governing permissions and limitations      *
 * under the License.                                           *
 ****************************************************************/

package org.apache.james.recentcontacts;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintWriter;
import java.net.ProtocolException;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.util.Locale;
import java.util.StringTokenizer;

import org.apache.avalon.cornerstone.services.connection.ConnectionHandler;
import org.apache.avalon.excalibur.pool.Poolable;
import org.apache.avalon.framework.logger.AbstractLogEnabled;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.james.transport.JdbcUtils;
import org.apache.james.util.CRLFTerminatedReader;
import org.apache.james.util.InternetPrintWriter;

/**
 * Handles requests for a user's recent contacts.
 * @author sqlu
 */
public class RecentContactsHandler extends AbstractLogEnabled implements ConnectionHandler, Poolable{
    private Log logger = LogFactory.getLog(getClass());

    /** Command that requests the recent contacts */
    private final static String COMMAND_RECENT_CONTACTS = "RECENT_CONTACTS";
    
    /** Separator between the command and its argument */
    private final static String PARAM_SPLIT = " ";
    
    // response prefixes
    private final static String OK_RESPONSE = "+OK";    // OK response.  Requested content
                                                        // will follow

    private final static String ERR_RESPONSE = "-ERR";  // Error response.  Requested content
                                                        // will not be provided.  This prefix
                                                        // is followed by a more detailed
                                                        // error message
    
    /**
     * The TCP/IP socket over which the RecentContacts interaction
     * is occurring
     */
    private Socket socket;
    /**
     * The reader associated with incoming characters.
     */
    private CRLFTerminatedReader in;

    /**
     * The writer to which outgoing messages are written.
     */
    private PrintWriter out;
    /**
     * The socket's output stream
     */
    private OutputStream outs;
    
    /**
     * Handles a single request on the given connection.
     */
    @Override
    public void handleConnection(Socket connection) throws IOException, ProtocolException {
        logger.debug(this.getClass().getName() + " received a request");
        String remoteIP = "";

        try {
            this.socket = connection;
            in = new CRLFTerminatedReader(new BufferedInputStream(socket.getInputStream(), 512), "ASCII");
            remoteIP = socket.getInetAddress().getHostAddress();
            outs = new BufferedOutputStream(socket.getOutputStream(), 1024);
            out = new InternetPrintWriter(outs, true);
        } catch (Exception e) {
            // Without working streams no response can be written, so release the handler and stop
            logger.error(String.format("Cannot open connection from (%s) : %s", remoteIP, e.getMessage()), e);
            resetHandler();
            return;
        }

        try {
            String line = readCommandLine();
            logger.debug("Request data: " + line);
            
            // Each connection handles exactly one request
            parseCommand(line);
            
        } catch (Exception e) {
            logger.error(String.format("Exception during connection from (%s) : %s", remoteIP, e.getMessage()), e);
            
            if(e instanceof SocketTimeoutException){
                writeLoggedFlushedResponse(false, "socket timed out.");
            } else {
                writeLoggedFlushedResponse(false, "socket exception.");
            }
        } finally {
            resetHandler();
        }
    }
    
    /**
     * Dispatches on the command received from the client.
     *
     * @param rawCommand the raw command string passed in over the socket
     *
     * @return whether additional commands are expected.
     */
    private boolean parseCommand(String rawCommand){
        
        if (rawCommand == null) {
            logger.error("Command is null");
            return false;
        }
        logger.debug("rawCommand:" + rawCommand);
        
        boolean returnValue = true;
        StringTokenizer commandLine = new StringTokenizer(rawCommand, PARAM_SPLIT);// a space separates the command from its argument
        int argumentCnt = commandLine.countTokens();

        // Request format:
        // RECENT_CONTACTS[space]account
        if(argumentCnt != 2){
            writeLoggedFlushedResponse(false, "command error");
            return false;
        } 
        
        String command = commandLine.nextToken().toUpperCase(Locale.US);
        String username = commandLine.nextToken();
        
        if(StringUtils.isBlank(command) || StringUtils.isBlank(username)){
            writeLoggedFlushedResponse(false, "command error");
            return false;
        }
        
        if(COMMAND_RECENT_CONTACTS.equals(command)){
            doSearchMailCmd(command,username);
        } else {
            doUnknownCmd(command,username);
        }
        
        return returnValue;
    }
    
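    /**
     * Looks up the recent contacts and writes them to the client.
     * @param command the command name
     * @param username the user name
     * @author sqlu
     */
    private void doSearchMailCmd(String command, String username) {
        try{
            JdbcUtils utils = new JdbcUtils();
            String contactsJson = utils.getRecentContacts(username);
            writeLoggedFlushedResponse(true, contactsJson);
        } catch (Exception e) {
            // Report the failure instead of leaving the client without a reply
            logger.error("Failed to look up recent contacts", e);
            writeLoggedFlushedResponse(false, "internal error");
        }
    }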
    
    
    /**
     * Handler method called upon receipt of an unrecognized command.
     * Returns an error response and logs the command.
     *
     * @param command the command parsed by the parseCommand method
     * @param argument the argument parsed by the parseCommand method
     */
    private void doUnknownCmd(String command, String argument){
        writeLoggedFlushedResponse(false, "unknown command");
    }
    
    /**
     * Resets the handler data to a basic state.
     */
    private void resetHandler() {

        try {
            if (socket != null) {
                socket.close();
                socket = null;
            }
        } catch (IOException ioe) {
            // Ignoring exception on close
        } finally {
            socket = null;
        }

        try {
            if (in != null) {
                in.close();
            }
        } catch (Exception e) {
            // Ignored
        } finally {
            in = null;
        }

        try {
            if (out != null) {
                out.close();
            }
        } catch (Exception e) {
            // Ignored
        } finally {
            out = null;
        }

        try {
           if (outs != null) {
               outs.close();
            }
        } catch (Exception e) {
            // Ignored
        } finally {
            outs = null;
        }

    }

    
    /**
     * Reads a line of characters off the command line.
     *
     * @return the trimmed input line
     * @throws IOException if an exception is generated reading in the input characters
     */
    final String readCommandLine() throws IOException {
        for (;;) try {
            // Reading with the CRLFTerminatedReader directly returned null here, so it is wrapped in a BufferedReader
//            String commandLine = in.readLine();
            String commandLine = new BufferedReader(in).readLine();
            
            if (commandLine != null) {
                commandLine = commandLine.trim();
            }
            
            logger.debug("commandLine:" + commandLine);
            return commandLine;
        } catch (CRLFTerminatedReader.TerminationException te) {
            logger.error(" Syntax error at character position " + te.position() + ". CR and LF must be CRLF paired.  See RFC 1939 #3.", te);
            writeLoggedFlushedResponse(false, "param line unexpected");
        }
    }
    
    /**
     * Builds the response line.
     * @param isSuccess whether the request succeeded
     * @param msg the message or payload to append
     * @return the status prefix, a space, and the message
     * @author sqlu
     */
    private String getResponseStr(boolean isSuccess, String msg){
        return (isSuccess ? OK_RESPONSE : ERR_RESPONSE) + PARAM_SPLIT + msg;
    }
    
    /**
     * Writes the response to the client, flushes it, and logs it.
     * @param isSuccess whether the request succeeded
     * @param msg the message or payload to send; must not be null
     * @author sqlu
     */
    final void writeLoggedFlushedResponse(boolean isSuccess, String msg){
        if(msg == null){
            throw new RuntimeException("writeLoggedFlushedResponse: msg must not be null");
        }
        
        String responseString = getResponseStr(isSuccess, msg);
        
        out.println(responseString);
        out.flush();
        
        logResponseString(responseString);
    }
    
    /**
     * This method logs at a "DEBUG" level the response string that
     * was sent to the client.  The method is provided largely
     * as syntactic sugar to neaten up the code base.  It is declared
     * private and final to encourage compiler inlining.
     *
     * @param responseString the response string sent to the client
     */
    private final void logResponseString(String responseString) {
        logger.debug("Sent: " + responseString);
    }
    
}
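
The wire format used by the handler is small enough to show in isolation: a request is `RECENT_CONTACTS`, a space, and the account, and a reply is `+OK` or `-ERR` followed by a space and a message. The sketch below is only an illustration of that format. The class `RecentContactsProtocolSketch` and its method names are hypothetical and are not part of James or of the code above.

```java
import java.util.Locale;
import java.util.StringTokenizer;

/** Hypothetical helper that mirrors the request and response format of RecentContactsHandler. */
final class RecentContactsProtocolSketch {
    static final String COMMAND = "RECENT_CONTACTS";

    /**
     * Returns the account from a request line of the form "RECENT_CONTACTS account",
     * or null when the line is malformed or names a different command.
     */
    static String parseUsername(String rawCommand) {
        if (rawCommand == null) {
            return null;
        }
        StringTokenizer tokens = new StringTokenizer(rawCommand.trim(), " ");
        if (tokens.countTokens() != 2) {
            return null;
        }
        // The command name is matched case-insensitively, as in the handler
        String command = tokens.nextToken().toUpperCase(Locale.US);
        String username = tokens.nextToken();
        return COMMAND.equals(command) ? username : null;
    }

    /** Builds a reply line: "+OK msg" on success, "-ERR msg" on failure. */
    static String response(boolean success, String msg) {
        return (success ? "+OK" : "-ERR") + " " + msg;
    }

    public static void main(String[] args) {
        System.out.println(parseUsername("recent_contacts alice@example.com"));
        System.out.println(response(true, "[\"bob@example.com\"]"));
        System.out.println(response(false, "command error"));
    }
}
```

Keeping the parsing free of socket code like this would also make it easy to unit test without starting James.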
           

1.4 package.html:

<!-- 
 Licensed to the Apache Software Foundation (ASF) under one   
 or more contributor license agreements.  See the NOTICE file 
 distributed with this work for additional information        
 regarding copyright ownership.  The ASF licenses this file   
 to you under the Apache License, Version 2.0 (the            
 "License"); you may not use this file except in compliance   
 with the License.  You may obtain a copy of the License at   
                                                               
   http://www.apache.org/licenses/LICENSE-2.0                 
                                                               
  Unless required by applicable law or agreed to in writing,   
 software distributed under the License is distributed on an  
 "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       
 KIND, either express or implied.  See the License for the    
 specific language governing permissions and limitations      
 under the License.  
-->
<body>
<p>Provides classes implementing RecentContacts functionality.</p>
</body>
           
           

1.5 RecentContactsServer.xinfo:

<?xml version="1.0"?>
<!-- 
 Licensed to the Apache Software Foundation (ASF) under one   
 or more contributor license agreements.  See the NOTICE file 
 distributed with this work for additional information        
 regarding copyright ownership.  The ASF licenses this file   
 to you under the Apache License, Version 2.0 (the            
 "License"); you may not use this file except in compliance   
 with the License.  You may obtain a copy of the License at   
                                                               
   http://www.apache.org/licenses/LICENSE-2.0                 
                                                               
  Unless required by applicable law or agreed to in writing,   
 software distributed under the License is distributed on an  
 "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       
 KIND, either express or implied.  See the License for the    
 specific language governing permissions and limitations      
 under the License.  
-->
<blockinfo>

  <!-- section to describe block -->
  <block>
    <version>1.0</version>
  </block>

  <!-- interfaces that may be exported to manage this block -->
  <management-access-points>
    <service name="org.apache.james.recentcontacts.RecentContactsHandlerMBean"/>
  </management-access-points>

  <dependencies>
    <dependency>
      <service name="org.apache.james.services.JamesConnectionManager" version="1.0"/>
    </dependency>
    <dependency>
      <service name="org.apache.avalon.cornerstone.services.sockets.SocketManager" version="1.0"/>
    </dependency>
    <dependency>
      <service name="org.apache.avalon.cornerstone.services.threads.ThreadManager" version="1.0"/>
    </dependency>
  </dependencies>

</blockinfo>
           

1.6 JdbcUtils

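package org.apache.james.transport;

import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.LinkedList;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import com.alibaba.druid.pool.DruidPooledConnection;
import com.alibaba.fastjson.JSON;

public class JdbcUtils {
    private Log logger = LogFactory.getLog(getClass());
    
    /**
     * Obtains a database connection. Supply your own implementation here.
     */
    DruidPooledConnection getConn(){
        throw new UnsupportedOperationException("getConn() is not implemented");
    }
    
    /**
     * Returns the senders of the user's three most recently updated inbox messages.
     * @param username the repository (account) name to look up
     * @return a JSON array of sender addresses; an empty array if the query fails
     * @author sqlu
     */
    public String getRecentContacts(String username){
        List<String> recentContacts = new LinkedList<String>();
        
        // Fetch the three most recent contacts. The user name is bound as a
        // parameter rather than formatted into the SQL text.
        String sql = "select `sender` from `inbox` where `repository_name` = ? order by `last_updated` desc limit 3";
        DruidPooledConnection conn = null;
        PreparedStatement pst = null;
        ResultSet rs = null;
        try {
            conn = getConn();// obtain a database connection; getConn() must be implemented by you
            pst = conn.prepareStatement(sql);
            pst.setString(1, username);
            rs = pst.executeQuery();
            while (rs.next()) {
                recentContacts.add(rs.getString("sender"));
            }
        } catch (Exception e) {
            logger.error("Query failed", e);
        } finally {
            // Close in reverse order of acquisition
            closeQuietly(rs);
            closeQuietly(pst);
            closeQuietly(conn);
        }
        
        return JSON.toJSONString(recentContacts);
    }
    
    /** Closes a JDBC resource and logs, rather than propagates, any failure (requires Java 7+). */
    private void closeQuietly(AutoCloseable resource){
        if(resource != null){
            try {
                resource.close();
            } catch (Exception e) {
                logger.error(e);
            }
        }
    }
    
}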
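
One thing to keep in mind with this query: it returns the senders of the three most recent messages, so the same address can appear more than once if one person wrote several times. If distinct contacts are wanted, one option is to fetch more rows and de-duplicate in Java while preserving the newest-first order. The following is a minimal sketch under that assumption. `RecentContactsDedupSketch` and `distinctRecent` are hypothetical names, not part of the original code.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical helper: reduces a newest-first sender list to distinct contacts. */
final class RecentContactsDedupSketch {

    /**
     * Returns at most {@code limit} distinct senders, keeping the order in which
     * they first appear in {@code sendersNewestFirst}. Null entries are skipped.
     */
    static List<String> distinctRecent(List<String> sendersNewestFirst, int limit) {
        // LinkedHashSet drops duplicates but remembers insertion order
        Set<String> seen = new LinkedHashSet<String>();
        for (String sender : sendersNewestFirst) {
            if (seen.size() >= limit) {
                break;
            }
            if (sender != null) {
                seen.add(sender);
            }
        }
        return new ArrayList<String>(seen);
    }
}
```

With this in place, the SQL `limit` would need to be larger than 3 so that enough rows remain after duplicates are dropped; how much larger depends on the data.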
           

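2. Configuration

Note: if James has already been unpacked, edit the configuration files in the unpacked directory instead. The path is: unpacked directory/apps/james/SAR-INF/

2.1 Modify config.xml (in the project: src/config/james-config.xml):

[Figure: changes to config.xml]

2.2 Modify assembly.xml (in the project: src/config/james-assembly.xml):

[Figure: changes to assembly.xml]

3. Build and start

    Build with ant and start James. If the startup output contains the part highlighted in the figure, the recent contacts service has started successfully:

[Figure: James startup output with the new service highlighted]

4. Testing

4.1 The service has to be called over a socket. A demo client: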

package com.sqlu.test.socket;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.PrintStream;
import java.net.Socket;

/**
 * @author lusq
 */
public class SocketClient {
	public static void main(String[] args) throws Exception {
		
		String message = "RECENT_CONTACTS sqlu.develop.com";
		
		String host = "localhost";// service address
		int port = 1314;// port the service listens on
		Socket client = new Socket(host, port);
		
		PrintStream outputStream = new PrintStream(client.getOutputStream());
		System.out.println("sending message:" + message);
		outputStream.println(message);
		outputStream.flush();
		
		// shutdownOutput tells the server that all data has been sent; from here on the client only receives
		client.shutdownOutput();
		
		// DataUtils and SocketConstants are the author's own helpers and are not shown,
		// so the single reply line is read directly here; UTF-8 is an assumption.
		BufferedReader reader = new BufferedReader(new InputStreamReader(client.getInputStream(), "UTF-8"));
		String msg = reader.readLine();
		System.out.println("receive back:" + msg);
		
		reader.close();
		outputStream.close();
		client.close();
		
		System.out.println("client end.....");
	}
}
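
The demo only prints the raw reply. A real caller would usually want to separate the status prefix from the payload first. Below is a minimal sketch of that step, assuming the reply is the single line written by `writeLoggedFlushedResponse` (`+OK` or `-ERR`, a space, then the message). `RecentContactsResponseSketch` is a hypothetical class introduced only for illustration.

```java
/** Hypothetical value class for a parsed reply from the recent contacts service. */
final class RecentContactsResponseSketch {
    final boolean ok;      // true for "+OK", false for "-ERR"
    final String payload;  // JSON array on success, error text on failure

    private RecentContactsResponseSketch(boolean ok, String payload) {
        this.ok = ok;
        this.payload = payload;
    }

    /** Splits a reply line into its status and payload. */
    static RecentContactsResponseSketch parse(String line) {
        if (line == null) {
            throw new IllegalArgumentException("no response received");
        }
        if (line.startsWith("+OK")) {
            return new RecentContactsResponseSketch(true, line.substring(3).trim());
        }
        if (line.startsWith("-ERR")) {
            return new RecentContactsResponseSketch(false, line.substring(4).trim());
        }
        throw new IllegalArgumentException("unexpected response: " + line);
    }
}
```

In the demo client, `RecentContactsResponseSketch.parse(msg)` could be called right after the reply is read, and the payload handed to a JSON parser when `ok` is true.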
           

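4.2 Checking the database, the contacts that should be returned are:

[Figure: query result in the database]

4.3 The content actually returned:

[Figure: output of the demo client]

    The data matches, so the feature works. ^_^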