天天看點

opencv函數findcontours_使用OpenCV實作道路車輛計數

opencv函數findcontours_使用OpenCV實作道路車輛計數
opencv函數findcontours_使用OpenCV實作道路車輛計數

今天,我們将一起探讨如何基于計算機視覺實作道路交通計數。

在本教程中,我們将僅使用Python和OpenCV,并借助背景減除算法非常簡單地進行運動檢測。

我們将從以下四個方面進行介紹:

1. 用于物體檢測的背景減法算法主要思想。

2. OpenCV圖像過濾器。

3. 利用輪廓檢測物體。

4. 建立進一步資料處理的結構。

背景扣除算法
opencv函數findcontours_使用OpenCV實作道路車輛計數

有許多不同的背景扣除算法,但是它們的主要思想都很簡單。

假設有一個房間的視訊,在某些幀上沒有人和寵物,那麼此時的視訊基本為靜态的,我們将其稱為背景(background_layer)。是以要擷取在視訊上移動的對象,我們隻需要:用目前幀減去背景即可。

由于光照變化,人為移動物體,或者始終存在移動的人和寵物,我們将無法獲得靜态幀。在這種情況下,我們從視訊中選出一些圖像幀,如果絕大多數圖像幀中都具有某個相同的像素點,則此将像素作為background_layer中的一部分。

我們将使用MOG算法進行背景扣除:

opencv函數findcontours_使用OpenCV實作道路車輛計數

原始幀

代碼如下所示:

import 
           

處理後得到下面的前景圖像

opencv函數findcontours_使用OpenCV實作道路車輛計數

我們可以看出前景圖像上有一些噪音,可以通過标準濾波技術可以将其消除。

濾波

針對我們現在的情況,我們将需要以下濾波函數:

Threshold

Erode

Dilate

Opening

Closing

首先,我們使用“

Closing

”來移除區域中的間隙,然後使用“

Opening

”來移除個别獨立的像素點,然後使用“

Dilate

”進行擴張以使對象變粗。代碼如下:

def filter_mask(img):
    kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (2, 2))
    # Fill any small holes
    closing = cv2.morphologyEx(img, cv2.MORPH_CLOSE, kernel)
    # Remove noise
    opening = cv2.morphologyEx(closing, cv2.MORPH_OPEN, kernel)
    # Dilate to merge adjacent blobs
    dilation = cv2.dilate(opening, kernel, iterations=2)
    # threshold
    th = dilation[dilation < 240] = 0
    return th
           

處理後的前景如下:

opencv函數findcontours_使用OpenCV實作道路車輛計數
利用輪廓進行物體檢測

我們将使用cv2.findContours函數對輪廓進行檢測。我們在使用的時候可以選擇的參數為:

cv2.CV_RETR_EXTERNAL------僅擷取外部輪廓。

cv2.CV_CHAIN_APPROX_TC89_L1------使用Teh-Chin鍊逼近算法(更快)

代碼如下:

def get_centroid(x, y, w, h):
      x1 = int(w / 2)
      y1 = int(h / 2)
      cx = x + x1
      cy = y + y1
      return (cx, cy)
  
  def detect_vehicles(fg_mask, min_contour_width=35, min_contour_height=35):
      matches = []
      # finding external contours
      im, contours, hierarchy = cv2.findContours(
          fg_mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_TC89_L1)
      # filtering by with, height
      for (i, contour) in enumerate(contours):
          (x, y, w, h) = cv2.boundingRect(contour)
          contour_valid = (w >= min_contour_width) and (
              h >= min_contour_height)
          if not contour_valid:
              continue
          # getting center of the bounding box
          centroid = get_centroid(x, y, w, h)
          matches.append(((x, y, w, h), centroid))
      return matches
           
建立資料處理架構

我們都知道在ML和CV中,沒有一個算法可以處理所有問題。即使存在這種算法,我們也不會使用它,因為它很難大規模有效。例如幾年前Netflix公司用300萬美元的獎金懸賞最佳電影推薦算法。有一個團隊完成這個任務,但是他們的推薦算法無法大規模運作,是以其實對公司毫無用處。但是,Netflix公司仍獎勵了他們100萬美元。

