千家信息网

使用OpenCV怎么实现道路车辆计数功能

发表于:2024-12-12 作者:千家信息网编辑
千家信息网最后更新 2024年12月12日,使用OpenCV怎么实现道路车辆计数功能,很多新手对此不是很清楚,为了帮助大家解决这个难题,下面小编将为大家详细讲解,有这方面需求的人可以来学习下,希望你能有所收获。代码如下所示:import osi
千家信息网最后更新 2024年12月12日使用OpenCV怎么实现道路车辆计数功能

使用OpenCV怎么实现道路车辆计数功能,很多新手对此不是很清楚,为了帮助大家解决这个难题,下面小编将为大家详细讲解,有这方面需求的人可以来学习下,希望你能有所收获。


代码如下所示:

import osimport loggingimport logging.handlersimport random
import numpy as npimport skvideo.ioimport cv2import matplotlib.pyplot as plt
import utils# without this some strange errors happencv2.ocl.setUseOpenCL(False)random.seed(123)
# ============================================================================IMAGE_DIR = "./out"VIDEO_SOURCE = "input.mp4"SHAPE = (720, 1280) # HxW# ============================================================================
def train_bg_subtractor(inst, cap, num=500): ''' BG substractor need process some amount of frames to start giving result ''' print ('Training BG Subtractor...') i = 0 for frame in cap: inst.apply(frame, None, 0.001) i += 1 if i >= num: return cap
def main(): log = logging.getLogger("main")
# creting MOG bg subtractor with 500 frames in cache # and shadow detction bg_subtractor = cv2.createBackgroundSubtractorMOG2( history=500, detectShadows=True)
# Set up image source # You can use also CV2, for some reason it not working for me cap = skvideo.io.vreader(VIDEO_SOURCE)
# skipping 500 frames to train bg subtractor train_bg_subtractor(bg_subtractor, cap, num=500)
frame_number = -1 for frame in cap: if not frame.any(): log.error("Frame capture failed, stopping...") break
frame_number += 1 utils.save_frame(frame, "./out/frame_d.png" % frame_number) fg_mask = bg_subtractor.apply(frame, None, 0.001) utils.save_frame(frame, "./out/fg_mask_d.png" % frame_number)# ============================================================================
if __name__ == "__main__": log = utils.init_logging()
if not os.path.exists(IMAGE_DIR): log.debug("Creating image directory `%s`...", IMAGE_DIR) os.makedirs(IMAGE_DIR)
main()

处理后得到下面的前景图像

去除背景后的前景图像

我们可以看出前景图像上有一些噪音,可以通过标准滤波技术可以将其消除。

滤波

针对我们现在的情况,我们将需要以下滤波函数:Threshold、Erode、Dilate、Opening、Closing。

首先,我们使用"Closing"来移除区域中的间隙,然后使用"Opening"来移除个别独立的像素点,然后使用"Dilate"进行扩张以使对象变粗。代码如下:

def filter_mask(img):    kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (2, 2))    # Fill any small holes    closing = cv2.morphologyEx(img, cv2.MORPH_CLOSE, kernel)    # Remove noise    opening = cv2.morphologyEx(closing, cv2.MORPH_OPEN, kernel)    # Dilate to merge adjacent blobs    dilation = cv2.dilate(opening, kernel, iterations=2)    # threshold    th = dilation[dilation < 240] = 0    return th

处理后的前景如下:

利用轮廓进行物体检测

我们将使用cv2.findContours函数对轮廓进行检测。我们在使用的时候可以选择的参数为:

cv2.CV_RETR_EXTERNAL------仅获取外部轮廓。

cv2.CV_CHAIN_APPROX_TC89_L1------使用Teh-Chin链逼近算法(更快)

代码如下:

