diff --git a/src/main/java/org/bdware/doip/cluster/callback/DDOClusterDoipMessageCallback.java b/src/main/java/org/bdware/doip/cluster/callback/DDOClusterDoipMessageCallback.java index 5d2346444f6a89cc795c9ffa4cd6e19aa9d9d1c2..ce3bc5878460aabb731b6011097818350f8609dd 100644 --- a/src/main/java/org/bdware/doip/cluster/callback/DDOClusterDoipMessageCallback.java +++ b/src/main/java/org/bdware/doip/cluster/callback/DDOClusterDoipMessageCallback.java @@ -16,10 +16,10 @@ import wrp.jdk.nashorn.api.scripting.NashornScriptEngineUtil; import java.io.ByteArrayOutputStream; import java.io.PrintStream; -import java.util.HashMap; -import java.util.HashSet; import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.atomic.AtomicInteger; public class DDOClusterDoipMessageCallback implements DoipMessageCallback { @@ -29,10 +29,9 @@ public class DDOClusterDoipMessageCallback implements DoipMessageCallback { int joinCount; AtomicInteger count = new AtomicInteger(0); DoipMessageCallback originalCallback; - Map successResMap = new HashMap<>(); - Set problemResult = new HashSet<>(); + Map successResultMap = new ConcurrentHashMap<>(); + Set problemResultSet = new CopyOnWriteArraySet<>(); // 记录所有收到结果列表 - Set nodeIDs = new HashSet<>(); NashornScriptEngineUtil engineUtil; JoinInfo joinInfo; String[] targetDOIDs; @@ -58,14 +57,15 @@ public class DDOClusterDoipMessageCallback implements DoipMessageCallback { // logDoipMsgInfo(msg); // 如果nodeID不存在,或者该节点的返回值已经被处理过了,不需要再次调用onResult了 String nodeID = getNodeID(msg); + int requestID = msg.requestID; + // problem result if nodeID == null if (nodeID == null) { // 如果当前nodeID == null, 记录到problemResult中 - problemResult.add(msg); - } else if (nodeIDs.contains(nodeID)) { + problemResultSet.add(msg); + } else if (successResultMap.containsKey(requestID)) { LOGGER.info("Receive Duplicated Msg"); } else { - nodeIDs.add(nodeID); - successResMap.put(nodeID, msg); + successResultMap.put(requestID, msg); } int curCount = count.incrementAndGet(); // 小于joinCount,意味着还没达到聚合的标准,需要继续处理 @@ -86,30 +86,29 @@ public class DDOClusterDoipMessageCallback implements DoipMessageCallback { } public void executeJoinFunc(DoipMessage request, DoipMessageCallback originalCallback) { - DoipMessage resultMessage; - if (joinInfo != null && problemResult.size() == 0) { + if (joinInfo != null && problemResultSet.size() == 0) { handleJoinInfo(joinInfo, originalCallback); return; } - if (successResMap.size() + problemResult.size() == 1) { - if (successResMap.size() == 1) { - originalCallback.onResult(successResMap.values().iterator().next()); + if (successResultMap.size() + problemResultSet.size() == 1) { + if (successResultMap.size() == 1) { + originalCallback.onResult(successResultMap.values().iterator().next()); } else { - originalCallback.onResult(problemResult.iterator().next()); + originalCallback.onResult(problemResultSet.iterator().next()); } return; } DoipMessageFactory.DoipMessageBuilder builder = new DoipMessageFactory.DoipMessageBuilder(); - if (problemResult.size() > 0) + if (problemResultSet.size() > 0) builder.createResponse(DoipResponseCode.UnKnownError, request); else builder.createResponse(DoipResponseCode.Success, request); JsonObject successResponses = new JsonObject(); - for (String key : successResMap.keySet()) { - successResponses.add(key, JsonUtil.parseObject(JsonDoipMessage.fromDoipMessage(successResMap.get(key)))); + for (Integer key : successResultMap.keySet()) { + successResponses.add(String.valueOf(key), JsonUtil.parseObject(JsonDoipMessage.fromDoipMessage(successResultMap.get(key)))); } JsonArray problemResponses = new JsonArray(); - for (DoipMessage msg : problemResult) { + for (DoipMessage msg : problemResultSet) { problemResponses.add(JsonUtil.parseObject(JsonDoipMessage.fromDoipMessage(msg))); } builder.addAttributes("successResponses", successResponses); @@ -122,12 +121,7 @@ public class DDOClusterDoipMessageCallback implements DoipMessageCallback { try { if (joinInfo.useDefault == null) { if (joinInfo.joinFuncName != null) { - engineUtil.invokeFunctionWithObjectAsync(joinInfo.joinFuncName, new RouteResultCallback() { - @Override - public void onResult(DoipMessage msg) { - originalCallback.onResult(msg); - } - }, successResMap, problemResult); + engineUtil.invokeFunctionWithObjectAsync(joinInfo.joinFuncName, originalCallback::onResult, successResultMap, problemResultSet); return; } else throw new IllegalArgumentException("missing joinFuncName"); } @@ -135,7 +129,7 @@ public class DDOClusterDoipMessageCallback implements DoipMessageCallback { switch (joinInfo.useDefault) { case add: double val = 0; - for (DoipMessage msg : successResMap.values()) { + for (DoipMessage msg : successResultMap.values()) { val += Double.parseDouble(msg.body.getDataAsJsonString()); returnedMsg = msg; } @@ -145,7 +139,7 @@ public class DDOClusterDoipMessageCallback implements DoipMessageCallback { return; case multiply: val = 1; - for (DoipMessage msg : successResMap.values()) { + for (DoipMessage msg : successResultMap.values()) { val *= Double.parseDouble(msg.body.getDataAsJsonString()); returnedMsg = msg; } @@ -162,7 +156,6 @@ public class DDOClusterDoipMessageCallback implements DoipMessageCallback { builder.setBody(bo.toByteArray()); e.printStackTrace(); originalCallback.onResult(builder.create()); - return; } } }