加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
jieyinUI.py 7.84 KB
一键复制 编辑 原始数据 按行查看 历史
422576903@qq.com 提交于 2021-06-02 18:28 . 修改二维码及打印判断
import configparser
import io
import json
import threading
import time
import subprocess
import qrcode
from PyQt5 import uic
from PyQt5.QtCore import QObject, pyqtSignal
from PyQt5.QtGui import QMovie
from PyQt5.QtWidgets import QApplication
from paho.mqtt import publish
from qtpy import QtGui
from paho.mqtt import client as mqtt_client
cf = configparser.ConfigParser()
cf.read("./config/config.ini", encoding='utf-8')
mqtt_host = cf.get("JIEYIN", "mqtt_host")
mqtt_port = cf.get("JIEYIN", "mqtt_port")
http_host = cf.get("JIEYIN", "http_host")
print_name = cf.get("JIEYIN", "print_name")
print_addr = cf.get("JIEYIN", "print_addr")
color_name = cf.get("JIEYIN", "color_name")
black_name = cf.get("JIEYIN", "black_name")
server_topic = cf.get("JIEYIN", "server_topic")
bootdir = cf.get("JIEYIN", "bootdir")
printfile = cf.get("JIEYIN", "printfile")
res_dict = {'status': 0, "name": print_name, 'data': {'sign': 0, 'msg': "Interval"}}
count = 0
class MyMQTT(QObject):
signal = pyqtSignal(str, str)
def __init__(self, parent=None):
super().__init__(parent)
def _connect_mqtt(self) -> mqtt_client:
def connect_mqtt(client, userdata, flags, rc):
if rc == 0:
print("Connected to MQTT Broker!")
else:
print("Failed to connect, return code %d\n", rc)
m_client = mqtt_client.Client(print_name)
m_client.username_pw_set(username="jieyin", password="1357924680")
m_client.on_connect = connect_mqtt
m_client.connect(mqtt_host, int(mqtt_port))
return m_client
def _subscribe(self, m_client: mqtt_client):
def on_message(m_client, data, msg):
print(f"Received `{msg.payload.decode()}` from `{msg.topic}` topic")
thread = threading.Thread(target=self.printing, kwargs={"m_self": self, "m_client": m_client, "msg": msg.payload.decode('utf-8')})
thread.start()
self.signal.emit(msg.topic, msg.payload.decode('utf-8'))
m_client.subscribe([(print_name, 1)])
m_client.on_message = on_message
def mqtt_run(self):
m_client = self._connect_mqtt()
self._subscribe(m_client)
m_client.loop_start()
def printing(self, **kargcs):
global status
m_client = kargcs['m_client']
obj = json.loads(kargcs['msg'])
state = obj['state'] # 文件docID
if state != '7':
print("state: %s" % state)
return
docId = obj['docId'] # 文件docID
page = obj['page'].split(",") # 打印页面列表,如:1-5,7-9,11
color = obj['color'] # 0-黑白,1彩色
copies = obj['copies'] # 打印份数
side = obj['side'] # 0-单面打印,1-双面打印,2-双面短边
landscape = obj['landscape'] # 0-纵向,1-横向
media = obj['media'] # 0-A4,1-letter,2-legal
cmd = "wget " + http_host + str(docId) + " -O " + bootdir + printfile
pages = len(page)
if pages > 1:
print_com = "lp -n" + str(copies) + " -P " + page[0] + "-" + page[pages-1] + " "
else:
print_com = "lp -n" + str(copies) + " -P " + page[0] + " "
if color == 1:
print_com = print_com + " -d " + color_name + " "
if side == 1:
print_com = print_com + " -o sides=two-sided-long-edge "
if side == 2:
print_com = print_com + " -o sides=two-sided-short-edge "
if landscape == 1:
print_com = print_com + " -o landscape "
if media == 1: # letter
print_com = print_com + " -o media=letter "
if media == 2: # legal
print_com = print_com + " -o media=legal "
print_com += (bootdir + printfile)
obj['status'] = 7
obj['state'] = "loadfile"
m_client.publish(server_topic, str(obj), 1)
time.sleep(1)
(status, output) = subprocess.getstatusoutput(cmd)
print("file_output: %s" % output)
obj['status'] = 8
if status > 0:
obj['state'] = "loadfail"
m_client.publish(server_topic, str(obj), 1)
self.signal.emit(None, json.dumps(obj))
else:
obj['state'] = "startprint"
m_client.publish(server_topic, str(obj), 1)
self.signal.emit(None, json.dumps(obj))
(status, output) = subprocess.getstatusoutput(print_com)
print("print_output: %s" % output)
if status == 0:
(status, output) = subprocess.getstatusoutput("lpq -a")
while output != "无条目":
time.sleep(2)
(status, output) = subprocess.getstatusoutput("lpq -a")
print("state_output: %s" % output)
obj['status'] = 9
obj['state'] = "endprint"
m_client.publish(server_topic, str(obj), 1)
jsonStr = json.dumps(obj)
self.signal.emit(None, jsonStr)
else:
obj['status'] = 9
obj['state'] = "errprint"
m_client.publish(server_topic, str(obj), 1)
jsonStr = json.dumps(obj)
self.signal.emit(None, jsonStr)
class jieyinUI:
def __init__(self):
self.ui = uic.loadUi(bootdir + "jieyin.ui")
self.ui.setWindowOpacity(0.85) # 设置窗口透明度
# self.setAttribute(QtCore.Qt.WA_TranslucentBackground) # 设置窗口背景透明
# self.ui.setWindowFlag(QtCore.Qt.FramelessWindowHint)
self.ui.address.setText(print_addr)
def load_gifimg(self, gif= bootdir + "views/bg.gif"):
movie = QMovie(gif)
self.ui.gif_label.setMovie(movie)
movie.start()
self.ui.gif_label.setScaledContents(True)
return movie
def genera_qrcode(self, qr_text="https://www.baidu.com"):
qr = qrcode.QRCode(version=1, box_size=5, border=0)
qr.add_data(qr_text)
self.qr_img = qr.make_image()
fp = io.BytesIO()
self.qr_img.save(fp, "BMP")
image = QtGui.QImage()
image.loadFromData(fp.getvalue(), "BMP")
qr_pixmap = QtGui.QPixmap.fromImage(image)
self.ui.qrcode.setPixmap(qr_pixmap)
return qr_pixmap
def show_data(self, topic, payload):
obj = json.loads(payload)
self.ui.nickName.setText(obj['nickName'])
self.ui.fileName.setText(obj['fileName'])
self.ui.price.setText(str(obj['price']) + ' 元')
if obj['state'] == "loadfile":
self.ui.state.setText("正在下载文件")
elif obj['state'] == "loadfail":
self.ui.state.setText("下载文件失败")
elif obj['state'] == "startprint":
self.ui.state.setText("开始打印文档")
elif obj['state'] == "errprint":
self.ui.state.setText("打印机错误")
else:
self.ui.state.setText("打印机空闲")
self.ui.nickName.setText("")
self.ui.fileName.setText("")
self.ui.price.setText("00 元")
class RepeatingTimer(threading.Timer):
def run(self):
while not self.finished.is_set():
self.function(*self.args, **self.kwargs)
self.finished.wait(self.interval)
def setInterval(text):
print("setInterval: " + text)
publish.single(server_topic, text, qos=1, hostname=mqtt_host, port=int(mqtt_port), client_id=print_name + "-1",
auth={'username': "jieyin", 'password': "1357924680"})
def main():
app = QApplication([])
stats = jieyinUI()
stats.load_gifimg()
stats.genera_qrcode(print_name)
stats.ui.show()
# stats.ui.showFullScreen()
m_client = MyMQTT()
a = 1
while(a):
try:
time.sleep(2)
m_client.mqtt_run()
m_client.signal.connect(stats.show_data)
a = 0
except:
print("重连!")
t = RepeatingTimer(120, setInterval, args=[str(res_dict)])
t.start()
app.exec_()
if __name__ == '__main__':
main()
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化