本節我們在上一節基礎上進一步完成TCP協定的收發機制。上一節我們已經實作了向伺服器方發送一個字元,本節我們要實作連續發送多個字元,并且能正常接收資料功能,完成了這些功能後,我們就可以基于此去開發其他建構在TCP之上的其他協定。
為了保證資料能正确的連續收發,本節的設計思路是使用一個隊列将發送的資料存儲起來,然後将資料包發送,隻有等待收到對方回發的ack後,我們才将資料從隊列中删除,如果資料包一直沒有收到ack回應,我們就啟動一個timer,自動将隊列中的資料包進行發送,如果發送給定次數後還沒有成功,那麼就通知資料發送層發送失敗,接下來我們看看相應代碼設計。
class SendPacketWrapper {
//将發送的資料包封裝起來存儲在隊列中
private byte[] packet_to_send;
private int seq_num;
private int ack_num;
private int send_count = 0;
public SendPacketWrapper(byte[] packet, int seq_num) {
this.packet_to_send = packet;
this.seq_num = seq_num;
this.ack_num = seq_num + packet.length;
}
public byte[] get_packet() {
return this.packet_to_send;
}
public int get_seq_num() {
return this.seq_num;
}
public int get_ack_num() {
return this.ack_num;
}
public void increase_send_count() {
this.send_count++;
}
public int get_send_count() {
return this.send_count;
}
}
class SendPacketTask extends TimerTask {
private TCPThreeHandShakes task_handler;
public SendPacketTask(TCPThreeHandShakes handler) {
this.task_handler = handler;
}
@Override
public void run() {
this.task_handler.sendPacketInList();
}
}
第一個類用于負責把發送的資料封裝起來,他記錄了資料的緩沖區,以及發送時對應的seq号,這樣當資料包需要重發時就可以再次使用這個數值進行發送,同時也記錄了應對的ack号,這樣當對方傳回ack值時,我們才能檢驗該資料包是否已經被對方接收。
接下來我們在類TCPThreeHandShakes中添加相應變量和代碼:
public class TCPThreeHandShakes extends Application{
。。。。
private int tcp_state = CONNECTION_IDLE;
private static int PACKET_SEND_TIMES = 3; //連續發生3次不成功則失敗
private Timer send_timer = new Timer(); //定時将發送隊列中的資料包進行發送
private int packet_resend_time = 2000; //每過一秒就發送隊列中存儲的資料包
private SendPacketTask resend_packet_task = null;
//每次發送資料包時先将它存儲在隊列中,發送出去收到ack後再将它從隊列中去除
private ArrayList<SendPacketWrapper> send_packet_list = new ArrayList<SendPacketWrapper>();
public TCPThreeHandShakes(byte[] server_ip, short server_port, ITCPHandler tcp_handler) {
this.dest_ip = server_ip;
this.dest_port = server_port;
//指定一個固定端口,以便抓包調試
Random rand = new Random();
this.port = (short)rand.nextInt();
this.tcp_handler = tcp_handler;
resend_packet_task = new SendPacketTask(this);
send_timer.scheduleAtFixedRate(resend_packet_task, packet_resend_time, packet_resend_time);
}
。。。。
}
我們添加了一系列與資料包發送和檢驗變量和代碼,特别是啟動一個timer,在每兩秒就去檢測資料包隊列,如果裡面還有資料包沒有接收到對應的ack,也就是上次發送時對方沒有成功接收,那麼timer就會将資料包再次發送,如果已經發送超過給定次數,timer就會通知上層應用資料發送失敗。
接下來我們要添加把資料包在發送時存儲到隊列和檢驗隊列資料包的代碼:
private void savePacketToList(byte[] packet) {
//如果資料包沒有存在隊列中就加入隊列
boolean contains = false;
for(int i = 0; i < send_packet_list.size(); i++) {
SendPacketWrapper packet_wrapper = this.send_packet_list.get(i);
if (packet_wrapper.get_packet() == packet) {
contains = true;
break;
}
}
if (contains == false) {
SendPacketWrapper packet_wrapper = new SendPacketWrapper(packet, this.seq_num);
this.send_packet_list.add(packet_wrapper);
}
}
public void sendPacketInList() {
ArrayList<SendPacketWrapper> wrapper_list = new ArrayList<SendPacketWrapper>();
//将所有在隊列中的資料包系數發送,如果資料包發送次數大于給定次數則報告失敗
for(int i = 0; i < this.send_packet_list.size(); i++) {
SendPacketWrapper packet_wrapper = this.send_packet_list.get(i);
if (packet_wrapper.get_send_count() >= PACKET_SEND_TIMES) {
this.tcp_handler.send_notify(false, packet_wrapper.get_packet());
}
else {
int old_seq_num = this.seq_num;
this.seq_num = packet_wrapper.get_seq_num();
try {
createAndSendPacket(packet_wrapper.get_packet(), "ACK");
} catch (Exception e) {
e.printStackTrace();
}
this.seq_num = old_seq_num;
wrapper_list.add(packet_wrapper);
}
}
this.send_packet_list = wrapper_list;
}
private void checkSendPacketByACK(int recv_ack) {
ArrayList<SendPacketWrapper> wrapper_list = new ArrayList<SendPacketWrapper>();
//所有ack值小于傳回ack的資料包都已經成功發送,此時要将資料包從隊列移除并通知上層
for(int i = 0; i < this.send_packet_list.size(); i++) {
SendPacketWrapper packet_wrapper = this.send_packet_list.get(i);
int ack = packet_wrapper.get_ack_num();
System.out.println("receive ack: " + ack);
if (packet_wrapper.get_ack_num() <= recv_ack) {
this.seq_num = packet_wrapper.get_ack_num();
System.out.println("next seq num: "+ this.seq_num);
this.tcp_handler.send_notify(true, packet_wrapper.get_packet());
}
else {
wrapper_list.add(packet_wrapper);
}
}
this.send_packet_list = wrapper_list;
this.seq_num = recv_ack;
}
在這三個函數中,第一個負責資料包第一次發送時将其存儲在隊列,第二個負責輪訓隊列,将上次沒有發送成功的資料包繼續發送,如果發送超過給定次數則向上層應用報告發送失敗,最後一個函數是在收到對方發來的ack包後檢驗隊列中哪些資料包發送成功,檢驗标準是所有ack小于對方發來ack數值的資料包都表明成功發送,接下來在handleData函數,也就是接收對方發來資料包的函數裡我們增加如下流程:
@Override
public void handleData(HashMap<String, Object> headerInfo) {
。。。。
if (tcp_state == CONNECTION_SEND || tcp_state == CONNECTION_CONNECTED) {
tcp_state = CONNECTION_CONNECTED;
checkSendPacketByACK(ack_num);
if (data != null && data.length > 0 && seq_num == this.ack_num) {
/*
* 這裡我們簡化資料的接收流程,為了提升資料發送效率,很有可能資料包的到來次序與伺服器發送時不一樣
* ,但為了讓實作邏輯簡單,我們每次隻接收指定資料包,例如目前我們等待seq編号為1,2,3的資料包,結果
* 資料包抵達的次序為3,1,2,那麼我們就隻接收資料包1,讓對方再次發送資料包2,3,顯然這樣子會降低效率,
* 但為了實作邏輯簡單,我們暫時做妥協
*/
this.tcp_handler.recv_notify(data);
createAndSendPacket(null, "ACK");
}
}
。。。。、
}
新添加這段代碼的作用是當對方資料包到來時,我們先抽取出包中的ack值,使用該值去檢驗隊列中哪些資料包已經成功發送,同時如果對方發來的資料包中有資料的話,我們就把資料取出,然後送出給上層應用,最後我們看看上層如何使用該tcp連接配接層來實作資料發送:
package Application;
import java.net.InetAddress;
import utils.ITCPHandler;
public class TCPRawDataSender implements ITCPHandler{
private TCPThreeHandShakes tcp_socket = null;
private String[] buffer = new String[] {"h", "e", "l", "l", "o"};
private int buffer_p = 0;
private byte[] current_send_packet = null;
private void send_content() throws Exception {
if (buffer_p < buffer.length) {
System.out.println("send content: " + buffer[buffer_p]);
byte[] send_content = buffer[buffer_p].getBytes();
current_send_packet = send_content;
tcp_socket.tcp_send(send_content);
}
}
@Override
public void connect_notify(boolean connect_res) {
if (connect_res) {
System.out.println("connection established!");
try {
send_content();
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
else {
System.out.println("connection fail!");
}
}
private void close_connection() {
try {
tcp_socket.tcp_close();
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
@Override
public void send_notify(boolean send_res, byte[] packet_send) {
if (send_res == true) {
System.out.println("send data, buffer_p: " + buffer_p);
if (packet_send == current_send_packet) {
buffer_p++;
current_send_packet = null;
}
}
if (buffer_p >= buffer.length || send_res == false) {
String info = "send all data ";
if (send_res == false) {
info = "send fail with buffer_p: " + buffer_p;
}
System.out.println(info);
}
else {
try {
send_content();
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
@Override
public void connect_close_notify(boolean close_res) {
if (close_res == true) {
System.out.println("connection close complete!");
} else {
System.out.println("connection close fail!");
}
}
public void run() {
try {
InetAddress ip = InetAddress.getByName("192.168.2.127"); //220.181.43.8
short port = 1234;
tcp_socket = new TCPThreeHandShakes(ip.getAddress(), port, this);
tcp_socket.tcp_connect();
System.out.println("finish handshake!");
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
@Override
public void recv_notify(byte[] packet_recv) {
System.out.println("receive data: " + new String(packet_recv));
close_connection();
}
}
該類繼承了ITCPHandler接口以便接收資料發送過程的各種調用,其中buffer中存儲的是要發送給對方的資料,當connect_notify被調用時,如果連接配接成功,他就會使用send_content函數發送緩沖區裡的一個字元,如果發送成功,它的send_notify會調用,在該函數裡,他檢驗成功發送的資料是不是自己目前正在發生的資料,如果是它就将緩沖器指針挪動一位發送下一個字元,當所有資料發送完畢後,它會等待對方向它發送資料,一旦成功接收對方發來的資料,它的recv_notify函數會被調用,此時它把對方發送來的資料顯示出來後,調用close_connection關閉連接配接
我在iphone上安裝了一款名為tcp server的免費app做實驗,我是上面代碼與該app建立的tcp server伺服器連接配接,然後将資料發送給他,并接收從它發過來的資料,最後運作結果如下圖:
更詳細的講解和代碼調試示範過程,請點選連結
更多技術資訊,包括作業系統,編譯器,面試算法,機器學習,人工智能,請關照我的公衆号: