代码拉取完成,页面将自动刷新
import configparser
import io
import json
import threading
import time
import subprocess
import qrcode
from PyQt5 import uic
from PyQt5.QtCore import QObject, pyqtSignal
from PyQt5.QtGui import QMovie
from PyQt5.QtWidgets import QApplication
from paho.mqtt import publish
from qtpy import QtGui
from paho.mqtt import client as mqtt_client
cf = configparser.ConfigParser()
cf.read("./config/config.ini", encoding='utf-8')
mqtt_host = cf.get("JIEYIN", "mqtt_host")
mqtt_port = cf.get("JIEYIN", "mqtt_port")
http_host = cf.get("JIEYIN", "http_host")
print_name = cf.get("JIEYIN", "print_name")
print_addr = cf.get("JIEYIN", "print_addr")
color_name = cf.get("JIEYIN", "color_name")
black_name = cf.get("JIEYIN", "black_name")
server_topic = cf.get("JIEYIN", "server_topic")
bootdir = cf.get("JIEYIN", "bootdir")
printfile = cf.get("JIEYIN", "printfile")
res_dict = {'status': 0, "name": print_name, 'data': {'sign': 0, 'msg': "Interval"}}
count = 0
class MyMQTT(QObject):
signal = pyqtSignal(str, str)
def __init__(self, parent=None):
super().__init__(parent)
def _connect_mqtt(self) -> mqtt_client:
def connect_mqtt(client, userdata, flags, rc):
if rc == 0:
print("Connected to MQTT Broker!")
else:
print("Failed to connect, return code %d\n", rc)
m_client = mqtt_client.Client(print_name)
m_client.username_pw_set(username="jieyin", password="1357924680")
m_client.on_connect = connect_mqtt
m_client.connect(mqtt_host, int(mqtt_port))
return m_client
def _subscribe(self, m_client: mqtt_client):
def on_message(m_client, data, msg):
print(f"Received `{msg.payload.decode()}` from `{msg.topic}` topic")
thread = threading.Thread(target=self.printing, kwargs={"m_self": self, "m_client": m_client, "msg": msg.payload.decode('utf-8')})
thread.start()
self.signal.emit(msg.topic, msg.payload.decode('utf-8'))
m_client.subscribe([(print_name, 1)])
m_client.on_message = on_message
def mqtt_run(self):
m_client = self._connect_mqtt()
self._subscribe(m_client)
m_client.loop_start()
def printing(self, **kargcs):
global status
m_client = kargcs['m_client']
obj = json.loads(kargcs['msg'])
state = obj['state'] # 文件docID
if state != '7':
print("state: %s" % state)
return
docId = obj['docId'] # 文件docID
page = obj['page'].split(",") # 打印页面列表,如:1-5,7-9,11
color = obj['color'] # 0-黑白,1彩色
copies = obj['copies'] # 打印份数
side = obj['side'] # 0-单面打印,1-双面打印,2-双面短边
landscape = obj['landscape'] # 0-纵向,1-横向
media = obj['media'] # 0-A4,1-letter,2-legal
cmd = "wget " + http_host + str(docId) + " -O " + bootdir + printfile
pages = len(page)
if pages > 1:
print_com = "lp -n" + str(copies) + " -P " + page[0] + "-" + page[pages-1] + " "
else:
print_com = "lp -n" + str(copies) + " -P " + page[0] + " "
if color == 1:
print_com = print_com + " -d " + color_name + " "
if side == 1:
print_com = print_com + " -o sides=two-sided-long-edge "
if side == 2:
print_com = print_com + " -o sides=two-sided-short-edge "
if landscape == 1:
print_com = print_com + " -o landscape "
if media == 1: # letter
print_com = print_com + " -o media=letter "
if media == 2: # legal
print_com = print_com + " -o media=legal "
print_com += (bootdir + printfile)
obj['status'] = 7
obj['state'] = "loadfile"
m_client.publish(server_topic, str(obj), 1)
time.sleep(1)
(status, output) = subprocess.getstatusoutput(cmd)
print("file_output: %s" % output)
obj['status'] = 8
if status > 0:
obj['state'] = "loadfail"
m_client.publish(server_topic, str(obj), 1)
self.signal.emit(None, json.dumps(obj))
else:
obj['state'] = "startprint"
m_client.publish(server_topic, str(obj), 1)
self.signal.emit(None, json.dumps(obj))
(status, output) = subprocess.getstatusoutput(print_com)
print("print_output: %s" % output)
if status == 0:
(status, output) = subprocess.getstatusoutput("lpq -a")
while output != "无条目":
time.sleep(2)
(status, output) = subprocess.getstatusoutput("lpq -a")
print("state_output: %s" % output)
obj['status'] = 9
obj['state'] = "endprint"
m_client.publish(server_topic, str(obj), 1)
jsonStr = json.dumps(obj)
self.signal.emit(None, jsonStr)
else:
obj['status'] = 9
obj['state'] = "errprint"
m_client.publish(server_topic, str(obj), 1)
jsonStr = json.dumps(obj)
self.signal.emit(None, jsonStr)
class jieyinUI:
def __init__(self):
self.ui = uic.loadUi(bootdir + "jieyin.ui")
self.ui.setWindowOpacity(0.85) # 设置窗口透明度
# self.setAttribute(QtCore.Qt.WA_TranslucentBackground) # 设置窗口背景透明
# self.ui.setWindowFlag(QtCore.Qt.FramelessWindowHint)
self.ui.address.setText(print_addr)
def load_gifimg(self, gif= bootdir + "views/bg.gif"):
movie = QMovie(gif)
self.ui.gif_label.setMovie(movie)
movie.start()
self.ui.gif_label.setScaledContents(True)
return movie
def genera_qrcode(self, qr_text="https://www.baidu.com"):
qr = qrcode.QRCode(version=1, box_size=5, border=0)
qr.add_data(qr_text)
self.qr_img = qr.make_image()
fp = io.BytesIO()
self.qr_img.save(fp, "BMP")
image = QtGui.QImage()
image.loadFromData(fp.getvalue(), "BMP")
qr_pixmap = QtGui.QPixmap.fromImage(image)
self.ui.qrcode.setPixmap(qr_pixmap)
return qr_pixmap
def show_data(self, topic, payload):
obj = json.loads(payload)
self.ui.nickName.setText(obj['nickName'])
self.ui.fileName.setText(obj['fileName'])
self.ui.price.setText(str(obj['price']) + ' 元')
if obj['state'] == "loadfile":
self.ui.state.setText("正在下载文件")
elif obj['state'] == "loadfail":
self.ui.state.setText("下载文件失败")
elif obj['state'] == "startprint":
self.ui.state.setText("开始打印文档")
elif obj['state'] == "errprint":
self.ui.state.setText("打印机错误")
else:
self.ui.state.setText("打印机空闲")
self.ui.nickName.setText("")
self.ui.fileName.setText("")
self.ui.price.setText("00 元")
class RepeatingTimer(threading.Timer):
def run(self):
while not self.finished.is_set():
self.function(*self.args, **self.kwargs)
self.finished.wait(self.interval)
def setInterval(text):
print("setInterval: " + text)
publish.single(server_topic, text, qos=1, hostname=mqtt_host, port=int(mqtt_port), client_id=print_name + "-1",
auth={'username': "jieyin", 'password': "1357924680"})
def main():
app = QApplication([])
stats = jieyinUI()
stats.load_gifimg()
stats.genera_qrcode(print_name)
stats.ui.show()
# stats.ui.showFullScreen()
m_client = MyMQTT()
a = 1
while(a):
try:
time.sleep(2)
m_client.mqtt_run()
m_client.signal.connect(stats.show_data)
a = 0
except:
print("重连!")
t = RepeatingTimer(120, setInterval, args=[str(res_dict)])
t.start()
app.exec_()
if __name__ == '__main__':
main()
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。