diff --git a/src/main/java/org/bdware/doip/cluster/client/DoaClusterClient.java b/src/main/java/org/bdware/doip/cluster/client/DoaClusterClient.java index 4ddae571480af262502d1ace3258768539ce8520..cd386789aab6e0a0ff2f673118877f5a339db5a2 100644 --- a/src/main/java/org/bdware/doip/cluster/client/DoaClusterClient.java +++ b/src/main/java/org/bdware/doip/cluster/client/DoaClusterClient.java @@ -20,8 +20,6 @@ import org.bdware.doip.endpoint.client.DoipMessageCallback; import org.bdware.irp.exception.IrpClientException; import org.zz.gmhelper.SM2KeyPair; -import java.util.ArrayList; -import java.util.List; import java.util.Map; import java.util.concurrent.*; @@ -36,7 +34,7 @@ public class DoaClusterClient extends DoipClientImpl { private static final Logger LOGGER = LogManager.getLogger(DoaClusterClient.class); private final FlowControl flowControl; // todo optimize thread pool - public static ExecutorService sendMessagePool = + public static ExecutorService taskPool = new ThreadPoolExecutor( 8, // corePoolSize (number of threads to keep in the pool) 16, // maximumPoolSize (maximum number of threads allowed in the pool) @@ -61,7 +59,6 @@ public class DoaClusterClient extends DoipClientImpl { if (routerConfig.extraConfig != null && routerConfig.extraConfig.has("rateThresholdFlowControl")) threshold = routerConfig.extraConfig.get("rateThresholdFlowControl").getAsInt(); flowControl = new RateThresholdFlowControl(threshold); - // new Thread(this::consumeSendMessageTask).start(); LOGGER.info("The SmartClusterClient has been initialized"); } @@ -83,22 +80,13 @@ public class DoaClusterClient extends DoipClientImpl { } public DoipMessage sendMessageSync(DoipMessage doipMessage, long timeout, boolean needFlowControl) { - CountDownLatch countDownLatch = new CountDownLatch(1); - List result = new ArrayList<>(); - DoipMessageCallback callback = new DoipMessageCallback() { - @Override - public void onResult(DoipMessage msg) { - result.add(msg); - countDownLatch.countDown(); - } - }; + CompletableFuture result = new CompletableFuture<>(); + DoipMessageCallback callback = result::complete; sendMessage(doipMessage, callback, needFlowControl); + try { - countDownLatch.await(timeout, TimeUnit.MILLISECONDS); - } catch (InterruptedException e) { - } - if (result.size() > 0) return result.get(0); - else { + return result.get(timeout, TimeUnit.MILLISECONDS); + } catch (InterruptedException | ExecutionException | TimeoutException e) { DoipMessageFactory.DoipMessageBuilder builder = new DoipMessageFactory.DoipMessageBuilder(); builder.createResponse(DoipResponseCode.MoreThanOneErrors, doipMessage); builder.setBody("timeout".getBytes()); @@ -116,15 +104,12 @@ public class DoaClusterClient extends DoipClientImpl { if (cachedDO == null) { JsonObject doInfo = DOResolutionUtil.getDOInfo(irsClient, doid); BDWType doType = DOResolutionUtil.getDOType(doInfo); - switch (doType) { - case DDO: - DOResolutionUtil.verifyDDOInfo(doInfo); - cachedDO = ddoInfo2DDOEntity(doid, doInfo); - break; - default: - DOResolutionUtil.verifyBDOInfo(doInfo); - cachedDO = bdoInfo2BDOEntity(doid, doInfo); - break; + if (doType == BDWType.DDO) { + DOResolutionUtil.verifyDDOInfo(doInfo); + cachedDO = ddoInfo2DDOEntity(doid, doInfo); + } else { + DOResolutionUtil.verifyBDOInfo(doInfo); + cachedDO = bdoInfo2BDOEntity(doid, doInfo); } } DoipMessageCallback flowControlCallback = cb; @@ -133,8 +118,8 @@ public class DoaClusterClient extends DoipClientImpl { cb.onResult(msg); flowControl.maintainFlowControl(); }; - RequestPack requestPack = new RequestPack(this, cachedDO, doipMessage, flowControlCallback, needFlowControl); - produceSendMessageTask(requestPack); + SendMsgReqPack sendMsgReqPack = new SendMsgReqPack(this, cachedDO, doipMessage, flowControlCallback, needFlowControl); + executeTask(sendMsgReqPack); } public DDOEntity ddoInfo2DDOEntity(String ddoID, JsonObject ddoInfo) { @@ -153,7 +138,7 @@ public class DoaClusterClient extends DoipClientImpl { return (BDOEntity) doCache.get(bdoID); } - private void produceSendMessageTask(RequestPack pack) { + private void executeTask(SendMsgReqPack pack) { if (pack.needFlowControl()) for (int i = 0; i < 10 && !flowControl.enableRequestPass(); i++) { try { @@ -163,7 +148,7 @@ public class DoaClusterClient extends DoipClientImpl { e.printStackTrace(); } } - sendMessagePool.execute(pack); + taskPool.execute(pack); } public void signMessage(DoipMessage message) { diff --git a/src/main/java/org/bdware/doip/cluster/engine/DigestUtil.java b/src/main/java/org/bdware/doip/cluster/engine/DigestUtil.java index f720156c3fa1a9a57ecd668d0447c1a801f2448c..1a7516b5b8e1e5e12b97faec38bf8f8302f6af81 100644 --- a/src/main/java/org/bdware/doip/cluster/engine/DigestUtil.java +++ b/src/main/java/org/bdware/doip/cluster/engine/DigestUtil.java @@ -49,7 +49,7 @@ public class DigestUtil { int index = 0; while (treeNode.level != 0) { double hash = md5Double(address + treeNode.label); - if (hash < (double) treeNode.leftChild.weight / treeNode.weight) { + if (hash < treeNode.leftChild.weight / treeNode.weight) { treeNode = treeNode.leftChild; } else { treeNode = treeNode.rightChild; @@ -79,6 +79,7 @@ public class DigestUtil { label.append('1'); for (int i = 0; i < level; ++i) label.append('0'); + assert leftTree != null && rightTree != null; TreeNode thisNode = new TreeNode(label.toString(), leftTree.weight + rightTree.weight, level, leftTree.leafCount + rightTree.leafCount); thisNode.leftChild = leftTree; diff --git a/src/main/java/org/bdware/doip/cluster/engine/DoaClientUtil.java b/src/main/java/org/bdware/doip/cluster/engine/DoaClientUtil.java index 78f097a389dc0fed1cb3a00298c57e6f2c3e644a..f01f5d4cdd0b8324eb5217a126b852c40555c044 100644 --- a/src/main/java/org/bdware/doip/cluster/engine/DoaClientUtil.java +++ b/src/main/java/org/bdware/doip/cluster/engine/DoaClientUtil.java @@ -24,8 +24,7 @@ public class DoaClientUtil { } public DoipMessage sendMessageSync(DoipMessage doipMessage) { - DoipMessage result = client.sendMessageSync(doipMessage, 10000, false); - return result; + return client.sendMessageSync(doipMessage, 10000, false); } public void sendMessage(DoipMessage arg, DoipMessageCallback callback) { client.sendMessage(arg, callback, false); diff --git a/src/main/java/org/bdware/doip/cluster/engine/DoipMessageUtil.java b/src/main/java/org/bdware/doip/cluster/engine/DoipMessageUtil.java index 8e34794e8243ed8005278006a0221197d6f3f880..2a293885736d647682e12f5b5268c240b8a87446 100644 --- a/src/main/java/org/bdware/doip/cluster/engine/DoipMessageUtil.java +++ b/src/main/java/org/bdware/doip/cluster/engine/DoipMessageUtil.java @@ -44,20 +44,17 @@ public class DoipMessageUtil { } public DoipMessageCallback wrapCallback(ScriptFunction som, Object... appendArgs) { - return new DoipMessageCallback() { - @Override - public void onResult(DoipMessage result) { - Object[] newArr; - if (appendArgs != null && appendArgs.length > 0) { - newArr = new Object[appendArgs.length + 1]; - newArr[0] = result; - System.arraycopy(appendArgs, 0, newArr, 1, appendArgs.length); - } else { - newArr = new Object[1]; - newArr[0] = result; - } - engine.invokeFunctionWithObject(som, Object.class, newArr); + return result -> { + Object[] newArr; + if (appendArgs != null && appendArgs.length > 0) { + newArr = new Object[appendArgs.length + 1]; + newArr[0] = result; + System.arraycopy(appendArgs, 0, newArr, 1, appendArgs.length); + } else { + newArr = new Object[1]; + newArr[0] = result; } + engine.invokeFunctionWithObject(som, Object.class, newArr); }; } public DoipResponseCode getResponseCode(String respCode){ diff --git a/src/main/java/org/bdware/doip/cluster/engine/IdentifierUtil.java b/src/main/java/org/bdware/doip/cluster/engine/IdentifierUtil.java index d253aaea9ffffc1702fe85aa65bb2ad513655464..1c68bbceafd5a537c45d81430070826c4fb953a9 100644 --- a/src/main/java/org/bdware/doip/cluster/engine/IdentifierUtil.java +++ b/src/main/java/org/bdware/doip/cluster/engine/IdentifierUtil.java @@ -1,6 +1,5 @@ package org.bdware.doip.cluster.engine; -import com.google.gson.JsonArray; import org.bdware.doip.cluster.entity.DDOEntity; import java.util.List; diff --git a/src/main/java/org/bdware/doip/cluster/engine/JavaUtil.java b/src/main/java/org/bdware/doip/cluster/engine/JavaUtil.java index 99c2cfb7acfa540ce11660e7cecdb4344f4f1385..94956a7ccf67f17103402b6dc0378cf92e4bad33 100644 --- a/src/main/java/org/bdware/doip/cluster/engine/JavaUtil.java +++ b/src/main/java/org/bdware/doip/cluster/engine/JavaUtil.java @@ -1,7 +1,5 @@ package org.bdware.doip.cluster.engine; -import com.google.gson.JsonElement; -import com.google.gson.JsonPrimitive; import jdk.nashorn.api.scripting.ScriptObjectMirror; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; diff --git a/src/main/java/org/bdware/doip/cluster/entity/BDOEntity.java b/src/main/java/org/bdware/doip/cluster/entity/BDOEntity.java index b31b37921aa45a5188e60c5710d3fbeb753d8218..678273c3f8356ecde80322a274f2c0e3a31df1bb 100644 --- a/src/main/java/org/bdware/doip/cluster/entity/BDOEntity.java +++ b/src/main/java/org/bdware/doip/cluster/entity/BDOEntity.java @@ -3,19 +3,16 @@ package org.bdware.doip.cluster.entity; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.bdware.doip.audit.EndpointConfig; -import org.bdware.doip.audit.client.AuditDoipClient; import org.bdware.doip.cluster.callback.BDODelegateDoipMessageCallback; import org.bdware.doip.cluster.client.DoaClusterClient; -import org.bdware.doip.cluster.flowcontrol.core.RateThresholdFlowControl; import org.bdware.doip.cluster.util.AuditDoipClientCacheUtil; -import org.bdware.doip.cluster.util.ClientReadyCallback; import org.bdware.doip.codec.doipMessage.DoipMessage; import org.bdware.doip.endpoint.client.DoipMessageCallback; public class BDOEntity implements DOEntity { static Logger LOGGER = LogManager.getLogger(BDOEntity.class); - private String address; - private String version; + private final String address; + private final String version; public BDOEntity(String address, String version) { this.address = address; @@ -37,17 +34,12 @@ public class BDOEntity implements DOEntity { } - RateThresholdFlowControl flowControl = new RateThresholdFlowControl(200); - @Override public void sendMessage(DoaClusterClient client, DoipMessage doipMessage, DoipMessageCallback doipMessageCallback, EndpointConfig routerConfig) { - AuditDoipClientCacheUtil.getClientByUrl(address, version, new ClientReadyCallback() { - @Override - public void onReady(AuditDoipClient doipClient) { - client.signMessage(doipMessage); - BDODelegateDoipMessageCallback BDODelegateDoipMessageCallback = new BDODelegateDoipMessageCallback(doipClient, doipMessageCallback, doipMessage); - doipClient.sendMessage(doipMessage, BDODelegateDoipMessageCallback); - } + AuditDoipClientCacheUtil.getClientByUrl(address, version, doipClient -> { + client.signMessage(doipMessage); + BDODelegateDoipMessageCallback BDODelegateDoipMessageCallback = new BDODelegateDoipMessageCallback(doipClient, doipMessageCallback, doipMessage); + doipClient.sendMessage(doipMessage, BDODelegateDoipMessageCallback); }); } diff --git a/src/main/java/org/bdware/doip/cluster/entity/DDOEntity.java b/src/main/java/org/bdware/doip/cluster/entity/DDOEntity.java index 2cfc16c7920a9cc2519be3b787fbe3c67acc066d..33a2f94aa944e95e86d5227c3651e18071a9adff 100644 --- a/src/main/java/org/bdware/doip/cluster/entity/DDOEntity.java +++ b/src/main/java/org/bdware/doip/cluster/entity/DDOEntity.java @@ -24,18 +24,16 @@ import java.util.Map; public class DDOEntity implements DOEntity { // hello -> routeInfo - private Map ddoRouteInfo; + private final Map ddoRouteInfo; // hello -> joinInfo - private Map ddoJoinInfo; - private String[] bdoList; - private JsonObject bcoAccessRules; - private NashornScriptEngineUtil engineUtil; - private JsonObject appendixes; - private JsonObject ddoInfo; - private DoaClusterClient client; + private final Map ddoJoinInfo; + private final String[] bdoList; + private final JsonObject bcoAccessRules; + private final NashornScriptEngineUtil engineUtil; + private final JsonObject appendixes; + private final DoaClusterClient client; public DDOEntity(JsonObject ddoInfo, DoaClusterClient client) { - this.ddoInfo = ddoInfo; String bcoId = ddoInfo.get("bcoId").getAsString(); appendixes = ddoInfo.get("appendixes").getAsJsonObject(); bcoAccessRules = appendixes.get(bcoId).getAsJsonObject().get("accessRules").getAsJsonObject(); diff --git a/src/main/java/org/bdware/doip/cluster/entity/DOEntity.java b/src/main/java/org/bdware/doip/cluster/entity/DOEntity.java index b5ef7e90c9f67fd30e3461ee2b19e19e33484a19..13bf79a3bbbc4c78ff2c820bbfaaa2754283a123 100644 --- a/src/main/java/org/bdware/doip/cluster/entity/DOEntity.java +++ b/src/main/java/org/bdware/doip/cluster/entity/DOEntity.java @@ -6,7 +6,7 @@ import org.bdware.doip.codec.doipMessage.DoipMessage; import org.bdware.doip.endpoint.client.DoipMessageCallback; public interface DOEntity { - public BDWType getType(); + BDWType getType(); void sendMessage(DoaClusterClient client, DoipMessage doipMessage, DoipMessageCallback doipMessageCallback, EndpointConfig routerConfig); } diff --git a/src/main/java/org/bdware/doip/cluster/entity/RequestPack.java b/src/main/java/org/bdware/doip/cluster/entity/SendMsgReqPack.java similarity index 83% rename from src/main/java/org/bdware/doip/cluster/entity/RequestPack.java rename to src/main/java/org/bdware/doip/cluster/entity/SendMsgReqPack.java index e320192d15387fd62d835c7e72e05ecde928e276..391a9e595674a6ee4884a0d2863141b1b79633a4 100644 --- a/src/main/java/org/bdware/doip/cluster/entity/RequestPack.java +++ b/src/main/java/org/bdware/doip/cluster/entity/SendMsgReqPack.java @@ -6,8 +6,8 @@ import org.bdware.doip.cluster.client.DoaClusterClient; import org.bdware.doip.codec.doipMessage.DoipMessage; import org.bdware.doip.endpoint.client.DoipMessageCallback; -public class RequestPack extends RequestPackIntf { - static Logger LOGGER = LogManager.getLogger(RequestPack.class); +public class SendMsgReqPack extends RequestPackIntf { + static Logger LOGGER = LogManager.getLogger(SendMsgReqPack.class); private final long createTime; DOEntity doEntity; DoipMessage doipMessage; @@ -15,7 +15,7 @@ public class RequestPack extends RequestPackIntf { boolean needFlowControl; DoaClusterClient doaClusterClient; - public RequestPack(DoaClusterClient doaClusterClient, DOEntity cachedDO, DoipMessage message, DoipMessageCallback callback, boolean needFlowControl) { + public SendMsgReqPack(DoaClusterClient doaClusterClient, DOEntity cachedDO, DoipMessage message, DoipMessageCallback callback, boolean needFlowControl) { this.doaClusterClient = doaClusterClient; this.doEntity = cachedDO; this.doipMessage = message; diff --git a/src/main/java/org/bdware/doip/cluster/flowcontrol/FlowControl.java b/src/main/java/org/bdware/doip/cluster/flowcontrol/FlowControl.java index d2e3415d280395f1a19a3a95b6f495d407b6b838..fef7937f81d82757a40da8d975c9e4917d5d7e3b 100644 --- a/src/main/java/org/bdware/doip/cluster/flowcontrol/FlowControl.java +++ b/src/main/java/org/bdware/doip/cluster/flowcontrol/FlowControl.java @@ -1,7 +1,7 @@ package org.bdware.doip.cluster.flowcontrol; public interface FlowControl { - public boolean enableRequestPass(); + boolean enableRequestPass(); - public void maintainFlowControl(); + void maintainFlowControl(); } diff --git a/src/main/java/org/bdware/doip/cluster/util/AuditDoipClientCacheUtil.java b/src/main/java/org/bdware/doip/cluster/util/AuditDoipClientCacheUtil.java index 1edb9d72ea3816ba920e0f68ebd4802a0a9d442c..5fe33c588e9e35ecdc0f083a053e5183b54e935e 100644 --- a/src/main/java/org/bdware/doip/cluster/util/AuditDoipClientCacheUtil.java +++ b/src/main/java/org/bdware/doip/cluster/util/AuditDoipClientCacheUtil.java @@ -41,7 +41,7 @@ public class AuditDoipClientCacheUtil { public static void getClientByUrl(String address, String version, ClientReadyCallback callback) { ReconnectReqPack pack = new ReconnectReqPack(address, version, callback); - DoaClusterClient.sendMessagePool.execute(pack); + DoaClusterClient.taskPool.execute(pack); } public static AuditDoipClient createWithoutConnect(String address) { diff --git a/src/main/java/org/bdware/doip/cluster/util/AutoCancelReconnectCallback.java b/src/main/java/org/bdware/doip/cluster/util/AutoCancelReconnectCallback.java index a8e31918926f57850c92e837036ea7f1163edb7c..29442f4dc8b66b995af7c23cb1ff4f93b1044ccb 100644 --- a/src/main/java/org/bdware/doip/cluster/util/AutoCancelReconnectCallback.java +++ b/src/main/java/org/bdware/doip/cluster/util/AutoCancelReconnectCallback.java @@ -10,7 +10,7 @@ import org.bdware.doip.endpoint.client.ResponseWait; import java.util.concurrent.TimeUnit; public class AutoCancelReconnectCallback implements ClientReadyCallback { - static Logger LOGGER = LogManager.getLogger(AutoCancelReconnectCallback.class); +// static Logger LOGGER = LogManager.getLogger(AutoCancelReconnectCallback.class); private final ClientReadyCallback originalCallback; Timeout timeout; @@ -28,12 +28,11 @@ public class AutoCancelReconnectCallback implements ClientReadyCallback { @Override public void onReady(AuditDoipClient result) { + // Already timeout is var timeout is null! if (timeout != null) { timeout.cancel(); if (originalCallback != null) originalCallback.onReady(result); - } else { - //Already timeout! } } } diff --git a/src/main/java/org/bdware/doip/cluster/util/AutoCancelRouteResultCallback.java b/src/main/java/org/bdware/doip/cluster/util/AutoCancelRouteResultCallback.java index d2bd49364f503b523b908458dfa6a1eafb33a6ad..03d99727b704654684e57013bbb0b0e8f0d63d4f 100644 --- a/src/main/java/org/bdware/doip/cluster/util/AutoCancelRouteResultCallback.java +++ b/src/main/java/org/bdware/doip/cluster/util/AutoCancelRouteResultCallback.java @@ -10,7 +10,7 @@ import org.bdware.doip.endpoint.client.ResponseWait; import java.util.concurrent.TimeUnit; public class AutoCancelRouteResultCallback implements RouteResultCallback { - static Logger LOGGER = LogManager.getLogger(AutoCancelRouteResultCallback.class); +// static Logger LOGGER = LogManager.getLogger(AutoCancelRouteResultCallback.class); private final RouteResultCallback originalCallback; Timeout timeout; @@ -27,11 +27,11 @@ public class AutoCancelRouteResultCallback implements RouteResultCallback @Override public void onResult(T result) { + // Already timeout is var timeout is null! if (timeout != null) { timeout.cancel(); originalCallback.onResult(result); - } else { - //Already timeout! - } + } //Already timeout! + } } diff --git a/src/main/java/org/bdware/doip/cluster/util/ClientReadyCallback.java b/src/main/java/org/bdware/doip/cluster/util/ClientReadyCallback.java index 807a77624ac2495ca46ca85e4d13e3fb66ff176c..c36151ada0c47a3e6443fc836ca5c90d4f9d47f5 100644 --- a/src/main/java/org/bdware/doip/cluster/util/ClientReadyCallback.java +++ b/src/main/java/org/bdware/doip/cluster/util/ClientReadyCallback.java @@ -3,5 +3,5 @@ package org.bdware.doip.cluster.util; import org.bdware.doip.audit.client.AuditDoipClient; public interface ClientReadyCallback { - public void onReady(AuditDoipClient client); + void onReady(AuditDoipClient client); } diff --git a/src/main/java/org/bdware/doip/cluster/util/ConnectionUtil.java b/src/main/java/org/bdware/doip/cluster/util/ConnectionUtil.java index 499317d195cac6c05db6e7cf07812a40be203193..f698bc580aa29dec069a9868243ecb86688a32b7 100644 --- a/src/main/java/org/bdware/doip/cluster/util/ConnectionUtil.java +++ b/src/main/java/org/bdware/doip/cluster/util/ConnectionUtil.java @@ -6,9 +6,7 @@ import org.bdware.doip.audit.client.AuditDoipClient; import java.io.ByteArrayOutputStream; import java.io.PrintStream; -import java.util.Map; import java.util.concurrent.*; -import java.util.concurrent.atomic.AtomicInteger; /** * @author liuyihao @@ -22,11 +20,6 @@ public class ConnectionUtil { return t; }); - private final static Map reconnectRequests = new ConcurrentHashMap<>(); - private final static ConcurrentSkipListSet inConnectingAddress = new ConcurrentSkipListSet<>(); - - private final static Map lastReconnectDate = new ConcurrentHashMap<>(); - public static void tryReconnectSync(AuditDoipClient doipClientImpl, long timeToWait) { CompletableFuture future = CompletableFuture.supplyAsync(() -> { try { @@ -69,12 +62,10 @@ public class ConnectionUtil { doipClientImpl.reconnect(); LOGGER.info("=========ACTUALLY SUCCESS:" + doipClientImpl.getRepoUrl() + " tid:" + Thread.currentThread().getId()); } - return; } catch (Exception e) { ByteArrayOutputStream bo = new ByteArrayOutputStream(); e.printStackTrace(new PrintStream(bo)); LOGGER.info("failed to connect to:" + doipClientImpl.getRepoUrl() + " " + bo); - return; } finally { doipClientImpl.rwLock.release(); autoCancelReconnectCallback.onReady(doipClientImpl); diff --git a/src/main/java/org/bdware/doip/cluster/util/DOResolutionUtil.java b/src/main/java/org/bdware/doip/cluster/util/DOResolutionUtil.java index 7681039cc2077f9fcdd1c8804a0111f3a9639a49..fefeabe0fc25b2272ed6fa06eaf173d5183c5425 100644 --- a/src/main/java/org/bdware/doip/cluster/util/DOResolutionUtil.java +++ b/src/main/java/org/bdware/doip/cluster/util/DOResolutionUtil.java @@ -133,7 +133,7 @@ public class DOResolutionUtil { return null; } - public static BDWType getDOType(JsonObject doInfo) throws IrpClientException { + public static BDWType getDOType(JsonObject doInfo) { if (doInfo.get("bdwType") == null || doInfo.get("bdwType").isJsonNull()) { return BDWType.DO; } diff --git a/src/main/java/org/bdware/doip/cluster/util/IRSUtil.java b/src/main/java/org/bdware/doip/cluster/util/IRSUtil.java index 9096d1e42dc396a76704fa87bfa71bd6de748a2d..b892cf43b5be3c141060641b5799fac5e36d2711 100644 --- a/src/main/java/org/bdware/doip/cluster/util/IRSUtil.java +++ b/src/main/java/org/bdware/doip/cluster/util/IRSUtil.java @@ -14,7 +14,7 @@ public class IRSUtil { if (!currentAppendixes.has(targetDOID) || currentAppendixes.get(targetDOID).isJsonNull() || !currentAppendixes.get(targetDOID).isJsonObject()) { - JsonObject targetHandleValues = null; + JsonObject targetHandleValues; try { targetHandleValues = DOResolutionUtil.getDOInfo(irsClient, targetDOID); } catch (IrpClientException e) { diff --git a/src/main/java/org/bdware/doip/cluster/util/JSONTool.java b/src/main/java/org/bdware/doip/cluster/util/JSONTool.java index 952c30d0734a6695672adbccc2919840e291c103..bb1f4598201ebe10304f6cb541a86584283784fc 100644 --- a/src/main/java/org/bdware/doip/cluster/util/JSONTool.java +++ b/src/main/java/org/bdware/doip/cluster/util/JSONTool.java @@ -58,24 +58,24 @@ public class JSONTool { Object[] arr = (Object[]) obj; recorded.add(obj); JsonArray jsonArray = new JsonArray(); - for (int i = 0; i < arr.length; i++) { - jsonArray.add(convertMirrorToJsonInternal(arr[i], recorded)); + for (Object o : arr) { + jsonArray.add(convertMirrorToJsonInternal(o, recorded)); } return jsonArray; } else if (List.class.isAssignableFrom(obj.getClass())) { List arr = (List) obj; recorded.add(arr); JsonArray jsonArray = new JsonArray(); - for (int i = 0; i < arr.size(); i++) { - jsonArray.add(convertMirrorToJsonInternal(arr.get(i), recorded)); + for (Object o : arr) { + jsonArray.add(convertMirrorToJsonInternal(o, recorded)); } return jsonArray; } else if (List.class.isAssignableFrom(obj.getClass())) { List arr = (List) obj; recorded.add(arr); JsonArray jsonArray = new JsonArray(); - for (int i = 0; i < arr.size(); i++) { - jsonArray.add(convertMirrorToJsonInternal(arr.get(i), recorded)); + for (Object o : arr) { + jsonArray.add(convertMirrorToJsonInternal(o, recorded)); } return jsonArray; } else if (Set.class.isAssignableFrom(obj.getClass())) { diff --git a/src/main/java/org/bdware/doip/cluster/util/ResultMergerUtil.java b/src/main/java/org/bdware/doip/cluster/util/ResultMergerUtil.java deleted file mode 100644 index bfd715efcfd71aa12eca101bcd7b1fed6b7f9a38..0000000000000000000000000000000000000000 --- a/src/main/java/org/bdware/doip/cluster/util/ResultMergerUtil.java +++ /dev/null @@ -1,152 +0,0 @@ -package org.bdware.doip.cluster.util; - -import com.google.gson.JsonElement; -import com.google.gson.JsonObject; -import com.google.gson.JsonPrimitive; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.bdware.doip.codec.doipMessage.DoipMessage; -import org.bdware.doip.endpoint.client.DoipMessageCallback; -import org.bdware.sc.ContractResult; -import org.bdware.sc.bean.JoinInfo; -import org.bdware.sc.util.JsonUtil; -import wrp.jdk.nashorn.api.scripting.NashornScriptEngineUtil; - -import java.nio.charset.StandardCharsets; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.atomic.AtomicInteger; - -public class ResultMergerUtil implements DoipMessageCallback { - CompositeDoipResult compositeDoipResult; - AtomicInteger order; - int totalCount; // 记录有多少个节点 - DoipMessageCallback originalCallback; - JoinInfo joinInfo; - static Logger Logger = LogManager.getLogger(ResultMergerUtil.class); - int THRESHOLD; // >=的结果可返回 - NashornScriptEngineUtil engineUtil; - - public ResultMergerUtil(DoipMessageCallback originalCb, int totalCount, final JoinInfo joinInfo, NashornScriptEngineUtil nashornScriptEngineUtil) { - originalCallback = originalCb; - this.totalCount = totalCount; - compositeDoipResult = new CompositeDoipResult(totalCount); - order = new AtomicInteger(0); - this.joinInfo = joinInfo; - this.engineUtil = nashornScriptEngineUtil; - } - - public String getInfo() { - return " 收到第 " + order.get() + " 个节点回复 : "; - } - - @Override - public void onResult(DoipMessage doipMessage) { - // TODO 必须在这里聚合。 - // 返回值 - try { - Logger.info(getInfo() + doipMessage); - - //LOGGER.info("contractID=" + contractID + " 收到第 " + order + " 个节点回复 : " + str + " order=" + order + " count=" + count); - final int curCount = order.incrementAndGet(); - compositeDoipResult.addDoipResult(curCount, doipMessage); - - // 收集到所有结果 - if (curCount == totalCount) { - // todo merge后的结果很奇怪,ContractResult是可以的,但是DOIPMessage不知道怎么merge合适啊。。。 - ContractResult finalResult = compositeDoipResult.mergeFinalResult(); - - if (joinInfo != null) { - handleJoinInfo(finalResult, joinInfo); - } - // new return result - DoipMessage doipMsgResult = new DoipMessage("", ""); - doipMsgResult.body.encodedData = finalResult.result.toString().getBytes(StandardCharsets.UTF_8); - originalCallback.onResult(doipMsgResult); - - // old return result - // originalCallback.onResult(JsonUtil.fromJson(finalResult.result.getAsJsonObject(), DoipMessage.class)); - - // recover,其中无状态合约CP出错无需恢复 - Set problemResults = compositeDoipResult.getProblemNodes(); - for (String problemResult : problemResults) { - Logger.warn(problemResult); - } - } - } catch (Exception e) { - e.printStackTrace(); - Logger.info("本次执行最终结果为有异常"); - } - } - - private void handleJoinInfo(ContractResult finalResult, JoinInfo joinInfo) { - JsonObject jo = finalResult.result.getAsJsonObject(); - try { - if (joinInfo != null) { - if (joinInfo.useDefault == null) { - if (joinInfo.joinFuncName != null) { - finalResult.result = engineUtil.invokeFunction(joinInfo.joinFuncName, JsonElement.class, jo); - } - return; - } - switch (joinInfo.useDefault) { - case add: - double val = 0; - for (String key : jo.keySet()) { - val += jo.get(key).getAsDouble(); - } - finalResult.result = new JsonPrimitive(val); - break; - case multiply: - val = 1; - for (String key : jo.keySet()) { - val *= jo.get(key).getAsDouble(); - } - finalResult.result = new JsonPrimitive(val); - break; - default: - } - } - } catch (Exception e) { - e.printStackTrace(); - } - } -} - -class CompositeDoipResult { - private static final Logger LOGGER = LogManager.getLogger(CompositeDoipResult.class); - Map successResultMap = new HashMap<>(); // 只有状态是Success的返回结果会存在这个map中 - Set problemResult = new HashSet<>(); - int THRESHOLD; // >=的结果可返回 - - public CompositeDoipResult(int totalCount) { - THRESHOLD = (int) Math.ceil((double) totalCount / 2); - } - - public synchronized void addDoipResult(final int count, DoipMessage doipMessage) { - successResultMap.put(Integer.valueOf(count).toString(), doipMessage); - } - - public Set getProblemNodes() { - return problemResult; - } - - public ContractResult mergeFinalResult() { - LOGGER.info("mergeFinalResult"); - - // JsonParser - JsonObject finalResult = new JsonObject(); - ContractResult finalContractResult = new ContractResult(ContractResult.Status.Success, finalResult); - for (String count : successResultMap.keySet()) { - // 取出 - DoipMessage singleResult = successResultMap.get(count); // 取出结果 - if (singleResult != null) { - finalResult.add(count, JsonUtil.fromJson(JsonUtil.toJson(singleResult), JsonObject.class)); - } - } - - return finalContractResult; - } -} \ No newline at end of file diff --git a/src/main/java/org/bdware/doip/cluster/util/RouteJoinUtil.java b/src/main/java/org/bdware/doip/cluster/util/RouteJoinUtil.java index fedf16af4fe0ef82273a50d6353c33a2d800eedd..1541315a113ea1d7e4b4ed3a64e09fd25c52f476 100644 --- a/src/main/java/org/bdware/doip/cluster/util/RouteJoinUtil.java +++ b/src/main/java/org/bdware/doip/cluster/util/RouteJoinUtil.java @@ -10,10 +10,9 @@ import org.bdware.doip.audit.EndpointConfig; import org.bdware.doip.audit.client.AuditDoipClient; import org.bdware.doip.cluster.callback.BDODelegateDoipMessageCallback; import org.bdware.doip.cluster.callback.DDOClusterDoipMessageCallback; +import org.bdware.doip.cluster.callback.RouteResultCallback; import org.bdware.doip.cluster.client.DoaClusterClient; import org.bdware.doip.cluster.client.DoipClusterClient; -import org.bdware.doip.cluster.callback.RouteResultCallback; -import org.bdware.doip.codec.JsonDoipMessage; import org.bdware.doip.codec.doipMessage.DoipMessage; import org.bdware.doip.codec.doipMessage.DoipMessageSigner; import org.bdware.doip.endpoint.client.DoipMessageCallback; @@ -75,7 +74,6 @@ public class RouteJoinUtil { e.printStackTrace(); } routeResultCallback.onResult(null); - return; } public static JsonElement tryLoadJsonProp(JsonObject routeInfoArg, String param) { diff --git a/src/test/java/org/bdware/doip/cluster/DoaClusterClientTest.java b/src/test/java/org/bdware/doip/cluster/DoaClusterClientTest.java index 66ea55b2eef6271d2c91a5925fe4fc7a232ab5a8..512a1990ae12773e318685f13544fd3f7fa8b3bc 100644 --- a/src/test/java/org/bdware/doip/cluster/DoaClusterClientTest.java +++ b/src/test/java/org/bdware/doip/cluster/DoaClusterClientTest.java @@ -38,7 +38,7 @@ public class DoaClusterClientTest { EndpointConfig config = new EndpointConfig(); config.routerURI = "tcp://8.130.41.205:21041"; DoaClusterClient client = new DoaClusterClient(config); - int totalCount = 500000; + int totalCount = 10; AtomicInteger count = new AtomicInteger(0); AtomicInteger correct = new AtomicInteger(0); printAsync(correct, count, totalCount);