接下來我們來建立解決目前問題的架構,這樣可以使資料的處理更加友善

class PipelineRunner(object):
      '''
          Very simple pipline.
          Just run passed processors in order with passing context from one to 
          another.
          You can also set log level for processors.
      '''
      def __init__(self, pipeline=None, log_level=logging.DEBUG):
          self.pipeline = pipeline or []
          self.context = {}
          self.log = logging.getLogger(self.__class__.__name__)
          self.log.setLevel(log_level)
          self.log_level = log_level
          self.set_log_level()
      def set_context(self, data):
          self.context = data
      def add(self, processor):
          if not isinstance(processor, PipelineProcessor):
              raise Exception(
                  'Processor should be an isinstance of PipelineProcessor.')
          processor.log.setLevel(self.log_level)
          self.pipeline.append(processor)
 
      def remove(self, name):
          for i, p in enumerate(self.pipeline):
              if p.__class__.__name__ == name:
                  del self.pipeline[i]
                  return True
          return False
  
      def set_log_level(self):
          for p in self.pipeline:
              p.log.setLevel(self.log_level)
  
      def run(self):
          for p in self.pipeline:
              self.context = p(self.context) 
          self.log.debug("Frame #%d processed.", self.context['frame_number'])
          return self.context
  
  class PipelineProcessor(object):
      '''
          Base class for processors.
      '''
      def __init__(self):
          self.log = logging.getLogger(self.__class__.__name__)
           

首先我們擷取一張處理器運作順序的清單,讓每個處理器完成一部分工作,在案順序完成執行以獲得最終結果。

我們首先建立輪廓檢測處理器。輪廓檢測處理器隻需将前面的背景扣除,濾波和輪廓檢測部分合并在一起即可,代碼如下所示:

class ContourDetection(PipelineProcessor):
      '''
          Detecting moving objects.
          Purpose of this processor is to subtrac background, get moving objects
          and detect them with a cv2.findContours method, and then filter off-by
          width and height. 
          bg_subtractor - background subtractor isinstance.
          min_contour_width - min bounding rectangle width.
          min_contour_height - min bounding rectangle height.
          save_image - if True will save detected objects mask to file.
          image_dir - where to save images(must exist).        
      '''
  
      def __init__(self, bg_subtractor, min_contour_width=35, min_contour_height=35, save_image=False, image_dir='images'):
          super(ContourDetection, self).__init__()
          self.bg_subtractor = bg_subtractor
          self.min_contour_width = min_contour_width
          self.min_contour_height = min_contour_height
          self.save_image = save_image
          self.image_dir = image_dir
  
      def filter_mask(self, img, a=None):
          '''
              This filters are hand-picked just based on visual tests
          '''
          kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (2, 2))
          # Fill any small holes
          closing = cv2.morphologyEx(img, cv2.MORPH_CLOSE, kernel)
          # Remove noise
          opening = cv2.morphologyEx(closing, cv2.MORPH_OPEN, kernel)
          # Dilate to merge adjacent blobs
          dilation = cv2.dilate(opening, kernel, iterations=2)
          return dilation
  
      def detect_vehicles(self, fg_mask, context):
          matches = []
          # finding external contours
          im2, contours, hierarchy = cv2.findContours(
              fg_mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_TC89_L1)
          for (i, contour) in enumerate(contours):
              (x, y, w, h) = cv2.boundingRect(contour)
              contour_valid = (w >= self.min_contour_width) and (
                  h >= self.min_contour_height)
              if not contour_valid:
                  continue
              centroid = utils.get_centroid(x, y, w, h)
              matches.append(((x, y, w, h), centroid))
          return matches
  
      def __call__(self, context):
          frame = context['frame'].copy()
          frame_number = context['frame_number']
          fg_mask = self.bg_subtractor.apply(frame, None, 0.001)
          # just thresholding values
          fg_mask[fg_mask < 240] = 0
          fg_mask = self.filter_mask(fg_mask, frame_number)
          if self.save_image:
              utils.save_frame(fg_mask, self.image_dir +
                               "/mask_%04d.png" % frame_number, flip=False)
          context['objects'] = self.detect_vehicles(fg_mask, context)
          context['fg_mask'] = fg_mask
          return contex
           

