天天看點

樹莓派:傳感器安裝與測試樹莓派:傳感器安裝與測試

樹莓派:傳感器安裝與測試

準備

燒錄完系統的 raspberryPi 3

python 2.7

傳感器:

  1. 樹莓派專用攝像頭 v2.1
  2. 四位數位管
  3. 溫濕度傳感器

攝像頭

官方文檔 1(簡略版)

官方文檔2 (完整版)

安裝picamera

樹莓派:傳感器安裝與測試樹莓派:傳感器安裝與測試

sudo apt-get install python3-picamera

使用示例,in python3.4:

camera = picamera.PiCamera()
camera.capture('image.jpg',resize=(,)) # 在 在目前檔案夾下将截取圖像存為image.jpg, 320*240大小
camera.close() # 使用完後一定要關閉, 否則因為端口被占用,下次調用會出錯
           

沒有關閉

camera

對象在下次調用會出現以下錯誤

mmal: mmal_vc_port_enable: failed to enable port vc.null_sink:in:(OPQV): ENOSPC
mmal: mmal_port_enable: failed to enable connected port (vc.null_sink:in:(OPQV)) (ENOSPC)
mmal: mmal_connection_enable: output port couldn't be enabled
Traceback (most recent call last):
  File "<stdin>", line , in <module>
  File "/usr/lib/python3/dist-packages/picamera/camera.py", line , in __init__
    self._init_preview()
  File "/usr/lib/python3/dist-packages/picamera/camera.py", line , in _init_preview
    self, self._camera.outputs[self.CAMERA_PREVIEW_PORT])
  File "/usr/lib/python3/dist-packages/picamera/renderers.py", line , in __init__
    self.renderer.inputs[].connect(source).enable()
  File "/usr/lib/python3/dist-packages/picamera/mmalobj.py", line , in enable
    prefix="Failed to enable connection")
  File "/usr/lib/python3/dist-packages/picamera/exc.py", line , in mmal_check
    raise PiCameraMMALError(status, prefix
picamera.exc.PiCameraMMALError: Camera component couldn't be enabled: Out of resources (other than memory)
           

四位數位管

樹莓派:傳感器安裝與測試樹莓派:傳感器安裝與測試

常見的輸出顯示,四個數字+四個數點,總共 4*7+4=32個led燈管。如果采用一一對應的輸入方式,至少需要32個針腳,但是實際上隻有12個針腳。

是以我們采用動态方法:每一時刻隻亮四個數字中的一個,并以足夠快的速度在四個數字中循環,看起來就像一起亮着。

樹莓派的針腳分布編号如下圖:

樹莓派:傳感器安裝與測試樹莓派:傳感器安裝與測試

我們這裡采用BCM編碼。

四位數位管的針腳與樹莓派針腳的連接配接:

樹莓派:傳感器安裝與測試樹莓派:傳感器安裝與測試

其中PORT表示數位管的編碼(下圖上半部分):

樹莓派:傳感器安裝與測試樹莓派:傳感器安裝與測試

下方的table中寫出了各個要顯示的數字所需要的A-G各個針腳的電平。由于這款3461BS四位數位管是共陽極的,是以當A-G針腳對應的輸入電平為負的時候,對應的led燈管亮起。

結合GPIO子產品,用python實作按照輸入的參數數字字元串輸出數字到數位管上:

3461BS.py(python2.7)

import RPi.GPIO as GPIO
import time
import sys

mode0=[,,,,,,]
mode1=[,,,,,,]
mode2=[,,,,,,]
mode3=[,,,,,,]
mode4=[,,,,,,]
mode5=[,,,,,,]
mode6=[,,,,,,]
mode7=[,,,,,,]
mode8=[,,,,,,]
mode9=[,,,,,,]

PORT_TABLE=[mode0,mode1,mode2,mode3,mode4,mode5,mode6,mode7,mode8,mode9]
BCM_PORT=[,,,]
BCM_VALUE=[,,,,,,,]

def init():
    GPIO.setwarnings(False)
    GPIO.setmode(GPIO.BCM)
    for port in BCM_PORT:
        GPIO.setup(port,GPIO.OUT)
        GPIO.output(port,GPIO.HIGH)
    for value in BCM_VALUE:
        GPIO.setup(value,GPIO.OUT)
        GPIO.output(value,GPIO.LOW)
    for port in BCM_PORT:
        GPIO.output(port,GPIO.LOW)

def light_off():
    GPIO.setwarnings(False)
    GPIO.setmode(GPIO.BCM)
    for port in BCM_PORT:
        GPIO.setup(port,GPIO.OUT)
        GPIO.output(port,GPIO.LOW)
    for value in BCM_VALUE:
        GPIO.setup(value,GPIO.OUT)
        GPIO.output(value,GPIO.HIGH)

def dp_allume(dp_index):
    light_off()
    GPIO.setup(BCM_PORT[dp_index],GPIO.OUT)
    GPIO.output(BCM_PORT[dp_index],GPIO.HIGH)
    GPIO.setup(BCM_VALUE[],GPIO.OUT)
    GPIO.output(BCM_VALUE[],GPIO.LOW)

def val_allume(value,led_index):
    light_code=PORT_TABLE[value]
    light_off()
    for lc in range():
        if light_code[lc]==:
            GPIO.setup(BCM_VALUE[lc],GPIO.OUT)
            GPIO.output(BCM_VALUE[lc],GPIO.LOW)
        else:
            GPIO.setup(BCM_VALUE[lc],GPIO.OUT)
            GPIO.output(BCM_VALUE[lc],GPIO.HIGH)
    for li in range():
        if li==led_index:
            GPIO.setup(BCM_PORT[li],GPIO.OUT)
            GPIO.output(BCM_PORT[li],GPIO.HIGH)
        else:
            GPIO.setup(BCM_PORT[li],GPIO.OUT)
            GPIO.output(BCM_PORT[li],GPIO.LOW)

def value_to_list(val):
    num_list=[-,-,-,-,-]
    if val<:
        num_list[]=int(val//)
        val=val-num_list[]
        num_list[]=int(val//)
        val=val-num_list[]*
        num_list[]=int(val//)
        val=val-num_list[]*
        num_list[]=int(val//)
    elif val<:
        num_list[]=int(val//)
        val=val-num_list[]*
        num_list[]=int(val//)
        val=val-num_list[]
        num_list[]=int(val//)
        val=val-num_list[]*
        num_list[]=int(val*)
    elif val<:
        num_list[]=int(val//)
        val=val-num_list[]*
        num_list[]=int(val//)
        val=val-num_list[]*
        num_list[]=int(val//)
        val=val-num_list[]
        num_list[]=int(val*)
    elif val<:
        num_list[]=int(val//)
        val=val-num_list[]*
        num_list[]=int(val//)
        val=val-num_list[]*
        num_list[]=int(val//)
        val=val-num_list[]*
        num_list[]=int(val)
    else:
        pass
    return num_list

def show(number=,show_time=):
    GPIO.setwarnings(False)
    GPIO.setmode(GPIO.BCM)
    light_off()
    num_list=value_to_list(number)
    for itr in range():
        if num_list[itr]==-:
            dp_itr=itr
            break
    del(num_list[dp_itr])
    tmp=time.clock()
    while time.clock()-tmp<show_time:
        for itr in range():
            val_allume(num_list[itr],itr)
        dp_allume(dp_itr-)
    light_off()

if __name__ == '__main__':
    if len(sys.argv)==:
        show(number=float(sys.argv[]))
        GPIO.cleanup()
    elif len(sys.argv)==:
        show(number=float(sys.argv[]))
        GPIO.cleanup()
    elif len(sys.argv)==:
        show(number=float(sys.argv[]),show_time=float(sys.argv[]))
       GPIO.cleanup()
    else:
      print "Usage:  3461BS.py [SHOW_NUMBER][SHOW_TIME]"
           

由于針腳較多,建議用面包闆進行擴充:

樹莓派:傳感器安裝與測試樹莓派:傳感器安裝與測試

溫濕度傳感器

樹莓派:傳感器安裝與測試樹莓派:傳感器安裝與測試

用高低電平的變化序列來表示溫度和濕度數字

每一份溫濕度資料的傳輸大約會需要4毫秒,包括

  • 8位的濕度整數部分
  • 8位的濕度小數部分
  • 8位的溫度整數部分
  • 8位的溫度小數部分
  • 8位的校驗和

每一位資料大約持續100微秒

傳感器的通路連接配接分為兩部分

1. 握手階段

  1. 樹莓派向傳感器發送一個開始信号:傳感器預設為高電平,樹莓派輸出一個至少持續18毫秒的低電平後重新拉回高電平,持續20-40微秒
  2. 傳感器向樹莓派回複:傳感器将電平降為低電平,持續80微秒後重新将電平拉回高電平,持續80微秒
樹莓派:傳感器安裝與測試樹莓派:傳感器安裝與測試

2. 資料傳輸階段

每一位資料(0或1)都以50微秒的低電平開始,并以高電平結束。

其中表示0的高電平持續26-28微秒,表示1的高電平持續70微秒。

樹莓派:傳感器安裝與測試樹莓派:傳感器安裝與測試
樹莓派:傳感器安裝與測試樹莓派:傳感器安裝與測試

溫濕度傳感器的針腳連接配接:

樹莓派:傳感器安裝與測試樹莓派:傳感器安裝與測試
樹莓派:傳感器安裝與測試樹莓派:傳感器安裝與測試

解碼python(2.7)檔案:

dht11.py

#! /usr/bin/python*
import time
import RPi.GPIO as GPIO

class DHT11Result:
    'DHT11 sensor result returned by DHT11.read() method'

    ERR_NO_ERROR = 
    ERR_MISSING_DATA = 
    ERR_CRC = 

    error_code = ERR_NO_ERROR
    temperature = -
    humidity = -

    def __init__(self, error_code, temperature, humidity):
        self.error_code = error_code
        self.temperature = temperature
        self.humidity = humidity

    def is_valid(self):
        return self.error_code == DHT11Result.ERR_NO_ERROR

class DHT11:
    'DHT11 sensor reader class for Raspberry'

    __pin = 

    def __init__(self, pin):
        self.__pin = pin

    def read(self):
        GPIO.setup(self.__pin, GPIO.OUT)

        # send initial high
        self.__send_and_sleep(GPIO.HIGH, )

        # pull down to low
        self.__send_and_sleep(GPIO.LOW, )

        # change to input using pull up
        GPIO.setup(self.__pin, GPIO.IN, GPIO.PUD_UP)

        # collect data into an array
        data = self.__collect_input()

        # parse lengths of all data pull up periods
        pull_up_lengths = self.__parse_data_pull_up_lengths(data)

        # if bit count mismatch, return error (4 byte data + 1 byte checksum)
        if len(pull_up_lengths) != :
                    return DHT11Result(DHT11Result.ERR_MISSING_DATA,,) 
        # calculate bits from lengths of the pull up periods
        bits = self.__calculate_bits(pull_up_lengths)

        # we have the bits, calculate bytes
        the_bytes = self.__bits_to_bytes(bits)

        # calculate checksum and check
        checksum = self.__calculate_checksum(the_bytes)
        if the_bytes[] != checksum:
            return DHT11Result(DHT11Result.ERR_CRC, , )

        # ok, we have valid data, return it
        return DHT11Result(DHT11Result.ERR_NO_ERROR, the_bytes[], the_bytes[])

    def __send_and_sleep(self, output, sleep):
        GPIO.output(self.__pin, output)
        time.sleep(sleep)

    def __collect_input(self):

        # collect the data while unchanged found
        unchanged_count = 

        # this is used to determine where is the end of the data
        max_unchanged_count =  

        last = -
        data = []
        while True:
            current = GPIO.input(self.__pin)
            data.append(current)
            if last != current:
                unchanged_count = 
                last = current
            else:
                unchanged_count += 
                if unchanged_count > max_unchanged_count:
                    break

        return data     

    def __parse_data_pull_up_lengths(self, data):

        STATE_INIT_PULL_DOWN = 
        STATE_INIT_PULL_UP = 
        STATE_DATA_FIRST_PULL_DOWN = 
        STATE_DATA_PULL_UP = 
        STATE_DATA_PULL_DOWN = 

        state = STATE_INIT_PULL_DOWN

        lengths = [] # will contain the lengths of data pull up periods
        current_length =  # will contain the length of the previous period

        for i in range(len(data)):

            current = data[i]
            current_length += 
            if state == STATE_INIT_PULL_DOWN:
                if current == :
                    # ok, we got the initial pull down
                    state = STATE_INIT_PULL_UP
                    continue
                else:
                    continue

            if state == STATE_INIT_PULL_UP:
                if current == :
                    # ok, we got the initial pull up
                    state = STATE_DATA_FIRST_PULL_DOWN
                    continue
                else:
                    continue

            if state == STATE_DATA_FIRST_PULL_DOWN:
                if current == :
                    # we have the initial pull down, the next will be the data pull up
                    state = STATE_DATA_PULL_UP
                    continue
                else:
                    continue
            if state == STATE_DATA_PULL_UP:
                if current == :
                    # data pulled up, the length of this pull up will determine whether it is 0 or 1
                    current_length = 
                    state = STATE_DATA_PULL_DOWN
                    continue
                else:
                    continue

            if state == STATE_DATA_PULL_DOWN:
                if current == :            
                    # pulled down, we store the length of the previous pull up period
                    lengths.append(current_length)
                    state = STATE_DATA_PULL_UP              
                    continue
                else:
                    continue

        return lengths

    def __calculate_bits(self, pull_up_lengths):

        # find shortest and longest period
        shortest_pull_up = 
        longest_pull_up = 

        for i in range(, len(pull_up_lengths)):

            length = pull_up_lengths[i]
            if length < shortest_pull_up:
                shortest_pull_up = length

            if length > longest_pull_up:
                longest_pull_up = length

        # use the halfway to determine whether the period it is long or short
        halfway = shortest_pull_up + (longest_pull_up - shortest_pull_up) / 

        bits = []

        for i in range(, len(pull_up_lengths)):

            bit = False
            if pull_up_lengths[i] > halfway:
                bit = True

            bits.append(bit)

        return bits

    def __bits_to_bytes(self, bits):

        the_bytes = []
        byte = 

        for i in range(, len(bits)):

            byte = byte << 
            if (bits[i]):
                byte = byte | 
            else:
                byte = byte | 

            if ((i + ) %  == ):
                the_bytes.append(byte)
                byte = 

        return the_bytes

    def __calculate_checksum(self, the_bytes):
        return the_bytes[] + the_bytes[] + the_bytes[] + the_bytes[] &       
           

測量執行python程式:

TemHumSensor.py

import dht11
import RPi.GPIO as GPIO
import time

GPIO.setwarnings(False)
GPIO.setmode(GPIO.BCM)


try:
    while True:    
        instance=dht11.DHT11(pin=)
        result = instance.read()
        print 'temperature'+str(result.temperature)
        print 'humidity'+str(result.humidity)
        time.sleep()
except KeyboardInterrupt:
    pass
           

未經允許 請勿轉載