天天看點

伺服器處理網關資訊的隊列——Redis

1.Redis安裝

此次簡單介紹redis的單點安裝。

安裝環境:ubuntu 16.04.5

安裝步驟:

  1. 更新源
sudo apt-get update
           
  1. 安裝redis
sudo apt-get install redis-server
           
  1. 查詢是否開啟

    redis在安裝成功後會自動開啟服務 預設端口 6379

ps aux|grep redis
           
  1. 修改配置檔案

    redis配置檔案路徑 /etc/redis/redis.conf

    注釋掉:bind 127.0.0.1,使遠端可以通路

    伺服器處理網關資訊的隊列——Redis
    添加使用者校驗 requirepass wentaoZhang
    伺服器處理網關資訊的隊列——Redis
  2. 重新開機服務
service redis-server restart
           

2.應用場景

redis 的應用場景很多,排行榜、分布式鎖、消息隊列等。此次我們使用redis 的list 實作一個消息的消費隊列,并賦予消息生命周期。

網關傳送至伺服器的資料有PUSH_DATA和PULL_DATA,需要不同的業務處理,為了應對大并發條件下的資料處理,利用redis的list 将PUSH_DATA和PULL_DATA放入兩個消費隊列中。

3.撸

我使用Redis 的java 實作 Jedis

添加依賴

<dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>3.0.1</version>
        </dependency>
           

參數硬編到代碼

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

public class RedisUtil {
    private static String HOST = "192.168.237.128";
    private static int PORT = 6379;
    private static String PWD = "wentaoZhang";
    private static JedisPool jedisPool = null;
    private static Jedis jedis = null;

    /**
     * 初始化Redis連接配接池
     */
    static {
        JedisPoolConfig config = new JedisPoolConfig();
        //最大連接配接數,如果指派為-1,則表示不限制;如果pool已經配置設定了maxActive個jedis執行個體,則此時pool的狀态為exhausted(耗盡)。
        config.setMaxTotal(50);
        //最大空閑數,控制一個pool最多有多少個狀态為idle(空閑的)的jedis執行個體,預設值也是8。
        config.setMaxIdle(50);
        //最小空閑數
        config.setMinIdle(10);
        //是否在從池中取出連接配接前進行檢驗,如果檢驗失敗,則從池中去除連接配接并嘗試取出另一個
        config.setTestOnBorrow(true);
        //在return給pool時,是否提前進行validate操作
        config.setTestOnReturn(true);
        //在空閑時檢查有效性,預設false
        config.setTestWhileIdle(true);
        //表示一個對象至少停留在idle狀态的最短時間,然後才能被idle object evitor掃描并驅逐;
        //這一項隻有在timeBetweenEvictionRunsMillis大于0時才有意義
        config.setMinEvictableIdleTimeMillis(30000);
        //表示idle object evitor兩次掃描之間要sleep的毫秒數
        config.setTimeBetweenEvictionRunsMillis(60000);
        //表示idle object evitor每次掃描的最多的對象數
        config.setNumTestsPerEvictionRun(1000);
        //等待可用連接配接的最大時間,機關毫秒,預設值為-1,表示永不逾時。如果超過等待時間,則直接抛出JedisConnectionException;
        config.setMaxWaitMillis(-1);

        jedisPool = new JedisPool(config, HOST, PORT, 10000, PWD);
    }

    public static Jedis getJedis(){
        jedis =  jedisPool.getResource();
        return jedis;
    }

    public static void returnJedis(){
        if(jedis!= null && jedisPool!= null){
            jedisPool.close();
        }
    }

    public static void lpush(String list,String value){
        if(jedis == null){
            getJedis();
        }
        Long lpush = jedis.lpush(list, value);
        returnJedis();
    }

    public static String rpop(String list){
        if(jedis == null){
            getJedis();
        }
        String rpop = jedis.rpop(list);
        returnJedis();

        return rpop;
    }
}

           

接Mqtt那篇代碼,在回調函數講不同topic 資料放入不同list。

@Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {

        System.out.println("Client topic : " + topic);
        System.out.println("Client Qos : " + message.getQos());
        System.out.println("Client msg : " + new String(message.getPayload()));

        RedisUtil.lpush(topic,new String(message.getPayload()));
        String rpop = RedisUtil.rpop(topic);
        System.out.println("rpop :"+rpop);


    }
           

控制台資訊:

MqttSub connect
Client topic : PUSH_DATA
Client Qos : 1
Client msg : {"rxpk":[
	{
		"time":"2013-03-31T16:21:17.528002Z",
		"tmst":3512348611,
		"chan":2,
		"rfch":0,
		"freq":866.349812,
		"stat":1,
		"modu":"LORA",
		"datr":"SF7BW125",
		"codr":"4/6",
		"rssi":-35,
		"lsnr":5.1,
		"size":32,
		"data":"-DS4CGaDCdG+48eJNM3Vai-zDpsR71Pn9CPA9uCON84"
	},{
		"time":"2013-03-31T16:21:17.530974Z",
		"tmst":3512348514,
		"chan":9,
		"rfch":1,
		"freq":869.1,
		"stat":1,
		"modu":"FSK",
		"datr":50000,
		"rssi":-75,
		"size":16,
		"data":"VEVTVF9QQUNLRVRfMTIzNA=="
	},{
		"time":"2013-03-31T16:21:17.532038Z",
		"tmst":3316387610,
		"chan":0,
		"rfch":0,
		"freq":863.00981,
		"stat":1,
		"modu":"LORA",
		"datr":"SF10BW125",
		"codr":"4/7",
		"rssi":-38,
		"lsnr":5.5,
		"size":32,
		"data":"ysgRl452xNLep9S1NTIg2lomKDxUgn3DJ7DE+b00Ass"
	}
]}

rpop :{"rxpk":[
	{
		"time":"2013-03-31T16:21:17.528002Z",
		"tmst":3512348611,
		"chan":2,
		"rfch":0,
		"freq":866.349812,
		"stat":1,
		"modu":"LORA",
		"datr":"SF7BW125",
		"codr":"4/6",
		"rssi":-35,
		"lsnr":5.1,
		"size":32,
		"data":"-DS4CGaDCdG+48eJNM3Vai-zDpsR71Pn9CPA9uCON84"
	},{
		"time":"2013-03-31T16:21:17.530974Z",
		"tmst":3512348514,
		"chan":9,
		"rfch":1,
		"freq":869.1,
		"stat":1,
		"modu":"FSK",
		"datr":50000,
		"rssi":-75,
		"size":16,
		"data":"VEVTVF9QQUNLRVRfMTIzNA=="
	},{
		"time":"2013-03-31T16:21:17.532038Z",
		"tmst":3316387610,
		"chan":0,
		"rfch":0,
		"freq":863.00981,
		"stat":1,
		"modu":"LORA",
		"datr":"SF10BW125",
		"codr":"4/7",
		"rssi":-38,
		"lsnr":5.5,
		"size":32,
		"data":"ysgRl452xNLep9S1NTIg2lomKDxUgn3DJ7DE+b00Ass"
	}
]}
           

這樣做我在做終端測試的時候會遇到一個問題,就是終端網關正常,伺服器當機後,redis的隊列會一直增大,超過終端接收視窗的資料,終端不能接受到伺服器的回複。在生産環境中由于資料上報間隔長,伺服器分布式部署,加上自測lpush rpop 每秒十萬+的處理能力,姑且認為沒毛病。

我寫的都是LoRaWAN NS伺服器在設計過程中使用到的技術,菜鳥一枚,獻醜了

做這個伺服器真的是很麻煩,不過你可以跟着我慢慢來,下回我會處理下data":"ysgRl452xNLep9S1NTIg2lomKDxUgn3DJ7DE+b00Ass

老猴子應該一眼就看出了Base64

下回再唠吧

繼續閱讀