加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
easyquotation-multiprocessor.py 3.05 KB
一键复制 编辑 原始数据 按行查看 历史
王哲 提交于 2024-07-24 18:02 . 修改后可用
#!/usr/bin/env python3
# encoding: utf-8
"""
@author: WillSo
@license: Apache Licence
@software: PyCharm
@file: easyquotation-multiprocessor.py
@time: 2017\10\17 0017 13:52
"""
from multiprocessing import Pool, Process
import os, time, random, easyquotation, pika, sys, traceback
import pprint
import redis
from flask import Flask, request, session, g, redirect, url_for, abort, \
render_template, flash, jsonify
# 定义新浪数据源
quotation = easyquotation.use("sina")
# 获得rabbitMQ连接
credentials = pika.PlainCredentials('admin', 'Admin123')
connection_parameters = pika.ConnectionParameters('192.168.2.202', 32672, 'zxm', credentials)
connection = pika.BlockingConnection(connection_parameters)
# connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1'))
# 将原列表按跨度分割成多个新列表
def chunks(l, n):
"""Yield successive n-sized chunks from l."""
for i in range(0, len(l), n):
yield l[i:i + n]
def processor(name, codes):
channel = connection.channel()
channel.exchange_declare(exchange='Clogs-' + name, exchange_type='fanout')
channel.queue_declare(name) # 如果有cc的队列,略过;如果没有,创建cc的队列
while True:
try:
data = quotation.stocks(codes)
for k, v in data.items():
# print(k)
k_dict = {'stockcode': k}
v['date'] = v['date'] + ' ' + v['time']
v['time'] = v['date']
v = {**k_dict, **v}
v = str(v)
v = v.replace('\'', '\"')
# if name == 'mq-all':
if (name == 'mq-all' and k == '000001'):
print('进程%s:%s' % (name, v))
channel.basic_publish(exchange='Clogs-' + name, routing_key='', body=v)
except:
traceback.print_exc()
# 单从展示来看理论上不需要查询得这么频繁
time.sleep(1.3)
# 创建进程池
def startPool():
# 获得配置文件中所有股票代码
stock_codes = quotation.load_stock_codes()
stock_codes_collections = list(chunks(stock_codes, 400))
pool = Pool(len(stock_codes_collections))
for i in range(0, len(stock_codes_collections)):
result = pool.apply_async(processor, ('mq-' + str(i + 1), stock_codes_collections[i]))
# 多启动一个进程,发布全部代码行情
p = Process(target=processor, args=('mq-all', stock_codes))
p.start()
pool.close()
pool.join()
connection.close()
if result.successful():
print('successful')
def syncToRedis():
redis_client = redis.Redis(host='192.168.2.152', port=6379, db=1, password='123456')
stock_codes = list(quotation.load_stock_codes())
# 删除旧缓存
redis_client.delete('stockCodes')
data = quotation.stocks(stock_codes)
# redis_client.set('stockCodes', list(data))
str = ''
for code in list(data):
str = str + code + ','
redis_client.set('stockCodes', "\"" + str + "\"")
if __name__ == '__main__':
syncToRedis()
startPool()
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化