目前考慮公司的裝置通信時,需要對通信協定加密提供安全性,首先考慮有以下幾點需求:
1. 因為不希望被抓包看到消息内容,是以需要能雙向解密的
2. 裝置與服務的所有消息都會加密,是以性能越高越好
是以目前就優先考慮使用對稱加密,相比較非對稱加密,與裝置通信要求能雙方都能寫入和解密,并且對稱加密的性能會好很多。對稱加密 AES的 相對于 DES 和 3DES 密鑰長度更長,安全性也更高,是以我們選擇使用AES加密。
AES我們使用 ECB 模式,這種模式不需要使用向量,使用更加簡單一些,密鑰長度選擇最長的256位,因為AES加密是以塊為機關進行運算,通常一個塊都是 16 位,我們選擇 pkcs7 作為填充算法,這個算法主要是:
如果不滿16位需要補齊到16位,如果剛好16位則需要填入完整的16位以保證資料中一定會有填充的字元,而且填充字元就是需要填充數量的ascii碼值,這樣在丢棄填充時直接讀取最後一位的填充字元就知道需要舍棄多少長度的資料了
AES加密結果是一個位串,我們的消息協定是json的文本協定,是以加密結果在傳遞時,我們會将消息統一做base64編碼轉換成字元串進行傳輸,無可避免json文本協定和base64會将消息變得冗長,但是得益于營運商的流量資費越來越便宜和底層裝置的性能比以前要優秀,這樣也不會有什麼設計層面的問題。
def pkcs7padding(data):
bs = AES.block_size
padding = bs - len(data) % bs
padding_text = chr(padding) * padding
return data + padding_text.encode()
def pkcs7unpadding(data):
lengt = len(data)
unpadding = data[lengt - 1] if type(data[lengt - 1]) is int else ord(data[lengt - 1])
return data[0:lengt-unpadding]
class AesCrypter(object):
def __init__(self, key):
self.key = key.encode()
def encrypt(self, data):
"""
AES 加密, 加密模式ECB,填充:pkcs7padding,密鑰長度:256
:param data:
:return:
"""
data = pkcs7padding(data)
cipher = AES.new(self.key, AES.MODE_ECB)
encrypted = cipher.encrypt(data)
return base64.b64encode(encrypted)
def decrypt(self, data):
data = base64.b64decode(data)
cipher = AES.new(self.key, AES.MODE_ECB)
decrypted = cipher.decrypt(data)
decrypted = pkcs7unpadding(decrypted)
return decrypted.decode()
if __name__ == '__main__':
aes = AesCrypter('4099279a2929c911b8508805abf88b2d')
res = aes.encrypt(b'{"cmd": 3000, "msg": "ok"}').decode(encoding='utf-8')
print(res)
print(aes.decrypt(res))
Python實作API伺服器可以實作将消息下發,裝置通過 MQTT 上報的消息則會到 Golang 實作的解析器中,是以還需要在Golang中也實作一個簡單的 ECB 模式的 AES 加解密
import (
"bytes"
"crypto/aes"
"encoding/base64"
"fmt"
)
type AesTool struct {
Key []byte
BlockSize int
}
func NewAesTool(key []byte, blockSize int) *AesTool {
return &AesTool{Key: key, BlockSize: blockSize}
}
func (at *AesTool) padding(src []byte) []byte {
//填充個數
padding := aes.BlockSize - len(src) % aes.BlockSize
paddingText := bytes.Repeat([]byte{byte(padding)}, padding)
return append(src, paddingText...)
}
func (at *AesTool) unPadding(src []byte) []byte {
size := len(src)
return src[:(size-int(src[size-1]))]
}
func (at *AesTool) Encrypt(src []byte) ([]byte, error) {
//key隻能是 16 24 32長度
block, err := aes.NewCipher(at.Key)
if err != nil {
return nil, err
}
//padding
src = at.padding(src)
//傳回加密結果
encryptData := make([]byte, len(src))
//存儲每次加密的資料
tmpData := make([]byte, at.BlockSize)
//分組分塊加密
for index := 0; index < len(src); index += at.BlockSize {
block.Encrypt(tmpData, src[index:index+at.BlockSize])
copy(encryptData[index:index+at.BlockSize], tmpData)
}
return encryptData, nil
}
func (at *AesTool) Decrypt(src []byte) ([]byte, error) {
//key隻能是 16 24 32長度
block, err := aes.NewCipher(at.Key)
if err != nil {
return nil, err
}
//傳回加密結果
decryptData := make([]byte, len(src))
//存儲每次加密的資料
tmpData := make([]byte, at.BlockSize)
//分組分塊加密
for index := 0; index < len(src); index += at.BlockSize {
block.Decrypt(tmpData, src[index:index+at.BlockSize])
copy(decryptData[index:index+at.BlockSize], tmpData)
}
return at.unPadding(decryptData), nil
}
func (at *AesTool) EncryptString(src string) string {
b, err := at.Encrypt([]byte(src))
if err != nil {
return ""
}
return base64.StdEncoding.EncodeToString(b)
}
func (at *AesTool) DecryptString(src string) ([]byte, error) {
decodeBytes, err := base64.StdEncoding.DecodeString(src)
if err != nil {
return nil, err
}
return at.Decrypt(decodeBytes)
}
func Test() {
tool := NewAesTool([]byte("4099279a2929c911b8508805abf88b2d"), 16)
encrypted := tool.EncryptString("{\"cmd\": 3000, \"msg\": \"ok\"}")
fmt.Println("encrypted:", encrypted)
decrypted, _ := tool.DecryptString(encrypted)
fmt.Println("decrypted:", string(decrypted))
}