def get_centroid(x, y, w, h):      x1 = int(w / 2)      y1 = int(h / 2)      cx = x + x1      cy = y + y1      return (cx, cy)    def detect_vehicles(fg_mask, min_contour_width=35, min_contour_height=35):      matches = []      # finding external contours      im, contours, hierarchy = cv2.findContours(          fg_mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_TC89_L1)      # filtering by with, height      for (i, contour) in enumerate(contours):          (x, y, w, h) = cv2.boundingRect(contour)          contour_valid = (w >= min_contour_width) and (              h >= min_contour_height)          if not contour_valid:              continue          # getting center of the bounding box          centroid = get_centroid(x, y, w, h)          matches.append(((x, y, w, h), centroid))      return matches

建立数据处理框架

我们都知道在ML和CV中,没有一个算法可以处理所有问题。即使存在这种算法,我们也不会使用它,因为它很难大规模有效。例如几年前Netflix公司用300万美元的奖金悬赏最佳电影推荐算法。有一个团队完成这个任务,但是他们的推荐算法无法大规模运行,因此其实对公司毫无用处。但是,Netflix公司仍奖励了他们100万美元。

接下来我们来建立解决当前问题的框架,这样可以使数据的处理更加方便

class PipelineRunner(object):      '''          Very simple pipline.          Just run passed processors in order with passing context from one to           another.          You can also set log level for processors.      '''      def __init__(self, pipeline=None, log_level=logging.DEBUG):          self.pipeline = pipeline or []          self.context = {}          self.log = logging.getLogger(self.__class__.__name__)          self.log.setLevel(log_level)          self.log_level = log_level          self.set_log_level()      def set_context(self, data):          self.context = data      def add(self, processor):          if not isinstance(processor, PipelineProcessor):              raise Exception(                  'Processor should be an isinstance of PipelineProcessor.')          processor.log.setLevel(self.log_level)          self.pipeline.append(processor)       def remove(self, name):          for i, p in enumerate(self.pipeline):              if p.__class__.__name__ == name:                  del self.pipeline[i]                  return True          return False        def set_log_level(self):          for p in self.pipeline:              p.log.setLevel(self.log_level)        def run(self):          for p in self.pipeline:              self.context = p(self.context)           self.log.debug("Frame #%d processed.", self.context['frame_number'])          return self.context    class PipelineProcessor(object):      '''          Base class for processors.      '''      def __init__(self):          self.log = logging.getLogger(self.__class__.__name__)

首先我们获取一张处理器运行顺序的列表,让每个处理器完成一部分工作,在案顺序完成执行以获得最终结果。

我们首先创建轮廓检测处理器。轮廓检测处理器只需将前面的背景扣除,滤波和轮廓检测部分合并在一起即可,代码如下所示:

class ContourDetection(PipelineProcessor):      '''          Detecting moving objects.          Purpose of this processor is to subtrac background, get moving objects          and detect them with a cv2.findContours method, and then filter off-by          width and height.           bg_subtractor - background subtractor isinstance.          min_contour_width - min bounding rectangle width.          min_contour_height - min bounding rectangle height.          save_image - if True will save detected objects mask to file.          image_dir - where to save images(must exist).              '''        def __init__(self, bg_subtractor, min_contour_width=35, min_contour_height=35, save_image=False, image_dir='images'):          super(ContourDetection, self).__init__()          self.bg_subtractor = bg_subtractor          self.min_contour_width = min_contour_width          self.min_contour_height = min_contour_height          self.save_image = save_image          self.image_dir = image_dir        def filter_mask(self, img, a=None):          '''              This filters are hand-picked just based on visual tests          '''          kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (2, 2))          # Fill any small holes          closing = cv2.morphologyEx(img, cv2.MORPH_CLOSE, kernel)          # Remove noise          opening = cv2.morphologyEx(closing, cv2.MORPH_OPEN, kernel)          # Dilate to merge adjacent blobs          dilation = cv2.dilate(opening, kernel, iterations=2)          return dilation        def detect_vehicles(self, fg_mask, context):          matches = []          # finding external contours          im2, contours, hierarchy = cv2.findContours(              fg_mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_TC89_L1)          for (i, contour) in enumerate(contours):              (x, y, w, h) = cv2.boundingRect(contour)              contour_valid = (w >= self.min_contour_width) and (                  h >= self.min_contour_height)              if not contour_valid:                  continue              centroid = utils.get_centroid(x, y, w, h)              matches.append(((x, y, w, h), centroid))          return matches        def __call__(self, context):          frame = context['frame'].copy()          frame_number = context['frame_number']          fg_mask = self.bg_subtractor.apply(frame, None, 0.001)          # just thresholding values          fg_mask[fg_mask < 240] = 0          fg_mask = self.filter_mask(fg_mask, frame_number)          if self.save_image:              utils.save_frame(fg_mask, self.image_dir +                               "/mask_d.png" % frame_number, flip=False)          context['objects'] = self.detect_vehicles(fg_mask, context)          context['fg_mask'] = fg_mask          return contex

