加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
rtsp_camera.py 5.73 KB
一键复制 编辑 原始数据 按行查看 历史
张涵 提交于 2023-08-04 16:07 . 简化代码
from PyQt5.QtCore import Qt
import cv2, sys, os, threading, time
from PyQt5 import QtCore, QtGui, QtWidgets
from PyQt5.QtGui import QImage, QPixmap
import datetime
from form import Ui_Form
import threading
import socket
import time
lock = threading.Lock()
class DeviceScan:
routers = []
def __init__(self, start_ip, end_ip):
self.start_ip = start_ip
self.end_ip = end_ip
start_index = self.get_last_ip(start_ip)
end_index = self.get_last_ip(end_ip)
if start_index is not None and end_index is not None:
self.scan(int(start_index), int(end_index))
def get_last_ip(self, ip):
if ip is not None and len(ip) > 0:
ips = ip.split(".")
if len(ips) == 4:
return ips[3]
else:
return None
else:
return None
def check_ip(self, new_ip):
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # create socket
s.settimeout(1)
PORT = 554 # change it to the port you want
result = s.connect_ex((new_ip, PORT))
if result == 0:
lock.acquire() # get lock
self.routers.append(new_ip)
lock.release()
def scan(self, start_index, end_index):
local_ip = socket.gethostbyname_ex(socket.gethostname())
all_threads = []
for ip in local_ip[2]:
for i in range(start_index, end_index):
array = ip.split(".") # split ip for dot
array[3] = str(i)
new_ip = '.'.join(array)
t = threading.Thread(target=self.check_ip, args=(new_ip,))
t.start()
all_threads.append(t)
for t in all_threads:
t.join()
def get_ips(self):
return self.routers
class TestCamera(QtWidgets.QWidget, Ui_Form):
def __init__(self):
super(TestCamera, self).__init__()
self.setupUi(self)
self.init_camera()
self.Camera1.setPixmap(
QPixmap("D:/Desktop/comprehensive-dispatching\pc-software-algorithm-master\source\second\loading.png"))
self.Camera1.setScaledContents(True) # 自适应窗口
self.Camera2.setPixmap(
QPixmap("D:/Desktop/comprehensive-dispatching\pc-software-algorithm-master\source\second\loading.png"))
self.Camera2.setScaledContents(True) # 自适应窗口
self.Camera3.setPixmap(
QPixmap("D:/Desktop/comprehensive-dispatching\pc-software-algorithm-master\source\second\loading.png"))
self.Camera3.setScaledContents(True) # 自适应窗口
self.Camera4.setPixmap(
QPixmap("D:/Desktop/comprehensive-dispatching\pc-software-algorithm-master\source\second\loading.png"))
self.Camera4.setScaledContents(True) # 自适应窗口
self.search_camera.clicked.connect(self.camera)
self.listWidget.itemClicked.connect(self.run_pphuman)
def init_camera(self):
"""初始化摄像头监控"""
print("进程pid:", os.getpid())
def run_pphuman(self):
index = self.listWidget.currentRow()
row = self.listWidget.item(index)
# 直接获取点击行的文本作为设备ip(相当于之前的self.result[index])
device_ip = row.text()
print("device_ip", device_ip)
url = "rtsp://172.25.148.7/10001"
threading.Thread(target=self.run_mon,
args=("infer_cfg_pphuman.yml", "rtsp://admin:admin@" + device_ip + ":554/live", url)).start()
time.sleep(40)
print("start push")
url_push = "rtsp://172.25.148.7/10001\live"
threading.Thread(target=Camera(url_push, self.Camera1).display).start()
def run_mon(self, yml, camera, url):
'''
向信号trigger发送消息
'''
starttime = datetime.datetime.now()
print("Thread is Running")
os.system((
"python D:/Desktop/comprehensive-dispatching/pc-software-algorithm-master/deploy/pipeline"
"/pipeline.py --config "
"D:/Desktop/comprehensive-dispatching/pc-software-algorithm-master/deploy/pipeline/config/%s "
"--rtsp %s --device=gpu --pushurl %s") % (yml, camera, url))
print("运行完毕")
endtime = datetime.datetime.now()
starttime_count = starttime.hour * 3600 + starttime.minute * 60 + starttime.second
endtime_count = endtime.hour * 3600 + endtime.minute * 60 + endtime.second
self.final_time = str(endtime_count - starttime_count)
def camera(self):
self.result = DeviceScan('192.168.0.0', '192.168.0.255').get_ips()
for i in range(len((self.result))):
print(self.result[i])
self.listWidget.addItem(self.result[1])
class Camera:
"""摄像头对象"""
def __init__(self, url, out_label):
"""初始化方法"""
self.url = url
self.outLabel = out_label
def display(self):
"""显示"""
print(self.url)
cap = cv2.VideoCapture(self.url)
start_time = time.time()
while cap.isOpened():
success, frame = cap.read()
print(success)
if success:
if (time.time() - start_time) > 0.1:
frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)
img = QImage(frame.data, frame.shape[1], frame.shape[0], QImage.Format_RGB888)
self.outLabel.setPixmap(QPixmap.fromImage(img))
self.outLabel.setScaledContents(True) # 自适应窗口
cv2.waitKey(1)
start_time = time.time()
if __name__ == '__main__':
app = QtWidgets.QApplication(sys.argv)
mainWindow = TestCamera()
mainWindow.show()
sys.exit(app.exec())
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化