加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
service.py 11.80 KB
一键复制 编辑 原始数据 按行查看 历史
春秋 提交于 2016-05-17 18:33 . add *
#!/usr/bin/env python3
# _._ coding:utf-8 _._
import os
import sys
import time
import shutil
import traceback
import configparser
class MonitorRun:
@classmethod
def monitor_run_time(cls, function):
"""
监控运行时间
:param function:
:return:
"""
def wrapper(*args, **kwargs):
clock = time.clock()
print('start run. clock time is :{0}'.format(clock))
function(*args, **kwargs)
clock_end = time.clock()
print('end run. clock time is :{0}'.format(clock_end))
return wrapper
class Service:
cp = None
path = os.path.split(os.path.realpath(__file__))[0]
conf_path = os.path.join(path, 'conf')
data_path = os.path.join(path, 'data')
commands = {}
# system
stdin = '/dev/null'
stdout = 'dev/null'
stderr = 'dev/null'
# user
user = 'root'
group = 'root'
# dirs
source_dir = None
target_dir = None
# filter_list
filter_dirs = []
si = None
so = None
se = None
def __init__(self):
self.commands = {'reload': self.reload, 'stop': self.stop, 'restart': self.restart}
self.cp = configparser.ConfigParser()
self.cp.read(os.path.join(self.conf_path, 'monitor.conf'), 'utf-8')
try:
self._valid_config()
except OSError as e:
print("valid config fail. error: {0}".format(e.strerror))
sys.exit(1)
if not os.path.exists(self.source_dir) or not os.path.isdir(self.source_dir):
raise NotADirectoryError(self.source_dir + '不是一个有效的目录')
if not os.path.exists(self.target_dir) and not os.path.isdir(self.target_dir):
os.makedirs(self.target_dir)
def _valid_config(self):
"""
检查配置文件
:return:
"""
if not self.cp.has_section('system'):
raise configparser.NoSectionError("缺少系统配置模块")
if not self.cp.has_section('user'):
raise configparser.NoSectionError("缺少权限配置模块")
if not self.cp.has_section('dirs'):
raise configparser.NoSectionError("没有配置监控目录与目标目录")
# dirs
self.source_dir = self.cp.get('dirs', 'source_dir')
self.target_dir = self.cp.get('dirs', 'target_dir')
# system
if self.cp.has_option('system', 'std_in'):
self.stdin = self.cp.get('system', 'std_in')
if self.cp.has_option('system', 'std_out'):
self.stdout = self.cp.get('system', 'std_out')
if self.cp.has_option('system', 'std_err'):
self.stderr = self.cp.get('system', 'std_err')
# user
if self.cp.has_option('user', 'user'):
self.user = self.cp.get('user', 'user')
if self.cp.has_option('user', 'group'):
self.group = self.cp.get('user', 'group')
# filters
if self.cp.has_section('filter') and self.cp.has_option('filter', 'filter_list'):
with open(os.path.join(self.conf_path, self.cp.get('filter', 'filter_list'))) as f:
for line in f:
self.filter_dirs.append(line.strip("\n"))
def start(self) -> None:
"""
创建守护进程,主进程退出,开始监控
:return:
"""
if self.valid_service() != -1:
return -1
print("monitor dir: {0}".format(self.source_dir))
print("start service...")
try:
pid = os.fork()
if pid > 0:
print("start service success")
sys.exit(0)
except OSError as e:
print("start service failed. errorInfo: {0} {1}".format(e.errno, e.strerror))
sys.exit(1)
os.chdir('/')
os.setsid()
os.umask(0)
try:
pid = os.fork()
if pid > 0:
sys.exit(0)
except OSError as e:
sys.stderr.write("start service failed. errorInfo: {0} {1}".format(e.errno, e.strerror))
sys.exit(1)
sys.stdout.flush()
sys.stderr.flush()
self.si = open(self.stdin, 'r')
self.so = open(self.stdout, 'a+')
self.se = open(self.stderr, 'a+')
os.dup2(self.si.fileno(), sys.stdin.fileno())
os.dup2(self.so.fileno(), sys.stdout.fileno())
os.dup2(self.se.fileno(), sys.stderr.fileno())
with open(os.path.join(self.data_path, '.pid'), 'w') as f:
f.write(str(os.getpid()))
with open(os.path.join(self.data_path, '.status'), 'w') as f:
f.write('1')
open(os.path.join(self.data_path, '.command'), 'w').close()
self._monitor()
def reload(self):
"""
重新加载配置文件
:return:
"""
if self.valid_service() == -1:
return -1
try:
self._valid_config()
except OSError:
traceback.print_exc(file=self.se)
return -1
return 1
def stop(self):
"""
结束服务进程
:return:
"""
if self.valid_service() == -1:
return -1
with open(os.path.join(self.data_path, '.status'), 'w') as f:
f.write('1')
open(os.path.join(self.data_path, '.command'), 'w').close()
open(os.path.join(self.data_path, '.pid'), 'w').close()
sys.exit(0)
def restart(self):
pass
def error(self):
pass
def valid_service(self):
"""
检查服务是否在运行
:return:
"""
pid_path = os.path.join(self.data_path, '.pid')
try:
if os.path.exists(pid_path):
with open(pid_path, 'r') as f:
if f.readline() == str(os.getpid()):
return os.getpid()
else:
return -1
else:
return -1
except OSError:
traceback.print_exc(file=self.se)
return -1
def _monitor(self):
"""
开启监听
:return:
"""
while True:
self._flush_log()
self._command_line()
self._contrast(self.source_dir)
time.sleep(1)
def _flush_log(self):
"""
刷新日志文件
:return:
"""
if time.localtime(time.time()).tm_min == 1 and self.stdout != '/dev/null':
self.so.close()
os.remove(self.stdout)
self.so = open(self.stdout, 'a+')
os.dup2(self.so.fileno(), sys.stdout.fileno())
def _command_line(self):
"""
执行外部命令
:return:
"""
command_path = os.path.join(self.data_path, '.command')
status_path = os.path.join(self.data_path, '.status')
try:
if os.path.exists(command_path):
with open(command_path, 'r') as f:
command = f.readline()
status = self.commands.get(command, self.error)()
with open(status_path, 'w') as sf:
sf.write(str(status))
open(command_path, 'w').close()
else:
open(command_path, 'w').close()
except OSError:
with open(status_path, 'w') as sf:
sf.write('-1')
traceback.print_exc(file=self.se)
def _contrast(self, source_dir: str) -> None:
"""
递归遍历目录,检查每个文件
:param source_dir:
:return:
"""
# 检查源目录,更新文件及目录
try:
for source_path, dirs, files in os.walk(source_dir):
target_path = self._get_target(source_path)
# 跳过filter_dirs中的目录
filter_path_list = [os.path.join(source_path, filter_dir) for filter_dir in dirs]
for filter_path in filter_path_list:
if filter_path in self.filter_dirs:
dirs[:] = [dir_path for dir_path in dirs if dir_path != os.path.split(filter_path)[-1]]
filter_target_path = self._get_target(filter_path)
if not os.path.exists(filter_target_path):
os.makedirs(filter_target_path)
shutil.chown(filter_target_path, self.user, self.group)
print("create dir: {0}".format(filter_target_path))
continue
# 对比目录差异
if os.path.exists(target_path):
list_target_path = os.listdir(target_path)
list_diff_path = list(set(dirs + files) ^ set(list_target_path))
for path in list_diff_path:
target_path = os.path.join(target_path, path)
if os.path.exists(target_path) and self._get_source(target_path) not in self.filter_dirs:
shutil.rmtree(target_path) if os.path.isdir(target_path) else os.remove(target_path)
print("remove path: {0}".format(target_path))
# 同步文件
for file in files:
source = os.path.join(source_path, file)
target = self._get_target(source)
if self._contrast_file(source, target):
self._copy_file(source, target)
except OSError:
traceback.print_exc(file=self.se)
def _get_target(self, source: str):
"""
获取目标路径
:param source:
:return:
"""
return source.replace(self.source_dir, self.target_dir)
def _get_source(self, target: str):
"""
获取源路径
:param target:
:return:
"""
return target.replace(self.target_dir, self.source_dir)
def _contrast_file(self, source: str, target: str) -> bool:
"""
对比文件的mtime属性
:param source:
:param target:
:return:
"""
if not os.path.exists(target):
return True
source = os.stat(source)
target = os.stat(target)
if int(source.st_mtime) > int(target.st_mtime):
return True
else:
return False
def _copy_file(self, source: str, target: str, user: str = None, group: str = None) -> None:
"""
拷贝文件并更改权限
:param source:
:param target:
:param user:
:param group:
:return:
"""
if not user:
user = self.user
if not group:
group = self.group
try:
target_path = os.path.dirname(target)
if not os.path.exists(target_path):
os.makedirs(target_path)
shutil.chown(target_path, user, group)
print("create dirs: {0}".format(target_path))
shutil.copy(source, target)
st = os.stat(source)
os.utime(target, (int(st.st_atime), int(st.st_mtime)))
shutil.chown(target, user, group)
print("copy file {0} to {1}".format(source, target))
except FileNotFoundError:
traceback.print_exc(file=self.se)
def _remove_file(self, source: str, target: str) -> None:
"""
移除文件并重新拷贝
:param source:
:param target:
:return:
"""
os.remove(target)
self._copy_file(source, target)
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化