亿迅智能制造网
工业4.0先进制造技术信息网站!
首页 | 制造技术 | 制造设备 | 工业物联网 | 工业材料 | 设备保养维修 | 工业编程 |
home  MfgRobots >> 亿迅智能制造网 >  >> Manufacturing Technology >> 制造工艺

计算机视觉作为 SmartThings 的运动传感器

这款计算机视觉驱动的传感器使用 Raspberry Pi 3 和 PiCam 检测人脸并通过 LAN 将存在数据发送至 SmartThings。

我将首先假设您有一个 Raspberry Pi 3,上面有一个工作相机和 Open CV。如果你不推荐这个教程🙂

创建自定义 SmartThings 设备处理程序

在 SmartThings IDE 中,我们为计算机视觉运动传感器创建了一个新的设备处理程序。

转到“我的设备处理程序”部分,点击右上角的“+ 创建新的设备处理程序”。

在这种情况下,我们将从代码创建它。点击第二个标签“From Code”,粘贴附加的代码“Device Handler” 并点击“创建”。

在下一页中,单击“发布”以使您可以使用该设备。

编写 SmartThings 应用

与设备处理程序类似,我们将转到“我的 SmartApps”部分,点击右上角的“+ 创建新的 SmartApps”。

我们还将从代码中创建它。点击第二个标签“From Code”,粘贴附加的代码“SmartApp” 并点击“创建”。

在下一页点击“发布”。

准备树莓派

现在我们必须添加 python 脚本,该脚本将从相机获取图像,检测面部并向 SmartThings 报告。

首先d 下载并安装imutils和twisted

如果您还没有安装 imutils 包,您需要从 GitHub 获取它或通过以下方式安装它:

pip install imutils 

对于扭曲:

sudo apt-get install python-twisted-web 

现在一切准备就绪,转到 /home/pi 并创建一个目录来存储脚本

mkdir cameracd camera 

创建脚本文件:

sudo nano ssdpcamera.py 

粘贴附加的代码 相机 Python 脚本” 并通过按“control + x”然后按“y”并输入来保存。

通过键入 python ssdpcamera.py 测试脚本,您应该看到如下内容:

发现和配对树莓派

从 SmartThings 移动应用程序中,我们应该转到右下角的 Marketplace,单击“SmartApps”选项卡,最后在“+我的应用程序”中查找“计算机视觉(连接)”

确保 Raspberry Pi 已打开且 python 脚本正在运行。

SmartApp 将开始发现过程,一旦找到,点击选择对话框并选择设备并点击“完成”。

这将在您的帐户中创建设备并开始接收更新。

自动启动

最后,如果你想在你打开树莓派时自动运行 python 脚本,你可以编辑 /etc/rc.local 并添加以下行。

(sleep 10;python /home/pi/camera/ssdpcamera.py)& 

() 使两个命令都在后台运行。

代码

