天天看點

flink實作httpServer功能,請求source,sink進行處理傳回

作者:94版三國演義

需求

在java中httpServer是非常普遍的一個功能,前端送出請求,後端根據url 以及參數做出相應的處理,傳回資料,有get請求 post請求。

現在我需要flink實作這樣的功能,http請求flink的source,sink進行處理并傳回結果

最終實作結果截圖如下

flink實作httpServer功能,請求source,sink進行處理傳回
flink實作httpServer功能,請求source,sink進行處理傳回

實作代碼如下

Job代碼

package Job;
import flink.HttpSource;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class Job {
    public static Logger logger = LoggerFactory.getLogger(Job.class);
    public static void main(String[] args) throws Exception {
        //1、擷取流處理的環境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource source = env.addSource(new HttpSource());
        source.addSink(new Sink());
        env.execute("http計算");
    }
}           

HttpSource代碼

監聽端口資料

package flink;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.ServerSocket;
import java.net.Socket;

public class HttpSource extends RichParallelSourceFunction<String> {
    public static ServerSocket serverSocket = null;
    public static Socket socket = null;
    public static BufferedReader bufferedReader =null;
    public static Logger logger = LoggerFactory.getLogger(HttpSource.class);
    @Override
    public void open(Configuration parameters) throws Exception {
        int port = 8200;
        try {
           //1、開啟端口
            serverSocket = new ServerSocket(port);
        } catch (IOException e) {
            logger.error("open port error "+e.getMessage());
        }
    }

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        while (true) {
            try {
                logger.info("serverSocket accept ");
                socket = serverSocket.accept();
                logger.info("serverSocket bufferedReader ");
                BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream()));//擷取輸入流(請求)
                StringBuilder stringBuilder = new StringBuilder();
                String line = null;
                Thread.sleep(500);
                //得到請求的内容,注意這裡作兩個判斷非空和""都要,隻判斷null會有問題
                logger.info("serverSocket readLine ");
                while ((line = bufferedReader.readLine()) != null && !line.equals("")) {
                    stringBuilder.append(line);
                }
                //過濾掉某些請求頭之類的
                String result = stringBuilder.toString().split(" ")[1];
                ctx.collect(result);
            } catch (Exception e) {
                logger.error("HttpSource "+e.getMessage());
                continue;
            }finally {
                if(bufferedReader!=null){
                    bufferedReader.close();
                }
            }
        }
    }

    @Override
    public void cancel() {

    }
}           

注意一個地方

flink實作httpServer功能,請求source,sink進行處理傳回

正常情況下,前端進行請求,會傳回請求方式 請求路徑以及請求參數 後面會有浏覽器整個資訊,非常多,我這裡做了處理,隻擷取請求路徑以及請求參數

Sink類

package Job;

import Server.httpRequest;
import Server.httpResponse;
import com.alibaba.fastjson.JSONObject;
import flink.HttpSource;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class Sink extends RichSinkFunction<String> {
    public static Logger logger=LoggerFactory.getLogger(Sink.class);
    public void open(Configuration parameters){

    }
    @Override
    public void invoke(String result, Context context){
        //1、處理資料,過濾幹擾因素 /hello?name=dde
        JSONObject jsonObject = new httpRequest().analysisContent(result);
        //2、擷取url
        if(jsonObject.containsKey("url")){
            //3、根據不同的url做出不同的處理
            if(jsonObject.getString("url").equals("/sub")){
                if(jsonObject.containsKey("params")){
                    System.out.println("hello    "+result);
                    new httpResponse().sendsg(HttpSource.socket,jsonObject.getJSONObject("params"));
                }
            }
            if(jsonObject.getString("url").equals("/add")){
                if(jsonObject.containsKey("params")){
                    System.out.println("fail    "+result);
                    new httpResponse().sendsg1(HttpSource.socket,jsonObject.getJSONObject("params"));
                }
            }

        }

    }
}
           

對不同的url進行不同的處理 httpResponse類

package Server;

import com.alibaba.fastjson.JSONObject;

import java.io.IOException;
import java.io.PrintWriter;
import java.net.Socket;

public class httpResponse {
    public void sendsg(Socket socket, JSONObject msg){
        PrintWriter printWriter = null;//這裡第二個參數表示自動重新整理緩存
        try {
            printWriter = new PrintWriter(
                    socket.getOutputStream(), true);
        } catch (IOException e) {
           System.out.println(""+e.getMessage());
        }
        printWriter.println("HTTP/1.1 200 OK");
        printWriter.println("Content-Type:text/html;charset=utf-8");
        printWriter.println();
        float a=0;
        float b=0;
        float c=0;
       if(msg.containsKey("a")){
          a= msg.getFloat("a");
       }
        if(msg.containsKey("b")){
            b= msg.getFloat("b");
        }
        if(msg.containsKey("c")){
            c=msg.getFloat("c");
        }
        c=a*2+b-c;
        printWriter.write(c+"");//将日志輸出到浏覽器
        printWriter.close();
    }
    public void sendsg1(Socket socket, JSONObject msg){
        PrintWriter printWriter = null;//這裡第二個參數表示自動重新整理緩存
        try {
            printWriter = new PrintWriter(
                    socket.getOutputStream(), true);
        } catch (IOException e) {
            System.out.println(""+e.getMessage());
        }
        printWriter.println("HTTP/1.1 200 OK");
        printWriter.println("Content-Type:text/html;charset=utf-8");
        printWriter.println();
        float a=0;
        float b=0;
        float c=0;
        if(msg.containsKey("a")){
            a= msg.getFloat("a");
        }
        if(msg.containsKey("b")){
            b= msg.getFloat("b");
        }
        if(msg.containsKey("c")){
            c=msg.getFloat("c");
        }
        c=a+b+c;
        printWriter.write(c+"");//将日志輸出到浏覽器
        printWriter.close();
    }
}
           

對擷取的參數進行過濾篩選

package Server;

import com.alibaba.fastjson.JSONObject;

public class httpRequest {
    public JSONObject analysisContent(String result){
        JSONObject obj=new JSONObject();
        //1、過濾隻留下url 和請求參數 值
        String [] contents=result.split("\\?");
        if(contents.length>1){
            obj.put("url",contents[0]);
            JSONObject keyObject=new JSONObject();
            String [] params=contents[1].split("&");
            for(int i=0;i<params.length;i++){
               String [] map= params[i].split("=");
                keyObject.put(map[0],map[1]);
            }
            obj.put("params",keyObject);
        }
        return obj;
    }
}
           

打包成jar包

flink實作httpServer功能,請求source,sink進行處理傳回

在flink上運作

flink實作httpServer功能,請求source,sink進行處理傳回

測試運作15m以後是否有效

不同的url傳回不同的處理結果,完美

flink實作httpServer功能,請求source,sink進行處理傳回
flink實作httpServer功能,請求source,sink進行處理傳回

感想

很早接觸flink 非常的排斥,然後很多都是借用工具的,如果沒有開發好的輪子,需要自己造的,就會非常排斥,比如這個功能,我就搞了很久,之前搞的版本是springboot+flink的方式,純jar包,脫離了叢集,我感覺那種方式跟搞着玩一樣。

昨天看了一篇文章窮爸爸富爸爸,講了窮人思維,突然覺得自己有必要思考,結果2h不到,就搞出來了,改變思維方式,不然會越忙越窮

繼續閱讀