该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
贡献代码
同步代码
取消
提示: 由于 Git 不支持空文件夾,创建文件夹后会生成空的 .keep 文件
Loading...
README

ZBUS--轻量级消息队列、服务总线

##ZBUS 特性

  • 消息队列 -- 生产者消费者模式、发布订阅
  • 服务总线 -- 适配改造已有业务系统,使之具备跨平台与语言, RPC
  • RPC -- 分布式远程方法调用,Java方法透明代理
  • 跨平台、多语言
  • 轻量级、高可用

##ZBUS 项目结构

ZNET--超轻量级、高性能NIO网络通讯框架

ZBUS API

ZBUS 桥接

ZBUS 启动与监控

  1. zbus-dist选择zbus.sh或者zbus.bat直接执行
  2. 通过源码zbusServer.java个性化控制启动

简单监控

总线默认占用15555端口, http://localhost:15555 可以直接进入监控,注意zbus因为原生兼容HTTP协议所以监控与消息队列使用同一个端口

高可用模式启动总线 分别启动zbusServer与TrackServer,无顺序之分,默认zbusServer占用15555端口,TrackServer占用16666端口。

ZBUS 设计概要图

zbus-arch

ZBUS 示例

Java Maven 依赖

<dependency>
	<groupId>org.zstacks</groupId>
	<artifactId>zbus</artifactId>
	<version>6.0.0</version>
</dependency>

生产者

	//1)创建Broker代理【重量级对象,需要释放】
	SingleBrokerConfig config = new SingleBrokerConfig();
	config.setBrokerAddress("127.0.0.1:15555");
	final Broker broker = new SingleBroker(config);
	
	//2) 创建生产者 【轻量级对象,不需要释放,随便使用】
	Producer producer = new Producer(broker, "MyMQ");
	producer.createMQ(); //如果已经确定存在,不需要创建
	
	Message msg = new Message(); 
	msg.setBody("hello world");  
	Message res = producer.sendSync(msg, 1000);
	System.out.println(res);
	
	//3)销毁Broker
	broker.close();

消费者

	//1)创建Broker代表
	SingleBrokerConfig brokerConfig = new SingleBrokerConfig();
	brokerConfig.setBrokerAddress("127.0.0.1:15555");
	Broker broker = new SingleBroker(brokerConfig);
	
	MqConfig config = new MqConfig(); 
	config.setBroker(broker);
	config.setMq("MyMQ");
	
	//2) 创建消费者
	@SuppressWarnings("resource")
	Consumer c = new Consumer(config);
	
	c.onMessage(new MessageCallback() {
		public void onMessage(Message msg, Session sess) throws IOException {
			System.out.println(msg);
		}
	});

RPC动态代理【各类复杂类型】

参考源码test目下的rpc部分

	SingleBrokerConfig config = new SingleBrokerConfig();
	config.setBrokerAddress("127.0.0.1:15555");
	Broker broker = new SingleBroker(config);
	
	RpcProxy proxy = new RpcProxy(broker); 
	Interface hello = proxy.getService(Interface.class, "mq=MyRpc");

	Object[] res = hello.objectArray();
	for (Object obj : res) {
		System.out.println(obj);
	}

	Object[] array = new Object[] { getUser("rushmore"), "hong", true, 1,
			String.class };
	
	
	int saved = hello.saveObjectArray(array);
	System.out.println(saved);
	 
	Class<?> ret = hello.classTest(String.class);
	System.out.println(ret);
	
	
	broker.close();

Spring集成--服务端(RPC示例)

无任何代码侵入使得你已有的业务接口接入到zbus,获得跨平台和多语言支持

	<!-- 暴露的的接口实现示例 -->
	<bean id="interface" class="org.zstacks.zbus.rpc.biz.InterfaceImpl"></bean>
	
	<bean id="serviceHandler" class="org.zstacks.zbus.client.rpc.RpcServiceHandler">
		<constructor-arg>
			<list>
				<!-- 放入你需要暴露的接口 ,其他配置基本不变-->
				<ref bean="interface"/>
			</list>
		</constructor-arg>
	</bean>
	
	<!-- 切换至高可用模式,只需要把broker的实现改为HaBroker配置 -->
	<bean id="broker" class="org.zstacks.zbus.client.broker.SingleBroker">
		<constructor-arg>
			<bean class="org.zstacks.zbus.client.broker.SingleBrokerConfig">
				<property name="brokerAddress" value="127.0.0.1:15555" />
			</bean>
		</constructor-arg>
	</bean>
	
	<!-- 默认调用了start方法,由Spring容器直接带起来注册到zbus总线上 -->
	<bean id="zbusService" class="org.zstacks.zbus.client.service.Service" init-method="start">
		<constructor-arg>  
			<bean class="org.zstacks.zbus.client.service.ServiceConfig">
				<property name="broker" ref="broker"/>
				<property name="mq" value="MyRpc"/>
				<property name="threadCount" value="2"/>
				<property name="serviceHandler" ref="serviceHandler"/>
			</bean>
		</constructor-arg>
	</bean>

Spring集成--客户端

	<!-- 切换至高可用模式,只需要把broker的实现改为HaBroker配置 -->
	<bean id="broker" class="org.zstacks.zbus.client.broker.SingleBroker">
		<constructor-arg>
			<bean class="org.zstacks.zbus.client.broker.SingleBrokerConfig">
				<property name="brokerAddress" value="127.0.0.1:15555" />
				<!-- 这里可以增加连接池参数配置,不配置使用默认值(参考commons-pool2) -->
			</bean>
		</constructor-arg>
	</bean>
	
	<bean id="rpcProxy" class="org.zstacks.zbus.client.rpc.RpcProxy">
		<constructor-arg> <ref bean="broker"/> </constructor-arg>
	</bean>

	<!-- 动态代理由RpcProxy的getService生成,需要知道对应的MQ配置信息(第二个参数) -->
	<bean id="interface" factory-bean="rpcProxy" factory-method="getService">
		<constructor-arg type="java.lang.Class" value="org.zstacks.zbus.rpc.biz.Interface"/> 
		<constructor-arg>
			<bean class="org.zstacks.zbus.client.rpc.RpcConfig"> 
				<property name="mq" value="MyRpc"/>
			</bean>
		</constructor-arg>
	</bean>

Spring完成zbus代理透明化,zbus设施从你的应用逻辑中彻底消失

public static void main(String[] args) { 
	ApplicationContext context = new ClassPathXmlApplicationContext("zbusSpringClient.xml");
	
	Interface intf = (Interface) context.getBean("interface");
	
	System.out.println(intf.listMap());
}

ZBUS消息协议

空文件

简介

轻量级服务总线/消息队列,1)多种消息模式--支持生产者/消费者,发布订阅,RPC。2)丰富的API--C/C++/C#/JAVA/Python/Node.JS跨平台、多语言支持; 3)开放协议标准--原生兼容HTTP协议(长连接),头部动态扩展;4)支持TrackServer与ZbusServer高可用横向动态扩容机制。 展开 收起
取消

发行版

暂无发行版

贡献者

全部

近期动态

不能加载更多了
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化