寫在前面
我們将從以下四個方面進行介紹:
-
- 用于物體檢測的背景減法算法主要思想。
-
- OpenCV圖像過濾器。
-
- 利用輪廓檢測物體。
-
- 建立進一步資料處理的結構。
1.背景扣除算法:
背景扣除算法有很多種,但是它們的原理幾乎是相同的。假設有一段天空的視訊,當沒有鳥飛過的時候,則稱此時視訊的幀為背景,如果要擷取視訊中的小鳥時,我們隻需要:用目前幀減去背景就可以了。

代碼入下:
import os
import logging
import logging.handlers
import random
import numpy as np
import skvideo.io
import cv2
import matplotlib.pyplot as plt
import utils
# without this some strange errors happen
cv2.ocl.setUseOpenCL(False)
random.seed(123)
#=====================================================================
IMAGE_DIR = "./out"
VIDEO_SOURCE = "input.mp4"
SHAPE = (720, 1280) # HxW
#======================================================================
def train_bg_subtractor(inst, cap, num=500):
'''
BG substractor need process some amount of frames to start giving result
'''
print ('Training BG Subtractor...')
i = 0
for frame in cap:
inst.apply(frame, None, 0.001)
i += 1
if i >= num:
return cap
def main():
log = logging.getLogger("main")
# creting MOG bg subtractor with 500 frames in cache
# and shadow detction
bg_subtractor = cv2.createBackgroundSubtractorMOG2( history=500, detectShadows=True)
# Set up image source
# You can use also CV2, for some reason it not working for me
cap = skvideo.io.vreader(VIDEO_SOURCE)
# skipping 500 frames to train bg subtractor
train_bg_subtractor(bg_subtractor, cap, num=500)
frame_number = -1
for frame in cap:
if not frame.any():
log.error("Frame capture failed, stopping...")
break
frame_number += 1
utils.save_frame(frame, "./out/frame_%04d.png" % frame_number)
fg_mask = bg_subtractor.apply(frame, None, 0.001)
utils.save_frame(frame, "./out/fg_mask_%04d.png" % frame_number)
#======================================================================
if __name__ == "__main__":
log = utils.init_logging()
if not os.path.exists(IMAGE_DIR):
log.debug("Creating image directory `%s`...", IMAGE_DIR)
os.makedirs(IMAGE_DIR)
main()
處理後得到如下的前景圖像:
我們可以看出前景圖像上有一些噪音,可以通過标準濾波技術可以将其消除。
2.濾波處理
針對我們現在的情況,我們将需要以下濾波函數:
Threshold、Erode、Dilate、Opening、 Closing。
首先,我們使用“Closing”來移除區域中的間隙,然後使用“Opening”來移除個别獨立的像素點,然 後使用“Dilate”進行擴張以使對象變粗。代碼如下:
def filter_mask(img):
kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (2, 2))
# Fill any small holes
closing = cv2.morphologyEx(img, cv2.MORPH_CLOSE, kernel)
# Remove noise
opening = cv2.morphologyEx(closing, cv2.MORPH_OPEN, kernel)
# Dilate to merge adjacent blobs
dilation = cv2.dilate(opening, kernel, iterations=2)
# threshold
th = dilation[dilation < 240] = 0
return th
處理後的前景圖像如下:
3.利用輪廓進行圖像的檢測。
我們将使用cv2.findContours()函數對輪廓進行檢測。我們在使用的時候可以選擇的參數為:
cv2.CV_RETR_EXTERNAL------僅擷取外部輪廓。
cv2.CV_CHAIN_APPROX_TC89_L1------使用Teh-Chin鍊逼近算法(更快)
代碼如下:
def get_centroid(x, y, w, h):
x1 = int(w / 2)
y1 = int(h / 2)
cx = x + x1
cy = y + y1
return (cx, cy)
def detect_vehicles(fg_mask, min_contour_width=35, min_contour_height=35):
matches = []
# finding external contours
im, contours, hierarchy = cv2.findContours( fg_mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_TC89_L1)
# filtering by with, height
for (i, contour) in enumerate(contours):
(x, y, w, h) = cv2.boundingRect(contour)
contour_valid = (w >= min_contour_width) and (h >= min_contour_height)
if not contour_valid:
continue
# getting center of the bounding box
centroid = get_centroid(x, y, w, h)
matches.append(((x, y, w, h), centroid))
return matches
4.建立項目的整體架構
class PipelineRunner(object):
'''
Very simple pipline.
Just run passed processors in order with passing context from one to another.
You can also set log level for processors.
'''
def __init__(self, pipeline=None, log_level=logging.DEBUG):
self.pipeline = pipeline or []
self.context = {}
self.log = logging.getLogger(self.__class__.__name__)
self.log.setLevel(log_level)
self.log_level = log_level
self.set_log_level()
def set_context(self, data):
self.context = data
def add(self, processor):
if not isinstance(processor, PipelineProcessor):
raise Exception( 'Processor should be an isinstance of PipelineProcessor.')
processor.log.setLevel(self.log_level)
self.pipeline.append(processor)
def remove(self, name):
for i, p in enumerate(self.pipeline):
if p.__class__.__name__ == name:
del self.pipeline[i]
return True
return False
def set_log_level(self):
for p in self.pipeline:
p.log.setLevel(self.log_level)
def run(self):
for p in self.pipeline:
self.context = p(self.context)
self.log.debug("Frame #%d processed.", self.context['frame_number'])
return self.context
class PipelineProcessor(object):
'''
Base class for processors.
'''
def __init__(self):
self.log = logging.getLogger(self.__class__.__name__)
class ContourDetection(PipelineProcessor):
'''
Detecting moving objects.
Purpose of this processor is to subtrac background, get moving objects
and detect them with a cv2.findContours method, and then filter off-by
width and height.
bg_subtractor - background subtractor isinstance.
min_contour_width - min bounding rectangle width.
min_contour_height - min bounding rectangle height.
save_image - if True will save detected objects mask to file.
image_dir - where to save images(must exist).
'''
def __init__(self, bg_subtractor, min_contour_width=35, min_contour_height=35)
super(ContourDetection, self).__init__()
self.bg_subtractor = bg_subtractor
self.min_contour_width = min_contour_width
self.min_contour_height = min_contour_height
self.save_image = save_image
self.image_dir = image_dir
def filter_mask(self, img, a=None):
'''
This filters are hand-picked just based on visual tests
'''
kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (2, 2))
# Fill any small holes
closing = cv2.morphologyEx(img, cv2.MORPH_CLOSE, kernel)
# Remove noise
opening = cv2.morphologyEx(closing, cv2.MORPH_OPEN, kernel)
# Dilate to merge adjacent blobs
dilation = cv2.dilate(opening, kernel, iterations=2)
return dilation
def detect_vehicles(self, fg_mask, context):
matches = []
# finding external contours
im2, contours, hierarchy = cv2.findContours( fg_mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_TC89_L1)
for (i, contour) in enumerate(contours):
(x, y, w, h) = cv2.boundingRect(contour)
contour_valid = (w >= self.min_contour_width) and ( h >= self.min_contour_height)
if not contour_valid:
continue
centroid = utils.get_centroid(x, y, w, h)
matches.append(((x, y, w, h), centroid))
return matches
def __call__(self, context):
frame = context['frame'].copy()
frame_number = context['frame_number']
fg_mask = self.bg_subtractor.apply(frame, None, 0.001)
# just thresholding values
fg_mask[fg_mask < 240] = 0
fg_mask = self.filter_mask(fg_mask, frame_number)
if self.save_image:
utils.save_frame(fg_mask, self.image_dir + "/mask_%04d.png" % frame_number, flip=False)
context['objects'] = self.detect_vehicles(fg_mask, context)
context['fg_mask'] = fg_mask
return contex
class CLQ:
'''
Counting vehicles that entered in exit zone.
Purpose of this class based on detected object and local cache create objects pathes and count that entered in exit zone defined by exit masks.
exit_masks - list of the exit masks.
path_size - max number of points in a path.
max_dst - max distance between two points.
'''