天天看点

树莓派:传感器安装与测试树莓派:传感器安装与测试

树莓派:传感器安装与测试

准备

烧录完系统的 raspberryPi 3

python 2.7

传感器:

  1. 树莓派专用摄像头 v2.1
  2. 四位数码管
  3. 温湿度传感器

摄像头

官方文档 1(简略版)

官方文档2 (完整版)

安装picamera

树莓派:传感器安装与测试树莓派:传感器安装与测试

sudo apt-get install python3-picamera

使用示例,in python3.4:

camera = picamera.PiCamera()
camera.capture('image.jpg',resize=(,)) # 在 在当前文件夹下将截取图像存为image.jpg, 320*240大小
camera.close() # 使用完后一定要关闭, 否则因为端口被占用,下次调用会出错
           

没有关闭

camera

对象在下次调用会出现以下错误

mmal: mmal_vc_port_enable: failed to enable port vc.null_sink:in:(OPQV): ENOSPC
mmal: mmal_port_enable: failed to enable connected port (vc.null_sink:in:(OPQV)) (ENOSPC)
mmal: mmal_connection_enable: output port couldn't be enabled
Traceback (most recent call last):
  File "<stdin>", line , in <module>
  File "/usr/lib/python3/dist-packages/picamera/camera.py", line , in __init__
    self._init_preview()
  File "/usr/lib/python3/dist-packages/picamera/camera.py", line , in _init_preview
    self, self._camera.outputs[self.CAMERA_PREVIEW_PORT])
  File "/usr/lib/python3/dist-packages/picamera/renderers.py", line , in __init__
    self.renderer.inputs[].connect(source).enable()
  File "/usr/lib/python3/dist-packages/picamera/mmalobj.py", line , in enable
    prefix="Failed to enable connection")
  File "/usr/lib/python3/dist-packages/picamera/exc.py", line , in mmal_check
    raise PiCameraMMALError(status, prefix
picamera.exc.PiCameraMMALError: Camera component couldn't be enabled: Out of resources (other than memory)
           

四位数码管

树莓派:传感器安装与测试树莓派:传感器安装与测试

常见的输出显示,四个数字+四个数点,总共 4*7+4=32个led灯管。如果采用一一对应的输入方式,至少需要32个针脚,但是实际上只有12个针脚。

所以我们采用动态方法:每一时刻只亮四个数字中的一个,并以足够快的速度在四个数字中循环,看起来就像一起亮着。

树莓派的针脚分布编号如下图:

树莓派:传感器安装与测试树莓派:传感器安装与测试

我们这里采用BCM编码。

四位数码管的针脚与树莓派针脚的连接:

树莓派:传感器安装与测试树莓派:传感器安装与测试

其中PORT表示数码管的编码(下图上半部分):

树莓派:传感器安装与测试树莓派:传感器安装与测试

下方的table中写出了各个要显示的数字所需要的A-G各个针脚的电平。由于这款3461BS四位数码管是共阳极的,所以当A-G针脚对应的输入电平为负的时候,对应的led灯管亮起。

结合GPIO模块,用python实现按照输入的参数数字字符串输出数字到数码管上:

3461BS.py(python2.7)

import RPi.GPIO as GPIO
import time
import sys

mode0=[,,,,,,]
mode1=[,,,,,,]
mode2=[,,,,,,]
mode3=[,,,,,,]
mode4=[,,,,,,]
mode5=[,,,,,,]
mode6=[,,,,,,]
mode7=[,,,,,,]
mode8=[,,,,,,]
mode9=[,,,,,,]

PORT_TABLE=[mode0,mode1,mode2,mode3,mode4,mode5,mode6,mode7,mode8,mode9]
BCM_PORT=[,,,]
BCM_VALUE=[,,,,,,,]

def init():
    GPIO.setwarnings(False)
    GPIO.setmode(GPIO.BCM)
    for port in BCM_PORT:
        GPIO.setup(port,GPIO.OUT)
        GPIO.output(port,GPIO.HIGH)
    for value in BCM_VALUE:
        GPIO.setup(value,GPIO.OUT)
        GPIO.output(value,GPIO.LOW)
    for port in BCM_PORT:
        GPIO.output(port,GPIO.LOW)

def light_off():
    GPIO.setwarnings(False)
    GPIO.setmode(GPIO.BCM)
    for port in BCM_PORT:
        GPIO.setup(port,GPIO.OUT)
        GPIO.output(port,GPIO.LOW)
    for value in BCM_VALUE:
        GPIO.setup(value,GPIO.OUT)
        GPIO.output(value,GPIO.HIGH)

def dp_allume(dp_index):
    light_off()
    GPIO.setup(BCM_PORT[dp_index],GPIO.OUT)
    GPIO.output(BCM_PORT[dp_index],GPIO.HIGH)
    GPIO.setup(BCM_VALUE[],GPIO.OUT)
    GPIO.output(BCM_VALUE[],GPIO.LOW)

def val_allume(value,led_index):
    light_code=PORT_TABLE[value]
    light_off()
    for lc in range():
        if light_code[lc]==:
            GPIO.setup(BCM_VALUE[lc],GPIO.OUT)
            GPIO.output(BCM_VALUE[lc],GPIO.LOW)
        else:
            GPIO.setup(BCM_VALUE[lc],GPIO.OUT)
            GPIO.output(BCM_VALUE[lc],GPIO.HIGH)
    for li in range():
        if li==led_index:
            GPIO.setup(BCM_PORT[li],GPIO.OUT)
            GPIO.output(BCM_PORT[li],GPIO.HIGH)
        else:
            GPIO.setup(BCM_PORT[li],GPIO.OUT)
            GPIO.output(BCM_PORT[li],GPIO.LOW)

def value_to_list(val):
    num_list=[-,-,-,-,-]
    if val<:
        num_list[]=int(val//)
        val=val-num_list[]
        num_list[]=int(val//)
        val=val-num_list[]*
        num_list[]=int(val//)
        val=val-num_list[]*
        num_list[]=int(val//)
    elif val<:
        num_list[]=int(val//)
        val=val-num_list[]*
        num_list[]=int(val//)
        val=val-num_list[]
        num_list[]=int(val//)
        val=val-num_list[]*
        num_list[]=int(val*)
    elif val<:
        num_list[]=int(val//)
        val=val-num_list[]*
        num_list[]=int(val//)
        val=val-num_list[]*
        num_list[]=int(val//)
        val=val-num_list[]
        num_list[]=int(val*)
    elif val<:
        num_list[]=int(val//)
        val=val-num_list[]*
        num_list[]=int(val//)
        val=val-num_list[]*
        num_list[]=int(val//)
        val=val-num_list[]*
        num_list[]=int(val)
    else:
        pass
    return num_list

def show(number=,show_time=):
    GPIO.setwarnings(False)
    GPIO.setmode(GPIO.BCM)
    light_off()
    num_list=value_to_list(number)
    for itr in range():
        if num_list[itr]==-:
            dp_itr=itr
            break
    del(num_list[dp_itr])
    tmp=time.clock()
    while time.clock()-tmp<show_time:
        for itr in range():
            val_allume(num_list[itr],itr)
        dp_allume(dp_itr-)
    light_off()

if __name__ == '__main__':
    if len(sys.argv)==:
        show(number=float(sys.argv[]))
        GPIO.cleanup()
    elif len(sys.argv)==:
        show(number=float(sys.argv[]))
        GPIO.cleanup()
    elif len(sys.argv)==:
        show(number=float(sys.argv[]),show_time=float(sys.argv[]))
       GPIO.cleanup()
    else:
      print "Usage:  3461BS.py [SHOW_NUMBER][SHOW_TIME]"
           

由于针脚较多,建议用面包板进行扩展:

树莓派:传感器安装与测试树莓派:传感器安装与测试

温湿度传感器

树莓派:传感器安装与测试树莓派:传感器安装与测试

用高低电平的变化序列来表示温度和湿度数字

每一份温湿度数据的传输大约会需要4毫秒,包括

  • 8位的湿度整数部分
  • 8位的湿度小数部分
  • 8位的温度整数部分
  • 8位的温度小数部分
  • 8位的校验和

每一位数据大约持续100微秒

传感器的通路连接分为两部分

1. 握手阶段

  1. 树莓派向传感器发送一个开始信号:传感器默认为高电平,树莓派输出一个至少持续18毫秒的低电平后重新拉回高电平,持续20-40微秒
  2. 传感器向树莓派回复:传感器将电平降为低电平,持续80微秒后重新将电平拉回高电平,持续80微秒
树莓派:传感器安装与测试树莓派:传感器安装与测试

2. 数据传输阶段

每一位数据(0或1)都以50微秒的低电平开始,并以高电平结束。

其中表示0的高电平持续26-28微秒,表示1的高电平持续70微秒。

树莓派:传感器安装与测试树莓派:传感器安装与测试
树莓派:传感器安装与测试树莓派:传感器安装与测试

温湿度传感器的针脚连接:

树莓派:传感器安装与测试树莓派:传感器安装与测试
树莓派:传感器安装与测试树莓派:传感器安装与测试

解码python(2.7)文件:

dht11.py

#! /usr/bin/python*
import time
import RPi.GPIO as GPIO

class DHT11Result:
    'DHT11 sensor result returned by DHT11.read() method'

    ERR_NO_ERROR = 
    ERR_MISSING_DATA = 
    ERR_CRC = 

    error_code = ERR_NO_ERROR
    temperature = -
    humidity = -

    def __init__(self, error_code, temperature, humidity):
        self.error_code = error_code
        self.temperature = temperature
        self.humidity = humidity

    def is_valid(self):
        return self.error_code == DHT11Result.ERR_NO_ERROR

class DHT11:
    'DHT11 sensor reader class for Raspberry'

    __pin = 

    def __init__(self, pin):
        self.__pin = pin

    def read(self):
        GPIO.setup(self.__pin, GPIO.OUT)

        # send initial high
        self.__send_and_sleep(GPIO.HIGH, )

        # pull down to low
        self.__send_and_sleep(GPIO.LOW, )

        # change to input using pull up
        GPIO.setup(self.__pin, GPIO.IN, GPIO.PUD_UP)

        # collect data into an array
        data = self.__collect_input()

        # parse lengths of all data pull up periods
        pull_up_lengths = self.__parse_data_pull_up_lengths(data)

        # if bit count mismatch, return error (4 byte data + 1 byte checksum)
        if len(pull_up_lengths) != :
                    return DHT11Result(DHT11Result.ERR_MISSING_DATA,,) 
        # calculate bits from lengths of the pull up periods
        bits = self.__calculate_bits(pull_up_lengths)

        # we have the bits, calculate bytes
        the_bytes = self.__bits_to_bytes(bits)

        # calculate checksum and check
        checksum = self.__calculate_checksum(the_bytes)
        if the_bytes[] != checksum:
            return DHT11Result(DHT11Result.ERR_CRC, , )

        # ok, we have valid data, return it
        return DHT11Result(DHT11Result.ERR_NO_ERROR, the_bytes[], the_bytes[])

    def __send_and_sleep(self, output, sleep):
        GPIO.output(self.__pin, output)
        time.sleep(sleep)

    def __collect_input(self):

        # collect the data while unchanged found
        unchanged_count = 

        # this is used to determine where is the end of the data
        max_unchanged_count =  

        last = -
        data = []
        while True:
            current = GPIO.input(self.__pin)
            data.append(current)
            if last != current:
                unchanged_count = 
                last = current
            else:
                unchanged_count += 
                if unchanged_count > max_unchanged_count:
                    break

        return data     

    def __parse_data_pull_up_lengths(self, data):

        STATE_INIT_PULL_DOWN = 
        STATE_INIT_PULL_UP = 
        STATE_DATA_FIRST_PULL_DOWN = 
        STATE_DATA_PULL_UP = 
        STATE_DATA_PULL_DOWN = 

        state = STATE_INIT_PULL_DOWN

        lengths = [] # will contain the lengths of data pull up periods
        current_length =  # will contain the length of the previous period

        for i in range(len(data)):

            current = data[i]
            current_length += 
            if state == STATE_INIT_PULL_DOWN:
                if current == :
                    # ok, we got the initial pull down
                    state = STATE_INIT_PULL_UP
                    continue
                else:
                    continue

            if state == STATE_INIT_PULL_UP:
                if current == :
                    # ok, we got the initial pull up
                    state = STATE_DATA_FIRST_PULL_DOWN
                    continue
                else:
                    continue

            if state == STATE_DATA_FIRST_PULL_DOWN:
                if current == :
                    # we have the initial pull down, the next will be the data pull up
                    state = STATE_DATA_PULL_UP
                    continue
                else:
                    continue
            if state == STATE_DATA_PULL_UP:
                if current == :
                    # data pulled up, the length of this pull up will determine whether it is 0 or 1
                    current_length = 
                    state = STATE_DATA_PULL_DOWN
                    continue
                else:
                    continue

            if state == STATE_DATA_PULL_DOWN:
                if current == :            
                    # pulled down, we store the length of the previous pull up period
                    lengths.append(current_length)
                    state = STATE_DATA_PULL_UP              
                    continue
                else:
                    continue

        return lengths

    def __calculate_bits(self, pull_up_lengths):

        # find shortest and longest period
        shortest_pull_up = 
        longest_pull_up = 

        for i in range(, len(pull_up_lengths)):

            length = pull_up_lengths[i]
            if length < shortest_pull_up:
                shortest_pull_up = length

            if length > longest_pull_up:
                longest_pull_up = length

        # use the halfway to determine whether the period it is long or short
        halfway = shortest_pull_up + (longest_pull_up - shortest_pull_up) / 

        bits = []

        for i in range(, len(pull_up_lengths)):

            bit = False
            if pull_up_lengths[i] > halfway:
                bit = True

            bits.append(bit)

        return bits

    def __bits_to_bytes(self, bits):

        the_bytes = []
        byte = 

        for i in range(, len(bits)):

            byte = byte << 
            if (bits[i]):
                byte = byte | 
            else:
                byte = byte | 

            if ((i + ) %  == ):
                the_bytes.append(byte)
                byte = 

        return the_bytes

    def __calculate_checksum(self, the_bytes):
        return the_bytes[] + the_bytes[] + the_bytes[] + the_bytes[] &       
           

测量执行python程序:

TemHumSensor.py

import dht11
import RPi.GPIO as GPIO
import time

GPIO.setwarnings(False)
GPIO.setmode(GPIO.BCM)


try:
    while True:    
        instance=dht11.DHT11(pin=)
        result = instance.read()
        print 'temperature'+str(result.temperature)
        print 'humidity'+str(result.humidity)
        time.sleep()
except KeyboardInterrupt:
    pass
           

未经允许 请勿转载