同步操作将从 yanhuaitang/iot 强制同步,此操作会覆盖自 Fork 仓库以来所做的任何修改,且无法恢复!!!
确定后同步将在后台操作,完成时将刷新页面,请耐心等待。
使用java语言且基于netty, spring boot, redis等开源项目开发来的物联网网络中间件, 支持udp, tcp通讯等底层协议和http, mqtt, websocket(默认实现和自定义协议头实现), modbus(tcp,rtu),plc,dtu(支持心跳,设备注册功能以及AT协议和自定义协议支持),dtu for modbus tcp,dtu for modbus rtu组件适配 等上层协议. 主打工业物联网底层网络交互、设备管理、数据存储、大数据处理. (其中plc包括西门子S7系列,欧姆龙Fins,罗克韦尔CIP,三菱MC). 数据存储将使用taos数据库以及redis消息队列
下面将由一个例子来展开说明iot框架的使用 例子:比如服务端接受到客户端报文如下:0x01 11 12 13 05 06 EE FF 八个字节, 如果客户端发送完此报文之后没有连续发送, 服务端接受到的数据就是一包完整的报文, 我们可以很容易的读取缓存区的内容然后进行处理; 那如果第一包发送完之后服务端还在忙其他的时没有及时读取缓冲区内容这时候又接收到了客户端的第二个报文,这时候数据缓冲区的数据如下:0x01 11 12 13 05 06 EE FF 02 21 22 23 25 26 AA BB CC DD,这时候程序读取的缓冲区是两包完整的报文,这时候程序怎么将两包报文拆开处理呢?这时候就需要编解码器上场了! 如何处理上面报文粘包和拆包的情况呢?netty提供了一下几种常用的解码器
在设备对接的时候厂家一般会提供协议文档,然后就需要我们来选择合适的解码器,当我们确认好了解码器之后就可以开始编码了,下面先开始服务端的对接教程
编写服务端网络程序时需要监听某个端口来给客户端连接, 当我们选择某个解码器之后就可以选择对应的服务端解码器组件来开启某个端口,iot框架适配了netty提供的几个常用的解码器
以下是使用LengthFieldBasedFrameDecoderServerComponent示例
// 首先:必须先创建一个组件对象来继承LengthFieldBasedFrameDecoderServerComponent
// 以iot-test模块的断路器服务端模拟为例
public class BreakerServerComponent extends LengthFieldBasedFrameDecoderServerComponent<BreakerServerMessage> {
public BreakerServerComponent(ConnectProperties connectProperties) {
super(connectProperties, ByteOrder.LITTLE_ENDIAN, 256, 0
, 4, 0, 0, true);
}
xxx 实现省略
}
// 注:要求传入ConnectProperties对象作为构造参数, 此对象可以指定ip和端口
我们看到上面的组件需要一个泛型参数BreakerServerMessage, 此参数就是报文对象,上面我们说过报文对象是一个二进制数据载体,用于在iot框架各个对象中进行使用,下面我们来看看报文对象除了作为数据载体还有哪些扩展功能
// 创建服务端报文对象必须继承ServerMessage类
public class BreakerServerMessage extends ServerMessage {
public BreakerServerMessage(byte[] message) {
super(message);
}
//省略其他构造函数
@Override
protected MessageHead doBuild(byte[] message) {
this.messageBody = MessageCreator.buildBreakerBody(message);
return MessageCreator.buildBreakerHeader(message);
}
}
当我们创建了组件和报文类之后就可以启动应用了, 这时候日志里面会打印出组件配置的端口已经开启监听了。到了这里已经开了一个好头了算是成功了一半了,接下来就是创建协议对象了,从厂家那里拿到的协议文档至少包含一个协议, 一般我们建议为文档里面的每个协议创建一个对应的协议对象(Protocol)
协议对象就是用来将接收到的二进制数据解析成和协议文档里对应的字段的;出于框架架构的需要我们将协议分成两种类型 如下:
首先我们先看一下ClientInitiativeProtocol方法声明, 还是以断路器为例
// 用来接收断路器主动上报的电压电流等数据
public class DataAcceptProtocol extends ClientInitiativeProtocol<BreakerServerMessage> {
private double v; // 电压
private double i; // 电流
private double power1; // 有功功率
private double power2; // 无功功率
private double py; // 功率因素
public DataAcceptProtocol(BreakerServerMessage requestMessage) {
super(requestMessage);
}
@Override
protected void doBuildRequestMessage(BreakerServerMessage requestMessage) {
byte[] message = requestMessage.getBody().getMessage();
this.v = ByteUtil.bytesToInt(message, 0) / 100.0;
this.i = ByteUtil.bytesToInt(message, 4) / 100.0;
this.power1 = ByteUtil.bytesToInt(message, 8) / 100.0;
this.power2 = ByteUtil.bytesToInt(message, 12) / 100.0;
this.py = ByteUtil.bytesToShort(message, 16) / 100.0;
}
// 响应断路器的请求
@Override
protected BreakerServerMessage doBuildResponseMessage() {
Message.MessageHead head = requestMessage().getHead();
return new BreakerServerMessage(MessageCreator.buildBreakerHeader(head
.getEquipCode(), head.getMessageId(), 4, head.getType()),
MessageCreator.buildBreakerBody(StatusCode.Success));
}
// 省略其他
}
平台已经可以接收设备主动上报的数据了,那平台要怎么主动给设备发送数据呢?ServerInitiativeProtocol协议就是用来声明一个协议是平台主动发给客户端的,下面以平台下发给断路器切换开关为例
/**
* 切换断路器的开闭状态
*/
public class SwitchStatusProtocol extends ServerInitiativeProtocol<BreakerServerMessage> {
private String deviceSn;
public SwitchStatusProtocol(String deviceSn) {
this.deviceSn = deviceSn;
}
/**
* 构建要发送给断路器的报文
*/
@Override
protected BreakerServerMessage doBuildRequestMessage() throws IOException {
DefaultMessageHead messageHead = MessageCreator.buildBreakerHeader(Long.valueOf(this.deviceSn), 0, protocolType());
return new BreakerServerMessage(messageHead);
}
/**
* 处理断路器对此处请求的响应
*/
@Override
protected void doBuildResponseMessage(BreakerServerMessage message) {
/*设备响应是否切换成功的处理*/
}
@Override
public BreakerProtocolType protocolType() {
return BreakerProtocolType.SwitchStatus;
}
}
// 然后在业务代码里面调用请求方法:new SwitchStatusProtocol(deviceSn).request(); 这样就可以向指定的设备发起请求了
看到现在我们已经可以接收到设备的请求和主动发起请求给设备了,但是有一个问题,一般一台设备会包含很多功能(协议),因为每个功能都会创建一个协议对象那么问题来了,当设备发给平台的时候平台怎么知道需要交给哪个协议处理呢? 一般来说如果有多个协议那么协议文档一般会有一个字段用来区分的, 比如报文:0x01 11 12 13 05 06 EE FF的第二个字节 0x11作为协议类型字段,比如01代表切换开关,02代表上报温湿度,03代表锁断路器;所以我们需要先解析出协议类型(如果有的话) 我们在看一下报文对象:需要在doBuild方法里面先解析出报文头(MessageHead),报文头里面需要包含设备编号和协议类型等信息
// 先看一下报文头的接口声明,其中type就是用来声明一个客户端的协议类型的
interface MessageHead {
/**
* 设备编号
* @return
*/
String getEquipCode();
/**
* 报文的唯一编号
* @return
*/
String getMessageId();
/**
* 获取交易类型
* @return
*/
<T> T getType();
byte[] getMessage();
default int getLength() {
return getMessage().length;
}
}
// 然后在报文对象里面解析出报文头
public class BreakerServerMessage extends ServerMessage {
//省略其他构造函数
@Override
protected MessageHead doBuild(byte[] message) {
this.messageBody = MessageCreator.buildBreakerBody(message);
return MessageCreator.buildBreakerHeader(message);
}
}
上面已经解析出了对应的协议类型了,下一步就需要创建协议对象来解析对应的报文了。上面说过组件对象用来管理整个框架中的各个接口, 首先组件对象实现了协议工厂接口 如下:
// 协议工厂接口
public interface IotProtocolFactory<T extends SocketMessage> extends StorageManager<String, Protocol>
// 组件对象实现了协议工厂接口 如下
public abstract class TcpDecoderServerComponent<M extends ServerMessage> extends TcpServerComponent<M> implements IotSocketServer, IotProtocolFactory<M>
所以我们可以在组件里面来管理各个协议对象的创建,以断路器组件为例:
public class BreakerServerComponent extends LengthFieldBasedFrameDecoderServerComponent<BreakerServerMessage> {
@Override
public AbstractProtocol getProtocol(BreakerServerMessage message) {
// 因为我们已经解析出了协议类型了,所以可以通过协议类型创建对应的协议
BreakerProtocolType type = message.getHead().getType();
// 断路器主动推送电压和电流值
if(type == BreakerProtocolType.PushData) {
return new DataAcceptProtocol(message);
} else { // 移除平台主动请求的协议
return remove(message.getHead().getMessageId());
}
}
// 省略其他方法
}
现在我们已经通过协议解析出了客户端给我们的数据了,接下去就是对这些数据进行处理, 比如数据入库,实时批处理,缓存等等操作,iot框架提供了一个专门的接口处理业务,先看协议处理器的声明 如下:
public interface ProtocolHandle<T extends Protocol> {
Method method = ReflectionUtils.findMethod(ProtocolHandle.class, "handle", Protocol.class);
/**
* 协议的业务处理
* @param protocol
*/
Object handle(T protocol);
}
只有一个方法handle,注意泛型参数,此参数用来声明此处理器是用于处理哪个协议的,所以理论上一个协议对象有一个与之对应的处理器(不需要业务处理的协议就不需要创建),那问题来了,程序是怎么根据协议对象获取对应的处理器的呢?这时候需要将协议处理器对象注入到spring容器,将此对象交由spring容器管理,所以我们需要一个处理器工厂用来可以方便的通过协议对象获取对应的协议处理器对象 声明如下:
public abstract class BusinessFactory<T extends ProtocolHandle> implements InitializingBean, BeanFactoryAware {
private ListableBeanFactory beanFactory;
private HashMap<Class<? extends Protocol>, T> mapper;
// 通过协议类型对象获取对应的协议处理器对象
public T getProtocolHandle(Class<? extends Protocol> protocolClazz){
return mapper.get(protocolClazz);
}
// 省略其他接口
}
下面我们看一下断路器的协议处理器对象:
// 注入到spring容器
@Component
public class DataAcceptHandle implements ServerProtocolHandle<DataAcceptProtocol> {
// 比如做数据入库、发送到mqtt网关、缓存到redis、提交到
@Override
public Object handle(DataAcceptProtocol protocol) {
final int i = RandomUtil.randomInt(1, 9);
if(i % 2 == 0) { // 测试自动创建数据表
TaosBreakerUsingStable entity = new TaosBreakerUsingStable(protocol.getEquipCode());
entity.setI(protocol.getI());
entity.setV(protocol.getV());
entity.setPy(protocol.getPy());
entity.setSn(protocol.getEquipCode());
entity.setPower1(protocol.getPower1());
entity.setPower2(protocol.getPower2());
return entity;
} else { // 测试插入数据表
TaosBreakerDataTable dataTable = new TaosBreakerDataTable();
dataTable.setI(protocol.getI());
dataTable.setV(protocol.getV());
dataTable.setPy(protocol.getPy());
dataTable.setTs(new Date());
dataTable.setSn(protocol.getEquipCode());
dataTable.setPower1(protocol.getPower1());
dataTable.setPower2(protocol.getPower2());
return dataTable;
}
}
}
如果我们需要确认客户端的请求, 比如断路器要同步平台的时间, 这时候平台接收到请求之后需要将时间按照协议格式发给客户端这时候要怎么将数据给客户端?还是以断路器为例:
// 比如当我们收到断路器上报的电压电流值之后需要跟断路器说一声我已经收到你发送的值了,可以在 #doBuildResponseMessage()方法里面根据协议文档的要求构建要发送给断路器的报文, 平台会主动响应给断路器
public class DataAcceptProtocol extends ClientInitiativeProtocol<BreakerServerMessage> {
// 确认断路器请求
@Override
protected BreakerServerMessage doBuildResponseMessage() {
Message.MessageHead head = requestMessage().getHead();
return new BreakerServerMessage(MessageCreator.buildBreakerHeader(head
.getEquipCode(), head.getMessageId(), 4, head.getType()),
MessageCreator.buildBreakerBody(StatusCode.Success));
}
// 省略其他
}
到了这里这个iot框架已经实现了闭环了,接收客户端数据 -> 数据解析 -> 业务处理 -> 响应给客户端 或者 平台请求客户端 -> 客户端反馈处理结果 -> 平台接收结果 -> 业务处理。但是还有很多的细节需要处理,比如协议的同步和异步处理,设备的注册等等。接着往下看!
异步请求指的是不阻塞调用请求的线程(不等待设备响应只管发送),这里面会碰到的问题:
我们知道tcp和http不一样并不是基于同步请求响应模式,也就是说平台给设备发送请求完之后请求线程不会阻塞等待设备的响应,当然也不建议阻塞等待因为太耗费性能,这也是物联网一般不会使用http的原因;因为物联网设备太多如果都使用同步会太耗费服务器资源,但是iot框架提供了一个同步机制,原理是调用请求方法时在写出报文之后加锁阻塞此请求然后等设备响应或者响应超时后释放锁。虽然同步会阻塞请求线程,但是在一些需要实时知道请求结果得场景下(比如前端发起请求之后需要知道执行状态)可以简化是流程
不管是同步请求还是异步请求,这里都会有个问题就是设备在应答的时候怎么知道此应答对应哪一条请求指令(协议),解决的方法也很简单给每条下发指令的报文做一个标记(MessageId),设备在应答的时候把请求报文的MessageId原封不动的发上来,然后只需要通过此messageId获取对应的请求协议对象(保存协议对象时用map存储, messageId作为key, Protocol作为value),那我们在什么时候解析此messageId呢? 如下:
// 1. 在报文里面解析报文头得时候一起解析出messageId
public class BreakerServerMessage extends ServerMessage {
//省略其他构造函数
@Override
protected MessageHead doBuild(byte[] message) {
this.messageBody = MessageCreator.buildBreakerBody(message);
return MessageCreator.buildBreakerHeader(message);
}
}
// 2. 在对应得组件里面移除掉对应的协议并且返回
public class BreakerServerComponent extends LengthFieldBasedFrameDecoderServerComponent<BreakerServerMessage> {
@Override
public AbstractProtocol getProtocol(BreakerServerMessage message) {
// 因为我们已经解析出了协议类型了,所以可以通过协议类型创建对应的协议
BreakerProtocolType type = message.getHead().getType();
// 断路器主动推送电压和电流值
if(type == BreakerProtocolType.PushData) {
return new DataAcceptProtocol(message);
} else { // 移除平台主动请求的协议
return remove(message.getHead().getMessageId());
}
}
// 省略其他方法
}
interface MessageHead {
/**
* 报文的唯一编号
* @return
*/
String getMessageId();
}
dtu设备连网时发送的第一包将作为可以标识此dtu的唯一编号(DTU设备编号),基本上所有的dtu都是可以配置注册包和心跳包(默认使用字符串格式)
iot默认的心跳包和dtu编号(DTU设备编号)是一致的, 也就是说dtu配置的心跳包的内容和dtu编号一致的话iot将会作为心跳包(默认使用字符串格式)
对于使用modbus协议且通过RS282 RS485连接dtu的设备可以使用已经写好的服务端组件直接开发,以下是使用教程
<dependency>
<groupId>com.iteaj</groupId>
<version>${iot.version}</version>
<artifactId>iot-server</artifactId>
</dependency>
<dependency>
<groupId>com.iteaj</groupId>
<version>${iot.version}</version>
<artifactId>iot-modbus</artifactId>
</dependency>
// 如果设备使用modbus rtu协议则使用此组件
@Bean
public ModbusRtuForDtuServerComponent modbusRtuForDtuServerComponent() {
// 监听7058端口
return new ModbusRtuForDtuServerComponent(new ConnectProperties(7058));
}
// 如果设备使用modbus tcp协议则使用此组件
@Bean
public ModbusTcpForDtuServerComponent modbusTcpForDtuServerComponent() {
// 监听7068端口
return new ModbusTcpForDtuServerComponent(new ConnectProperties(7068));
}
// modbus rtu协议操作设备教程
ModbusRtuForDtuCommonProtocol read03Protocol = ModbusRtuForDtuCommonProtocol.buildRead03(...);
read03Protocol.request(protocol -> {
// 首先判断协议的执行状态
if(read03Protocol.getExecStatus() == ExecStatus.success) {
ModbusRtuForDtuMessage responseMessage = read03Protocol.responseMessage();
ModbusRtuBody body = responseMessage.getBody();
if(body.isSuccess()) { // 设备执行指令成功
Payload payload = read03Protocol.getPayload();
int value = payload.readInt(0);
// ...
} else { // 设备执行指令失败
ModbusErrCode code = body.getErrCode();
// ...
}
}
return null;
});
// modbus tcp协议操作设备教程
ModbusTcpForDtuCommonProtocol read03Protocol = ModbusTcpForDtuCommonProtocol.buildRead03(...);
read03Protocol.request(protocol -> {
// 首先判断协议的执行状态
if(read03Protocol.getExecStatus() == ExecStatus.success) {
ModbusTcpForDtuMessage responseMessage = read03Protocol.responseMessage();
ModbusTcpBody body = responseMessage.getBody();
if(body.isSuccess()) { // 设备执行指令成功
Payload payload = read03Protocol.getPayload();
int value = payload.readInt(0);
// ...
} else { // 设备执行指令失败
ModbusErrCode code = body.getErrCode();
// ...
}
}
return null;
});
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。