現在,讓我們建立一個處理器,該處理器将找出不同的幀上檢測到的相同對象,建立路徑,并對到達出口區域的車輛進行計數。代碼如下所示:

'''
        Counting vehicles that entered in exit zone.

        Purpose of this class based on detected object and local cache create
        objects pathes and count that entered in exit zone defined by exit masks.

        exit_masks - list of the exit masks.
        path_size - max number of points in a path.
        max_dst - max distance between two points.
    '''

    def __init__(self, exit_masks=[], path_size=10, max_dst=30, x_weight=1.0, y_weight=1.0):
        super(VehicleCounter, self).__init__()

        self.exit_masks = exit_masks

        self.vehicle_count = 0
        self.path_size = path_size
        self.pathes = []
        self.max_dst = max_dst
        self.x_weight = x_weight
        self.y_weight = y_weight

    def check_exit(self, point):
        for exit_mask in self.exit_masks:
            try:
                if exit_mask[point[1]][point[0]] == 255:
                    return True
            except:
                return True
        return False

    def __call__(self, context):
        objects = context['objects']
        context['exit_masks'] = self.exit_masks
        context['pathes'] = self.pathes
        context['vehicle_count'] = self.vehicle_count
        if not objects:
            return context

        points = np.array(objects)[:, 0:2]
        points = points.tolist()

        # add new points if pathes is empty
        if not self.pathes:
            for match in points:
                self.pathes.append([match])

        else:
            # link new points with old pathes based on minimum distance between
            # points
            new_pathes = []

            for path in self.pathes:
                _min = 999999
                _match = None
                for p in points:
                    if len(path) == 1:
                        # distance from last point to current
                        d = utils.distance(p[0], path[-1][0])
                    else:
                        # based on 2 prev points predict next point and calculate
                        # distance from predicted next point to current
                        xn = 2 * path[-1][0][0] - path[-2][0][0]
                        yn = 2 * path[-1][0][1] - path[-2][0][1]
                        d = utils.distance(
                            p[0], (xn, yn),
                            x_weight=self.x_weight,
                            y_weight=self.y_weight
                        )

                    if d < _min:
                        _min = d
                        _match = p

                if _match and _min <= self.max_dst:
                    points.remove(_match)
                    path.append(_match)
                    new_pathes.append(path)

                # do not drop path if current frame has no matches
                if _match is None:
                    new_pathes.append(path)

            self.pathes = new_pathes

            # add new pathes
            if len(points):
                for p in points:
                    # do not add points that already should be counted
                    if self.check_exit(p[1]):
                        continue
                    self.pathes.append([p])

        # save only last N points in path
        for i, _ in enumerate(self.pathes):
            self.pathes[i] = self.pathes[i][self.path_size * -1:]

        # count vehicles and drop counted pathes:
        new_pathes = []
        for i, path in enumerate(self.pathes):
            d = path[-2:]

            if (
                # need at list two points to count
                len(d) >= 2 and
                # prev point not in exit zone
                not self.check_exit(d[0][1]) and
                # current point in exit zone
                self.check_exit(d[1][1]) and
                # path len is bigger then min
                self.path_size <= len(path)
            ):
                self.vehicle_count += 1
            else:
                # prevent linking with path that already in exit zone
                add = True
                for p in path:
                    if self.check_exit(p[1]):
                        add = False
                        break
                if add:
                    new_pathes.append(path)

        self.pathes = new_pathes

        context['pathes'] = self.pathes
        context['objects'] = objects
        context['vehicle_count'] = self.vehicle_count

        self.log.debug('#VEHICLES FOUND: %s' % self.vehicle_count)

        return context
           

上面的代碼有點複雜,是以讓我們一個部分一個部分的介紹一下。

