在本教程中,我們将使用Amazon Web Services(AWS),Slack和Raspberry Pi建構一個IoT項目原型。我們的項目簡單示範了如何通過內建流行的産品和服務來建立自定義的,啟用雲的傳感器系統。它來自Internet上的多種資源。
它是如何工作的?
使用DS18B20溫度傳感器,樹莓派每分鐘測量一次溫度。它通過HTTP POST請求将測量資料(傳感器名稱、時間戳、攝氏溫度和華氏溫度)發送到AWS API網關端點。端點調用一個Lambda函數,該函數将資料插入到DynamoDB表中。
另外,AWS EventBridge每分鐘調用一次第二個Lambda函數。 此函數在DynamoDB表中查詢最近60秒内插入的所有項目,然後通過HTTP POST請求将它們發送到Slack通道。
為了安全起見,API Gateway端點使用存儲在AWS Systems Manager中的授權令牌。
需求
此項目需要一個AWS賬戶,一個Slack賬戶,AWS指令行界面(CLI),AWS無伺服器應用程式模型(SAM)CLI,Raspberry Pi,Linux的Raspbian發行版,DS18B20溫度傳感器和Python 3。
設定Slack
我們項目的第一個元件是帶有傳入的Webhooks的Slack應用。我們根據Slack網站上的
教程 ( https://api.slack.com/messaging/webhooks https://api.slack.com/messaging/webhooks)建立了該應用。我們記下Webhook的URL,因為在下面的部分中将需要它。
設定AWS
我們項目的第二個元件是使用API網關,DynamoDB,EventBridge,Lambda和Systems Manager服務的AWS無伺服器應用程式。它具有以下檔案夾結構:
project/
|__ template.yaml
|__ iot/
|__ app.py
|__ requirements.txt
template.yaml的内容是:
Transform: AWS::Serverless-2016-10-31
Globals:
Api:
Auth:
Authorizers:
TokenValidator:
FunctionArn: !GetAtt ValidateRequest.Arn
FunctionPayloadType: TOKEN
Identity:
Header: x-api-token
ReauthorizeEvery: 0
DefaultAuthorizer: TokenValidator
EndpointConfiguration: REGIONAL
Function:
Environment:
Variables:
DYNAMODB_TABLE: sensor-data
Runtime: python3.7 # Change as necessary.
Resources:
ValidateRequest:
Type: AWS::Serverless::Function
Properties:
CodeUri: ./iot
Handler: app.validate_request
Policies:
– Statement:
– Action:
– ssm:GetParameter
Effect: Allow
Resource: !Sub arn:aws:ssm:${AWS::Region}:${AWS::AccountId}:parameter/api-token
Version: ‘2012-10-17’
HandleSensorRequest:
Type: AWS::Serverless::Function
Properties:
CodeUri: ./iot
Handler: app.handle_sensor_request
Policies:
Statement:Action:dynamodb:PutItem
Effect: Allow
Resource: !GetAtt SensorData.Arn
Version: '2012-10-17'
Events:
SensorResource:
Type: Api
Properties:
Method: POST
Path: /sensor
MakeSlackRequest:
Type: AWS::Serverless::Function
Properties:
CodeUri: ./iot
Handler: app.make_slack_request
Policies:
Statement:Action:dynamodb:Query
Effect: Allow
Resource: !GetAtt SensorData.Arn
Version: '2012-10-17'
Statement:Action:ssm:GetParameter
Effect: Allow
Resource: !Sub arn:aws:ssm:${AWS::Region}:${AWS::AccountId}:parameter/slack-url
Events:
SlackSchedule:
Type: Schedule
Properties:
Schedule: rate(1 minute)
SensorData:
Type: AWS::DynamoDB::Table
Properties:
AttributeDefinitions:
AttributeName: sensor
AttributeType: SAttributeName: timestamp
AttributeType: N
KeySchema:
AttributeName: sensor
KeyType: HASHAttributeName: timestamp
KeyType: RANGE
ProvisionedThroughput:
ReadCapacityUnits: 5
WriteCapacityUnits: 5
TableName: sensor-data
Outputs:
SensorURL:
Value: !Sub '
https://
${ServerlessRestApi}.execute-api.${AWS::Region}.amazonaws.com/${ServerlessRestApi.Stage}/sensor'
app.py的内容是:
import decimal
import json
import os
import time
import boto3
import boto3.dynamodb.conditions
import requests
DYNAMODB_TABLE = os.environ['DYNAMODB_TABLE']
SENSORS = ['1']
def get_stored_parameter(name):
ssm = boto3.client('ssm')
response = ssm.get_parameter(
Name=name,
WithDecryption=True
)
return response['Parameter']['Value']
def validate_request(event, context):
expected_token = get_stored_parameter('api-token')
if event['authorizationToken'] == expected_token:
effect = 'Allow'
else:
effect = 'Deny'
return {
'principalId': '*',
'policyDocument': {
'Version': '2012-10-17',
'Statement': [{
'Action': 'execute-api:Invoke',
'Effect': '{}'.format(effect),
'Resource': '{}'.format(event['methodArn'])
}]
}
}
def handle_sensor_request(event, context):
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table(DYNAMODB_TABLE)
table.put_item(
Item=json.loads(event['body'], parse_float=decimal.Decimal)
)
return {'body': event['body']}
def compute_timestamp(value):
time_in_seconds = time.time()
return decimal.Decimal(time_in_seconds - value)
def query_table(sensor, timestamp):
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table(DYNAMODB_TABLE)
response = table.query(
KeyConditionExpression=
boto3.dynamodb.conditions.Key('sensor').eq(sensor)
& boto3.dynamodb.conditions.Key('timestamp').gte(timestamp)
)
return response['Items']
def create_message(item):
gmt = time.gmtime(item['timestamp'])
datetime_format = time.strftime('%Y-%m-%d %H:%M:%S', gmt)
return '{} Sensor {} {}U00002103 {}U00002109n'.format(
datetime_format,
item['sensor'],
item['celsius'],
item['fahrenheit']
)
def make_slack_request(event, context):
timestamp = compute_timestamp(60)
status = {}
for sensor in SENSORS:
items = query_table(sensor, timestamp)
if items:
messages = [create_message(item) for item in items]
body = {'text': 'n'.join(messages)}
slack_url = get_stored_parameter('slack-url')
response = requests.post(slack_url, json=body)
status[sensor] = response.status_code
return status
Requirements.txt的内容是:
requests
設定AWS要求我們從Linux Shell運作幾個指令。
首先,使用AWS CLI,我們将Slack應用程式Webhook的URL存儲在Systems Manager中。 我們将VALUE替換為實際網址。
aws configure set cli_follow_urlparam false
aws ssm put-parameter --name 'slack-url' --value 'VALUE' --type 'SecureString'
其次,我們将API授權令牌存儲在Systems Manager中。我們的令牌是一個字元串,用于驗證對API Gateway的請求。 (建立安全令牌不在本文讨論範圍之内。)同樣,我們将VALUE替換為實際令牌。
aws ssm put-parameter --name 'api-token' --value 'VALUE' --type 'SecureString'
最後,我們使用AWS SAM CLI建構和部署無伺服器應用程式。(我們從上述檔案夾結構的項目目錄中運作這些特定指令。)
sam build
sam deploy --guided
部署應用程式後,我們記下sam deploy --guided指令的輸出中引用的SensorURL,因為在下面将需要它。
設定Raspberry Pi
我們項目的第三個也是最後一個元件是具有DS18B20溫度傳感器和簡短Python程式的Raspberry Pi。 我們配置了Raspberry Pi,并根據Adafruit網站上的
學習子產品 https://learn.adafruit.com/adafruits-raspberry-pi-lesson-11-ds18b20-temperature-sensing)安裝了溫度傳感器。
Python程式是一個名為ds18b20.py的檔案。它很大程度上是對在同一Adafruit學習子產品中找到的示例的重寫。其内容是:
import logging
import os
import pathlib
import time
import requests
AWS_API_TOKEN = os.environ['AWS_API_TOKEN']
AWS_SENSOR_URL = os.environ['AWS_SENSOR_URL']
SENSOR_NAME = os.environ['SENSOR_NAME']
logging.getLogger(__name__)
def find_device_file():
base_directory = pathlib.Path('/sys/bus/w1/devices')
device_file = next(
base_directory.glob('28*/w1_slave')
)
return base_directory.joinpath(device_file)
def read_device_file(filename):
with open(filename, 'r') as device_file:
return device_file.read()
def compute_temperature(reading):
celsius = int(reading) / 1000
fahrenheit = celsius * 9 / 5 + 32
return celsius, fahrenheit
def send_temperature(celsius, fahrenheit):
header = {
'x-api-token': AWS_API_TOKEN
}
body = {
'sensor': SENSOR_NAME,
'timestamp': time.time(),
'celsius': celsius,
'fahrenheit': fahrenheit
}
response = requests.post(
AWS_SENSOR_URL,
json=body,
headers=header
)
if response.status_code != 200:
logging.warning(
'Status code {}'.format(response.status_code)
)
def main():
device_file = find_device_file()
while True:
data = read_device_file(device_file)
while 'YES' not in data:
time.sleep(1)
data = read_device_file(device_file)
else:
_, device_reading = data.split('t=')
celsius, fahrenheit = compute_temperature(device_reading)
send_temperature(celsius, fahrenheit)
time.sleep(60)
if name == '__main__':
main()
在運作程式之前,我們在Linux Shell中設定了三個環境變量。AWS_API_TOKEN是上一節中的API授權令牌。AWS_SENSOR_URL是Raspberry Pi向其發送請求的URL;這是上一節中提到的SensorURL。最後,SENSOR_NAME是我們配置設定給Raspberry Pi的名稱。與往常一樣,我們将VALUE替換為每個環境變量的實際值。
export AWS_API_TOKEN=VALUE
export AWS_SENSOR_URL=VALUE
export SENSOR_NAME=1
如果我們希望環境變量保持不變,則将它們添加到我們的.bashrc檔案中。
echo export AWS_API_TOKEN=VALUE >> ~/.bashrc
echo export AWS_SENSOR_URL=VALUE >> ~/.bashrc
echo export SENSOR_NAME=1 >> ~/.bashrc
然後,我們運作程式。從現在開始,Raspberry Pi将每分鐘一次将測量資料發送到AWS API Gateway端點。AWS EventBridge将以相同的頻率從DynamoDB表檢索資料,并将其發送到我們的Slack通道。
python3 ds18b20.py
(我們的程式需要第三方Python庫requests。如果我們還沒有安裝它,那麼我們可以通過運作來自Linux shell的
sudo pip install requests
來安裝它。在新的Raspberry Pi上,我們可能需要先運作
sudo apt-get install python3-pip
。)
總結
我們的原型IoT項目內建了AWS,Slack和Raspberry Pi。它提供了一個示例,說明如何使用流行的産品和服務來建構自定義的、支援雲計算的傳感器系統。為了增強我們的原型,我們可以增加帶有DS18B20溫度傳感器的Raspberry Pi的數量,或添加其他類型的傳感器(例如濕度傳感器),将Python程式轉換為可安裝的程式包,将Python程式作為Linux服務運作,以及建立傳感器資料的可視化。創意是無限的!
原文連結