现在,让我们创建一个处理器,该处理器将找出不同的帧上检测到的相同对象,创建路径,并对到达出口区域的车辆进行计数。代码如下所示:

    '''        Counting vehicles that entered in exit zone.
Purpose of this class based on detected object and local cache create objects pathes and count that entered in exit zone defined by exit masks.
exit_masks - list of the exit masks. path_size - max number of points in a path. max_dst - max distance between two points. '''
def __init__(self, exit_masks=[], path_size=10, max_dst=30, x_weight=1.0, y_weight=1.0): super(VehicleCounter, self).__init__()
self.exit_masks = exit_masks
self.vehicle_count = 0 self.path_size = path_size self.pathes = [] self.max_dst = max_dst self.x_weight = x_weight self.y_weight = y_weight
def check_exit(self, point): for exit_mask in self.exit_masks: try: if exit_mask[point[1]][point[0]] == 255: return True except: return True return False
def __call__(self, context): objects = context['objects'] context['exit_masks'] = self.exit_masks context['pathes'] = self.pathes context['vehicle_count'] = self.vehicle_count if not objects: return context
points = np.array(objects)[:, 0:2] points = points.tolist()
# add new points if pathes is empty if not self.pathes: for match in points: self.pathes.append([match])
else: # link new points with old pathes based on minimum distance between # points new_pathes = []
for path in self.pathes: _min = 999999 _match = None for p in points: if len(path) == 1: # distance from last point to current d = utils.distance(p[0], path[-1][0]) else: # based on 2 prev points predict next point and calculate # distance from predicted next point to current xn = 2 * path[-1][0][0] - path[-2][0][0] yn = 2 * path[-1][0][1] - path[-2][0][1] d = utils.distance( p[0], (xn, yn), x_weight=self.x_weight, y_weight=self.y_weight )
if d < _min: _min = d _match = p
if _match and _min <= self.max_dst: points.remove(_match) path.append(_match) new_pathes.append(path)
# do not drop path if current frame has no matches if _match is None: new_pathes.append(path)
self.pathes = new_pathes
# add new pathes if len(points): for p in points: # do not add points that already should be counted if self.check_exit(p[1]): continue self.pathes.append([p])
# save only last N points in path for i, _ in enumerate(self.pathes): self.pathes[i] = self.pathes[i][self.path_size * -1:]
# count vehicles and drop counted pathes: new_pathes = [] for i, path in enumerate(self.pathes): d = path[-2:]
if ( # need at list two points to count len(d) >= 2 and # prev point not in exit zone not self.check_exit(d[0][1]) and # current point in exit zone self.check_exit(d[1][1]) and # path len is bigger then min self.path_size <= len(path) ): self.vehicle_count += 1 else: # prevent linking with path that already in exit zone add = True for p in path: if self.check_exit(p[1]): add = False break if add: new_pathes.append(path)
self.pathes = new_pathes
context['pathes'] = self.pathes context['objects'] = objects context['vehicle_count'] = self.vehicle_count
self.log.debug('#VEHICLES FOUND: %s' % self.vehicle_count)
return context

上面的代码有点复杂,因此让我们一个部分一个部分的介绍一下。

上面的图像中绿色的部分是出口区域。我们在这里对车辆进行计数,只有当车辆移动的长度超过3个点我们才进行计算

我们使用掩码来解决这个问题,因为它比使用矢量算法有效且简单得多。只需使用"二进制和"即可选出车辆区域中点。设置方式如下:

EXIT_PTS = np.array([      [[732, 720], [732, 590], [1280, 500], [1280, 720]],      [[0, 400], [645, 400], [645, 0], [0, 0]]  ])    base = np.zeros(SHAPE + (3,), dtype='uint8')  exit_mask = cv2.fillPoly(base, EXIT_PTS, (255, 255, 255))[:, :, 0]


现在我们将检测到的点链接起来。

对于第一帧图像,我们将所有点均添加为新路径。

接下来,如果len(path)== 1,我们在新检测到的对象中找到与每条路径最后一点距离最近的对象。

如果len(path)> 1,则使用路径中的最后两个点,即在同一条线上预测新点,并找到该点与当前点之间的最小距离。

具有最小距离的点将添加到当前路径的末端并从列表中删除。如果在此之后还剩下一些点,我们会将其添加为新路径。这个过程中我们还会限制路径中的点数。

new_pathes = []  for path in self.pathes:      _min = 999999      _match = None      for p in points:          if len(path) == 1:              # distance from last point to current              d = utils.distance(p[0], path[-1][0])          else:              # based on 2 prev points predict next point and calculate              # distance from predicted next point to current              xn = 2 * path[-1][0][0] - path[-2][0][0]              yn = 2 * path[-1][0][1] - path[-2][0][1]              d = utils.distance(                  p[0], (xn, yn),                  x_weight=self.x_weight,                  y_weight=self.y_weight              )            if d < _min:              _min = d              _match = p        if _match and _min <= self.max_dst:          points.remove(_match)          path.append(_match)          new_pathes.append(path)        # do not drop path if current frame has no matches      if _match is None:          new_pathes.append(path)    self.pathes = new_pathes    # add new pathes  if len(points):      for p in points:          # do not add points that already should be counted          if self.check_exit(p[1]):              continue          self.pathes.append([p])    # save only last N points in path  for i, _ in enumerate(self.pathes):      self.pathes[i] = self.pathes[i][self.path_size * -1:]

现在,我们将尝试计算进入出口区域的车辆。为此,我们需获取路径中的最后2个点,并检查len(path)是否应大于限制。

# count vehicles and drop counted pathes:    new_pathes = []    for i, path in enumerate(self.pathes):        d = path[-2:]        if (            # need at list two points to count            len(d) >= 2 and            # prev point not in exit zone            not self.check_exit(d[0][1]) and            # current point in exit zone            self.check_exit(d[1][1]) and            # path len is bigger then min            self.path_size <= len(path)        ):            self.vehicle_count += 1        else:            # prevent linking with path that already in exit zone            add = True            for p in path:                if self.check_exit(p[1]):                    add = False                    break            if add:                new_pathes.append(path)    self.pathes = new_pathes        context['pathes'] = self.pathes    context['objects'] = objects    context['vehicle_count'] = self.vehicle_count     self.log.debug('#VEHICLES FOUND: %s' % self.vehicle_count)    return context

最后两个处理器是CSV编写器,用于创建报告CSV文件,以及用于调试和精美图片的可视化。

class CsvWriter(PipelineProcessor):        def __init__(self, path, name, start_time=0, fps=15):            super(CsvWriter, self).__init__()            self.fp = open(os.path.join(path, name), 'w')            self.writer = csv.DictWriter(self.fp, fieldnames=['time', 'vehicles'])            self.writer.writeheader()            self.start_time = start_time            self.fps = fps            self.path = path            self.name = name            self.prev = None        def __call__(self, context):            frame_number = context['frame_number']            count = _count = context['vehicle_count']            if self.prev:                _count = count - self.prev            time = ((self.start_time + int(frame_number / self.fps)) * 100                    + int(100.0 / self.fps) * (frame_number % self.fps))            self.writer.writerow({'time': time, 'vehicles': _count})            self.prev = count            return context    class Visualizer(PipelineProcessor):        def __init__(self, save_image=True, image_dir='images'):            super(Visualizer, self).__init__()            self.save_image = save_image            self.image_dir = image_dir        def check_exit(self, point, exit_masks=[]):            for exit_mask in exit_masks:                if exit_mask[point[1]][point[0]] == 255:                    return True            return False        def draw_pathes(self, img, pathes):            if not img.any():                return            for i, path in enumerate(pathes):                path = np.array(path)[:, 1].tolist()                for point in path:                    cv2.circle(img, point, 2, CAR_COLOURS[0], -1)                    cv2.polylines(img, [np.int32(path)], False, CAR_COLOURS[0], 1)            return img        def draw_boxes(self, img, pathes, exit_masks=[]):            for (i, match) in enumerate(pathes):                contour, centroid = match[-1][:2]                if self.check_exit(centroid, exit_masks):                    continue                x, y, w, h = contour                cv2.rectangle(img, (x, y), (x + w - 1, y + h - 1),                              BOUNDING_BOX_COLOUR, 1)                cv2.circle(img, centroid, 2, CENTROID_COLOUR, -1)            return img        def draw_ui(self, img, vehicle_count, exit_masks=[]):            # this just add green mask with opacity to the image            for exit_mask in exit_masks:                _img = np.zeros(img.shape, img.dtype)                _img[:, :] = EXIT_COLOR                mask = cv2.bitwise_and(_img, _img, mask=exit_mask)                cv2.addWeighted(mask, 1, img, 1, 0, img)            # drawing top block with counts            cv2.rectangle(img, (0, 0), (img.shape[1], 50), (0, 0, 0), cv2.FILLED)            cv2.putText(img, ("Vehicles passed: {total} ".format(total=vehicle_count)), (30, 30),                        cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 1)            return img        def __call__(self, context):            frame = context['frame'].copy()            frame_number = context['frame_number']            pathes = context['pathes']            exit_masks = context['exit_masks']            vehicle_count = context['vehicle_count']            frame = self.draw_ui(frame, vehicle_count, exit_masks)            frame = self.draw_pathes(frame, pathes)            frame = self.draw_boxes(frame, pathes, exit_masks)            utils.save_frame(frame, self.image_dir +                             "/processed_d.png" % frame_number)            return context

看完上述内容是否对您有帮助呢?如果还想对相关知识有进一步的了解或阅读更多相关文章,请关注行业资讯频道,感谢您对的支持。

处理 路径 检测 处理器 车辆 代码 算法 轮廓 区域 图像 前景 对象 部分 公司 问题 面的 出口 最小 有效 接下来 数据库的安全要保护哪些东西 数据库安全各自的含义是什么 生产安全数据库录入 数据库的安全性及管理 数据库安全策略包含哪些 海淀数据库安全审计系统 建立农村房屋安全信息数据库 易用的数据库客户端支持安全管理 连接数据库失败ssl安全错误 数据库的锁怎样保障安全 盗贼之海怎么加入同一个服务器 海南软件开发招聘信息 备用dns服务器保存不了 中国网络安全领域细分矩阵图 电力网络安全建设拓扑结构图 福建网络技术咨询信息推荐 网络安全风险隐患的分析报告 服务器黑洞 制定了单位网络安全预案 中国网络安全法律 山东省公安厅举行网络安全比赛 正规网络技术供应 用安卓设计数据库 数据库系统概率题库 宁夏新华互联网科技学校 夸父逐日朗读软件开发 手机微信网络安全宣传周海报 昆山机械软件开发科技有限公司 西安手机应用软件开发公司哪个好 给数据库中哪些字段加索引 东莞市维翰网络技术有限公司 宽带网络安全知识 手机软件开发周期是多长 物理服务器在创建D盘 华为8在激活手机无法连接服务器 汽车驾驶人网络安全教育 论网络安全面对的挑战 第七届网络安全日答题答案 isa服务器是什么 转正4K起软件开发学徒
0