opencv函數findcontours_使用OpenCV實作道路車輛計數

上面的圖像中綠色的部分是出口區域。我們在這裡對車輛進行計數,隻有當車輛移動的長度超過3個點我們才進行計算。

我們使用掩碼來解決這個問題,因為它比使用矢量算法有效且簡單得多。隻需使用“二進制和”即可選出車輛區域中點。設定方式如下:

EXIT_PTS = np.array([
      [[732, 720], [732, 590], [1280, 500], [1280, 720]],
      [[0, 400], [645, 400], [645, 0], [0, 0]]
  ])
  
  base = np.zeros(SHAPE + (3,), dtype='uint8')
  exit_mask = cv2.fillPoly(base, EXIT_PTS, (255, 255, 255))[:, :, 0]
           

現在我們将檢測到的點連結起來。

對于第一幀圖像,我們将所有點均添加為新路徑。

接下來,如果len(path)== 1,我們在新檢測到的對象中找到與每條路徑最後一點距離最近的對象。

如果len(path)> 1,則使用路徑中的最後兩個點,即在同一條線上預測新點,并找到該點與目前點之間的最小距離。

具有最小距離的點将添加到目前路徑的末端并從清單中删除。如果在此之後還剩下一些點,我們會将其添加為新路徑。這個過程中我們還會限制路徑中的點數。

new_pathes = []
  for path in self.pathes:
      _min = 999999
      _match = None
      for p in points:
          if len(path) == 1:
              # distance from last point to current
              d = utils.distance(p[0], path[-1][0])
          else:
              # based on 2 prev points predict next point and calculate
              # distance from predicted next point to current
              xn = 2 * path[-1][0][0] - path[-2][0][0]
              yn = 2 * path[-1][0][1] - path[-2][0][1]
              d = utils.distance(
                  p[0], (xn, yn),
                  x_weight=self.x_weight,
                  y_weight=self.y_weight
              )
  
          if d < _min:
              _min = d
              _match = p
  
      if _match and _min <= self.max_dst:
          points.remove(_match)
          path.append(_match)
          new_pathes.append(path)
  
      # do not drop path if current frame has no matches
      if _match is None:
          new_pathes.append(path)
  
  self.pathes = new_pathes
  
  # add new pathes
  if len(points):
      for p in points:
          # do not add points that already should be counted
          if self.check_exit(p[1]):
              continue
          self.pathes.append([p])
  
  # save only last N points in path
  for i, _ in enumerate(self.pathes):
      self.pathes[i] = self.pathes[i][self.path_size * -1:]
           

現在,我們将嘗試計算進入出口區域的車輛。為此,我們需擷取路徑中的最後2個點,并檢查len(path)是否應大于限制。

# count vehicles and drop counted pathes:
    new_pathes = []
    for i, path in enumerate(self.pathes):
        d = path[-2:]
        if (
            # need at list two points to count
            len(d) >= 2 and
            # prev point not in exit zone
            not self.check_exit(d[0][1]) and
            # current point in exit zone
            self.check_exit(d[1][1]) and
            # path len is bigger then min
            self.path_size <= len(path)
        ):
            self.vehicle_count += 1
        else:
            # prevent linking with path that already in exit zone
            add = True
            for p in path:
                if self.check_exit(p[1]):
                    add = False
                    break
            if add:
                new_pathes.append(path)
    self.pathes = new_pathes
    
    context['pathes'] = self.pathes
    context['objects'] = objects
    context['vehicle_count'] = self.vehicle_count 
    self.log.debug('#VEHICLES FOUND: %s' % self.vehicle_count)
    return context
           

最後兩個處理器是CSV編寫器,用于建立報告CSV檔案,以及用于調試和精美圖檔的可視化。