#!/usr/bin/python2.7""" 用于 SmartThingsCopyright 2016 Juan Pablo Risso 的计算机视觉相机 <[email protected]>依赖项:python-twisted、cv2、pyimagesearch许可下Apache 许可证,版本 2.0(“许可证”);除非符合许可证,否则您不得使用此文件。您可以在以下位置获取许可证副本:http://www.apache.org/licenses/LICENSE-2.0除非根据适用法律的要求或书面同意,根据许可证分发的软件是按“原样”分发的,没有任何类型的明示或暗示的保证或条件。请参阅许可证以了解管理许可证下许可和限制的特定语言。 """import argparseimport loggingimport cv2import urllib2import imutilsfrom time import timefrom picamera.ar​​ray import PiRGBArrayfrom picamera import PiCamerafromtwisted.web 导入服务器,resourcefromtwisted.internet 导入reactorfromtwisted.internet.defer 导入成功来自twisted.inter net.protocol import DatagramProtocolfromtwisted.web.client import Agentfrom twisted.web.http_headers import Headersfrom twisted.web.iweb import IBodyProducerfrom twisted.web._newclient import ResponseFailedfrom zope.interface import implementsSSDP_PORT =1900SSDP_ADDR ='239.25558045000000000000000000000000 -9220-11e4-96fa-123b93f75cba'SEARCH_RESPONSE ='HTTP/1.1 200 OK\r\nCACHE-CONTROL:max-age=30\r\nEXT:\r\nLOCATION:%s\r\nSERVER:Linux, UPnP/ 1.0, Pi_Garage/1.0\r\nST:%s\r\nUSN:uuid:%s::%s'# 初始化相机并获取原始相机的引用# capturecamera =PiCamera()camera.resolution =(640 , 480)camera.framerate =32rawCapture =PiRGBArray(camera, size=(640, 480))auxcount =0#构建人脸检测器并让相机预热fd =FaceDetector("cascades/haarcascade_frontalface_default.xml")time.sleep (0.1)defdetermine_ip_for_host(host):"""确定用于与特定主机通信的本地IP地址"""test_sock =DatagramProtocol()test_sock_listener =reactor.lis tenUDP(0, test_sock) # pylint:disable=no-membertest_sock.transport.connect(host, 1900)my_ip =test_sock.transport.getHost().hosttest_sock_listener.stopListening()return my_ipclass StringProducer(object):"""写一个内存中字符串到 Twisted 请求"""implements(IBodyProducer)def __init__(self, body):self.body =bodyself.length =len(body)def startProducing(self, consumer):# pylint:disable=invalid- name"""开始为指定的消费者生产提供的字符串"""consumer.write(self.body)return success(None)def pauseProducing(self):# pylint:disable=invalid-name"""暂停生产 - 无操作"""passdef stopProducing(self):# pylint:disable=invalid-name""" 停止生产 - 无操作"""passclass SSDPServer(DatagramProtocol):"""从 SmartThings hub 接收和响应 M-SEARCH 发现请求" ""def __init__(self, interface='', status_port=0, device_target=''):self.interface =interfaceself.device_target =device_targetself.status_port =status_portself.port =reactor.listenMulticas t(SSDP_PORT, self, listenMultiple=True) # pylint:disable=no-memberself.port.joinGroup(SSDP_ADDR, interface=interface)reactor.addSystemEventTrigger('before', 'shutdown', self.stop) # pylint:disable=no-memberdef datagramReceived(self, data, (host, port)):try:header, _ =data.split(b'\r\n\r\n')[:2]except ValueError:returnlines =header.split ('\r\n')cmd =lines.pop(0).split(' ')lines =[x.replace(':', ':', 1) for x in lines]lines =[x for x在行中 if len(x)> 0]headers =[x.split(':', 1) for x in lines]headers =dict([(x[0].lower(), x[1]) for x in headers])logging.debug('SSDP command %s %s - from %s:%d with headers %s', cmd[0], cmd[1], host, port, headers)search_target =''if ' st' in headers:search_target =headers['st']if cmd[0] =='M-SEARCH' and cmd[1] =='*' and search_target in self.device_target:logging.info('Received %s %s 来自 %s:%d', cmd[0], cmd[1], search_target, host, port)url ='http://%s:%d/status' % (determine_ip_for_host(host) , self.status_port)response =SEARCH_R ESPONSE % (url, search_target, UUID, self.device_target)self.port.write(response, (host, port))else:logging.debug('Ignored SSDP command %s %s', cmd[0], cmd[ 1])def stop(self):"""离开多播组并停止监听"""self.port.leaveGroup(SSDP_ADDR, interface=self.interface)self.port.stopListening()class StatusServer(resource.Resource):"""向 SmartThings 集线器提供摄像头状态的 HTTP 服务器"""isLeaf =Truedef __init__(self, device_target, subscription_list,garage_door_status):self.device_target =device_targetself.subscription_list =subscription_listself.garage_door_status =garage_door_statusresource.Resource.__init__( self)def render_SUBSCRIBE(self, request):# pylint:disable=invalid-name"""处理来自 ST hub 的订阅请求 - hub 希望收到车库门状态更新的通知"""headers =request.getAllHeaders()logging.debug ("SUBSCRIBE:%s", headers)if 'callback' in headers:cb_url =headers['callback'][1:-1]if not cb_url in self.subscription_list:self.subs cription_list[cb_url] ={}#reactor.stop()logging.info('Added subscription %s', cb_url)self.subscription_list[cb_url]['expiration'] =time() + 24 * 3600return ""def render_GET( self, request):# pylint:disable=invalid-name"""处理来自 ST hub 的轮询请求"""if request.path =='/status':if self.garage_door_status['last_state'] =='inactive' :cmd ='status-inactive'else:cmd ='status-active'msg ='%suuid:%s::%s' % (cmd, UUID, self.device_target)logging.info("%s 轮询请求 %s - 返回 %s",request.getClientIP(),request.path,cmd)return msgelse:logging.info( "从 %s 收到了 %s 的虚假请求",request.getClientIP(),request.path)return ""class MonitorCamera(object):"""监控摄像机状态,在其状态改变时生成通知"""def __init__( self, device_target, subscription_list, camera_status):# pylint:disable=too-many-argumentsself.device_target =device_targetself.subscription_list =subscription_listself.camera_stat us =camera_statuscurrent_state ='inactive'reactor.callLater(0, self.check_garage_state, current_state, auxcount) # pylint:disable=no-memberdef check_garage_state(self, current_state, auxcount):self.current_state =current_stateself.auxcount =auxcountcamera.capture( rawCapture, format="bgr", use_video_port=True)# 获取代表 imageframe 的原始 NumPy 数组 =rawCapture.array# 调整帧大小并转换为灰度帧 =imutils.resize(frame, width =640)gray =cv2.cvtColor (frame, cv2.COLOR_BGR2GRAY)#检测图像中的人脸,然后克隆frame#这样我们就可以在它上面画了faceRects =fd.detect(gray, scaleFactor =1.1, minNeighbors =10,minSize =(30, 30))frameClone =frame.copy()if faceRects !=():auxcount =0 if current_state =='inactive':current_state ='active' logging.info('State changed from %s to %s', self.camera_status['last_state '], current_state)self.camera_status['last_state'] =current_stateself.notify_hubs()else:auxcount =auxcount + 1if auxcount ==60:current_state ='inactive' logging.info('State changed from %s to %s', self.camera_status['last_state'], current_state)self.camera_status['last_state'] =current_stateself.notify_hubs() # loop over人脸边界框并在 faceRects 中为 (fX, fY, fW, fH) 绘制它们:cv2.rectangle(frameClone, (fX, fY), (fX + fW, fY + fH), (0, 255, 0), 2 )# 在新的 GUI 窗口上显示视频源#cv2.imshow("Face", videorotate)rawCapture.truncate(0) # 安排下一个 checkreactor.callLater(0, self.check_garage_state, current_state, auxcount) # pylint:disable=no-memberdef notify_hubs(self):"""通知订阅的 SmartThings 集线器发生了状态更改"""if self.camera_status['last_state'] =='inactive':cmd ='status-inactive'else:cmd ='status-active'for subscription in self.subscription_list:if self.subscription_list[subscription]['expiration']> time():logging.info("Notifying hub %s", subscription)msg ='%suuid:%s::%s' % (cmd, UUID, self.device_target)body =StringProducer(msg)agent =Agent(reactor)req =agent.request('POST',subscription,Headers({'CONTENT-LENGTH':[len(msg)]}),body)req.addCallback(self.handle_response )req.addErrback(self.handle_error)def handle_response(self, response):# pylint:disable=no-self-use"""处理 SmartThings 集线器向 POST 返回状态代码。这实际上是出乎意料的 - 它通常会关闭没有给出响应代码的 POST/PUT 连接。"""if response.code ==202:logging.info("Status update Accepted")else:logging.error("Unexpected response code:%s", response.code )def handle_error(self, response):# pylint:disable=no-self-use"""处理产生执行通知的错误。似乎没有办法避免 ResponseFailed - SmartThings Hub 不会为 POST 或 PUT 生成正确的响应代码,如果使用了 NOTIFY,它会忽略正文。"""if isinstance(response.value, ResponseFailed):logging.debug("Response failed (expected)")else:logging.error("Unexpected response:%s", response)def main():"""从命令行处理使用的主要函数"""arg_proc =argparse .ArgumentParser(description='向 SmartThings 集线器提供相机活动/非活动状态')arg_proc.add_argument('--httpport', dest='http_port', help='HTTP 端口号', default=8080, type=int) arg_proc.add_argument('--deviceindex', dest='device_index', help='设备索引', default=1, type=int)arg_proc.add_argument('--pollingfreq', dest='polling_freq', help='轮询相机状态之间的秒数', default=5, type=int)arg_proc.add_argument('--debug', dest='debug', help='Enable debug messages', default=False, action='store_true' )options =arg_proc.parse_args()device_target ='urn:schema s-upnp-org:device:RPi_Computer_Vision:%d' % (options.device_index)log_level =logging.INFOif options.debug:log_level =logging.DEBUGlogging.basicConfig(format='%(asctime)-15s %(levelname)- 8s %(message)s', level=log_level)subscription_list ={}camera_status ={'last_state':'unknown'}logging.info('Initializing camera')# SSDP server 来处理discoverySSDPServer(status_port=options.http_port, device_target =device_target)# 处理订阅的 HTTP 站点/pollingstatus_site =server.Site(StatusServer(device_target, subscription_list, camera_status))reactor.listenTCP(options.http_port, status_site) # pylint:disable=no-memberlogging.info('初始化完成' )# 监控相机状态并发送状态 changeMonitorCamera(device_target=device_target,subscription_list=subscription_list,camera_status=camera_status) reactor.run() # pylint:disable=no-memberif __name__ =="__main__":main()Source:Computer视觉作为 SmartThings 的运动传感器

制造工艺

  1. ST:具有机器学习功能的运动传感器,可实现高精度、省电的活动跟踪
  2. 树莓派上的 HA 中的运动传感器、警报、视频录制
  3. 用于 Raspberry Pi 的 DIY 红外运动传感器系统
  4. 使用 Raspberry Pi 的运动传感器
  5. 盖革计数器 – Raspberry Pi 辐射传感器板教程
  6. 带有 PIR 运动传感器的 Raspberry Pi GPIO:最佳教程
  7. 为 18 世纪 Gristmill 构建传感器网络
  8. 将 HC-SR501 PIR 运动传感器与 Raspberry Pi 连接
  9. 构建 Raspberry Pi 机器人:初学者的最佳教程
  10. 计算机视觉的7个应用
  11. 计算机视觉
  12. 航空航天传感器薄膜