class CsvWriter(PipelineProcessor):
        def __init__(self, path, name, start_time=0, fps=15):
            super(CsvWriter, self).__init__()
            self.fp = open(os.path.join(path, name), 'w')
            self.writer = csv.DictWriter(self.fp, fieldnames=['time', 'vehicles'])
            self.writer.writeheader()
            self.start_time = start_time
            self.fps = fps
            self.path = path
            self.name = name
            self.prev = None
        def __call__(self, context):
            frame_number = context['frame_number']
            count = _count = context['vehicle_count']
            if self.prev:
                _count = count - self.prev
            time = ((self.start_time + int(frame_number / self.fps)) * 100
                    + int(100.0 / self.fps) * (frame_number % self.fps))
            self.writer.writerow({'time': time, 'vehicles': _count})
            self.prev = count
            return context
    class Visualizer(PipelineProcessor):
        def __init__(self, save_image=True, image_dir='images'):
            super(Visualizer, self).__init__()
            self.save_image = save_image
            self.image_dir = image_dir
        def check_exit(self, point, exit_masks=[]):
            for exit_mask in exit_masks:
                if exit_mask[point[1]][point[0]] == 255:
                    return True
            return False
        def draw_pathes(self, img, pathes):
            if not img.any():
                return
            for i, path in enumerate(pathes):
                path = np.array(path)[:, 1].tolist()
                for point in path:
                    cv2.circle(img, point, 2, CAR_COLOURS[0], -1)
                    cv2.polylines(img, [np.int32(path)], False, CAR_COLOURS[0], 1)
            return img
        def draw_boxes(self, img, pathes, exit_masks=[]):
            for (i, match) in enumerate(pathes):
                contour, centroid = match[-1][:2]
                if self.check_exit(centroid, exit_masks):
                    continue
                x, y, w, h = contour
                cv2.rectangle(img, (x, y), (x + w - 1, y + h - 1),
                              BOUNDING_BOX_COLOUR, 1)
                cv2.circle(img, centroid, 2, CENTROID_COLOUR, -1)
            return img
        def draw_ui(self, img, vehicle_count, exit_masks=[]):
            # this just add green mask with opacity to the image
            for exit_mask in exit_masks:
                _img = np.zeros(img.shape, img.dtype)
                _img[:, :] = EXIT_COLOR
                mask = cv2.bitwise_and(_img, _img, mask=exit_mask)
                cv2.addWeighted(mask, 1, img, 1, 0, img)
            # drawing top block with counts
            cv2.rectangle(img, (0, 0), (img.shape[1], 50), (0, 0, 0), cv2.FILLED)
            cv2.putText(img, ("Vehicles passed: {total} ".format(total=vehicle_count)), (30, 30),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 1)
            return img
        def __call__(self, context):
            frame = context['frame'].copy()
            frame_number = context['frame_number']
            pathes = context['pathes']
            exit_masks = context['exit_masks']
            vehicle_count = context['vehicle_count']
            frame = self.draw_ui(frame, vehicle_count, exit_masks)
            frame = self.draw_pathes(frame, pathes)
            frame = self.draw_boxes(frame, pathes, exit_masks)
            utils.save_frame(frame, self.image_dir +
                             "/processed_%04d.png" % frame_number)
            return context
           
結論

正如我們看到的那樣,它并不像許多人想象的那麼難。但是,如果小夥伴運作腳本,小夥伴會發現此解決方案并不理想,存在前景對象存在重疊的問題,并且它也沒有按類型對車輛進行分類。但是,當相機有較好位置,例如位于道路正上方時,該算法具有很好的準确性。

交流群

歡迎加入“小白學視覺”公衆号讀者群一起和同行交流,目前有SLAM、三維視覺、傳感器、自動駕駛、計算攝影、檢測、分割、識别、醫學影像、GAN、算法競賽等微信群(以後會逐漸細分),請掃描下面微信号加群,備注:”昵稱+學校/公司+研究方向“,例如:”張三 + 上海交大 + 視覺SLAM“。請按照格式備注,否則不予通過。添加成功後會根據研究方向邀請進入相關微信群。

請勿

在群内發送廣告,否則會請出群,謝謝了解~

https://u.wechat.com/MH0UMFnCLWaNZIyd9ga2W-8 (二維碼自動識别)

opencv函數findcontours_使用OpenCV實作道路車輛計數

繼續閱讀