From 1a7962b34b5f2ad8d038ab51c926473e86777194 Mon Sep 17 00:00:00 2001 From: "haoeliu@foxmail.com" Date: Wed, 26 Jul 2023 21:24:06 +0800 Subject: [PATCH 1/2] optimize code --- .../doip/cluster/client/DoaClusterClient.java | 36 ++--- .../util/AutoCancelReconnectCallback.java | 5 +- .../util/AutoCancelRouteResultCallback.java | 8 +- .../cluster/util/ClientReadyCallback.java | 2 +- .../doip/cluster/util/ConnectionUtil.java | 9 -- .../doip/cluster/util/DOResolutionUtil.java | 2 +- .../org/bdware/doip/cluster/util/IRSUtil.java | 2 +- .../bdware/doip/cluster/util/JSONTool.java | 12 +- .../doip/cluster/util/ResultMergerUtil.java | 152 ------------------ .../doip/cluster/util/RouteJoinUtil.java | 4 +- .../doip/cluster/DoaClusterClientTest.java | 2 +- 11 files changed, 28 insertions(+), 206 deletions(-) delete mode 100644 src/main/java/org/bdware/doip/cluster/util/ResultMergerUtil.java diff --git a/src/main/java/org/bdware/doip/cluster/client/DoaClusterClient.java b/src/main/java/org/bdware/doip/cluster/client/DoaClusterClient.java index 4ddae57..1a5923c 100644 --- a/src/main/java/org/bdware/doip/cluster/client/DoaClusterClient.java +++ b/src/main/java/org/bdware/doip/cluster/client/DoaClusterClient.java @@ -20,8 +20,6 @@ import org.bdware.doip.endpoint.client.DoipMessageCallback; import org.bdware.irp.exception.IrpClientException; import org.zz.gmhelper.SM2KeyPair; -import java.util.ArrayList; -import java.util.List; import java.util.Map; import java.util.concurrent.*; @@ -83,22 +81,13 @@ public class DoaClusterClient extends DoipClientImpl { } public DoipMessage sendMessageSync(DoipMessage doipMessage, long timeout, boolean needFlowControl) { - CountDownLatch countDownLatch = new CountDownLatch(1); - List result = new ArrayList<>(); - DoipMessageCallback callback = new DoipMessageCallback() { - @Override - public void onResult(DoipMessage msg) { - result.add(msg); - countDownLatch.countDown(); - } - }; + CompletableFuture result = new CompletableFuture<>(); + DoipMessageCallback callback = result::complete; sendMessage(doipMessage, callback, needFlowControl); + try { - countDownLatch.await(timeout, TimeUnit.MILLISECONDS); - } catch (InterruptedException e) { - } - if (result.size() > 0) return result.get(0); - else { + return result.get(timeout, TimeUnit.MILLISECONDS); + } catch (InterruptedException | ExecutionException | TimeoutException e) { DoipMessageFactory.DoipMessageBuilder builder = new DoipMessageFactory.DoipMessageBuilder(); builder.createResponse(DoipResponseCode.MoreThanOneErrors, doipMessage); builder.setBody("timeout".getBytes()); @@ -116,15 +105,12 @@ public class DoaClusterClient extends DoipClientImpl { if (cachedDO == null) { JsonObject doInfo = DOResolutionUtil.getDOInfo(irsClient, doid); BDWType doType = DOResolutionUtil.getDOType(doInfo); - switch (doType) { - case DDO: - DOResolutionUtil.verifyDDOInfo(doInfo); - cachedDO = ddoInfo2DDOEntity(doid, doInfo); - break; - default: - DOResolutionUtil.verifyBDOInfo(doInfo); - cachedDO = bdoInfo2BDOEntity(doid, doInfo); - break; + if (doType == BDWType.DDO) { + DOResolutionUtil.verifyDDOInfo(doInfo); + cachedDO = ddoInfo2DDOEntity(doid, doInfo); + } else { + DOResolutionUtil.verifyBDOInfo(doInfo); + cachedDO = bdoInfo2BDOEntity(doid, doInfo); } } DoipMessageCallback flowControlCallback = cb; diff --git a/src/main/java/org/bdware/doip/cluster/util/AutoCancelReconnectCallback.java b/src/main/java/org/bdware/doip/cluster/util/AutoCancelReconnectCallback.java index a8e3191..29442f4 100644 --- a/src/main/java/org/bdware/doip/cluster/util/AutoCancelReconnectCallback.java +++ b/src/main/java/org/bdware/doip/cluster/util/AutoCancelReconnectCallback.java @@ -10,7 +10,7 @@ import org.bdware.doip.endpoint.client.ResponseWait; import java.util.concurrent.TimeUnit; public class AutoCancelReconnectCallback implements ClientReadyCallback { - static Logger LOGGER = LogManager.getLogger(AutoCancelReconnectCallback.class); +// static Logger LOGGER = LogManager.getLogger(AutoCancelReconnectCallback.class); private final ClientReadyCallback originalCallback; Timeout timeout; @@ -28,12 +28,11 @@ public class AutoCancelReconnectCallback implements ClientReadyCallback { @Override public void onReady(AuditDoipClient result) { + // Already timeout is var timeout is null! if (timeout != null) { timeout.cancel(); if (originalCallback != null) originalCallback.onReady(result); - } else { - //Already timeout! } } } diff --git a/src/main/java/org/bdware/doip/cluster/util/AutoCancelRouteResultCallback.java b/src/main/java/org/bdware/doip/cluster/util/AutoCancelRouteResultCallback.java index d2bd493..03d9972 100644 --- a/src/main/java/org/bdware/doip/cluster/util/AutoCancelRouteResultCallback.java +++ b/src/main/java/org/bdware/doip/cluster/util/AutoCancelRouteResultCallback.java @@ -10,7 +10,7 @@ import org.bdware.doip.endpoint.client.ResponseWait; import java.util.concurrent.TimeUnit; public class AutoCancelRouteResultCallback implements RouteResultCallback { - static Logger LOGGER = LogManager.getLogger(AutoCancelRouteResultCallback.class); +// static Logger LOGGER = LogManager.getLogger(AutoCancelRouteResultCallback.class); private final RouteResultCallback originalCallback; Timeout timeout; @@ -27,11 +27,11 @@ public class AutoCancelRouteResultCallback implements RouteResultCallback @Override public void onResult(T result) { + // Already timeout is var timeout is null! if (timeout != null) { timeout.cancel(); originalCallback.onResult(result); - } else { - //Already timeout! - } + } //Already timeout! + } } diff --git a/src/main/java/org/bdware/doip/cluster/util/ClientReadyCallback.java b/src/main/java/org/bdware/doip/cluster/util/ClientReadyCallback.java index 807a776..c36151a 100644 --- a/src/main/java/org/bdware/doip/cluster/util/ClientReadyCallback.java +++ b/src/main/java/org/bdware/doip/cluster/util/ClientReadyCallback.java @@ -3,5 +3,5 @@ package org.bdware.doip.cluster.util; import org.bdware.doip.audit.client.AuditDoipClient; public interface ClientReadyCallback { - public void onReady(AuditDoipClient client); + void onReady(AuditDoipClient client); } diff --git a/src/main/java/org/bdware/doip/cluster/util/ConnectionUtil.java b/src/main/java/org/bdware/doip/cluster/util/ConnectionUtil.java index 499317d..f698bc5 100644 --- a/src/main/java/org/bdware/doip/cluster/util/ConnectionUtil.java +++ b/src/main/java/org/bdware/doip/cluster/util/ConnectionUtil.java @@ -6,9 +6,7 @@ import org.bdware.doip.audit.client.AuditDoipClient; import java.io.ByteArrayOutputStream; import java.io.PrintStream; -import java.util.Map; import java.util.concurrent.*; -import java.util.concurrent.atomic.AtomicInteger; /** * @author liuyihao @@ -22,11 +20,6 @@ public class ConnectionUtil { return t; }); - private final static Map reconnectRequests = new ConcurrentHashMap<>(); - private final static ConcurrentSkipListSet inConnectingAddress = new ConcurrentSkipListSet<>(); - - private final static Map lastReconnectDate = new ConcurrentHashMap<>(); - public static void tryReconnectSync(AuditDoipClient doipClientImpl, long timeToWait) { CompletableFuture future = CompletableFuture.supplyAsync(() -> { try { @@ -69,12 +62,10 @@ public class ConnectionUtil { doipClientImpl.reconnect(); LOGGER.info("=========ACTUALLY SUCCESS:" + doipClientImpl.getRepoUrl() + " tid:" + Thread.currentThread().getId()); } - return; } catch (Exception e) { ByteArrayOutputStream bo = new ByteArrayOutputStream(); e.printStackTrace(new PrintStream(bo)); LOGGER.info("failed to connect to:" + doipClientImpl.getRepoUrl() + " " + bo); - return; } finally { doipClientImpl.rwLock.release(); autoCancelReconnectCallback.onReady(doipClientImpl); diff --git a/src/main/java/org/bdware/doip/cluster/util/DOResolutionUtil.java b/src/main/java/org/bdware/doip/cluster/util/DOResolutionUtil.java index 7681039..fefeabe 100644 --- a/src/main/java/org/bdware/doip/cluster/util/DOResolutionUtil.java +++ b/src/main/java/org/bdware/doip/cluster/util/DOResolutionUtil.java @@ -133,7 +133,7 @@ public class DOResolutionUtil { return null; } - public static BDWType getDOType(JsonObject doInfo) throws IrpClientException { + public static BDWType getDOType(JsonObject doInfo) { if (doInfo.get("bdwType") == null || doInfo.get("bdwType").isJsonNull()) { return BDWType.DO; } diff --git a/src/main/java/org/bdware/doip/cluster/util/IRSUtil.java b/src/main/java/org/bdware/doip/cluster/util/IRSUtil.java index 9096d1e..b892cf4 100644 --- a/src/main/java/org/bdware/doip/cluster/util/IRSUtil.java +++ b/src/main/java/org/bdware/doip/cluster/util/IRSUtil.java @@ -14,7 +14,7 @@ public class IRSUtil { if (!currentAppendixes.has(targetDOID) || currentAppendixes.get(targetDOID).isJsonNull() || !currentAppendixes.get(targetDOID).isJsonObject()) { - JsonObject targetHandleValues = null; + JsonObject targetHandleValues; try { targetHandleValues = DOResolutionUtil.getDOInfo(irsClient, targetDOID); } catch (IrpClientException e) { diff --git a/src/main/java/org/bdware/doip/cluster/util/JSONTool.java b/src/main/java/org/bdware/doip/cluster/util/JSONTool.java index 952c30d..bb1f459 100644 --- a/src/main/java/org/bdware/doip/cluster/util/JSONTool.java +++ b/src/main/java/org/bdware/doip/cluster/util/JSONTool.java @@ -58,24 +58,24 @@ public class JSONTool { Object[] arr = (Object[]) obj; recorded.add(obj); JsonArray jsonArray = new JsonArray(); - for (int i = 0; i < arr.length; i++) { - jsonArray.add(convertMirrorToJsonInternal(arr[i], recorded)); + for (Object o : arr) { + jsonArray.add(convertMirrorToJsonInternal(o, recorded)); } return jsonArray; } else if (List.class.isAssignableFrom(obj.getClass())) { List arr = (List) obj; recorded.add(arr); JsonArray jsonArray = new JsonArray(); - for (int i = 0; i < arr.size(); i++) { - jsonArray.add(convertMirrorToJsonInternal(arr.get(i), recorded)); + for (Object o : arr) { + jsonArray.add(convertMirrorToJsonInternal(o, recorded)); } return jsonArray; } else if (List.class.isAssignableFrom(obj.getClass())) { List arr = (List) obj; recorded.add(arr); JsonArray jsonArray = new JsonArray(); - for (int i = 0; i < arr.size(); i++) { - jsonArray.add(convertMirrorToJsonInternal(arr.get(i), recorded)); + for (Object o : arr) { + jsonArray.add(convertMirrorToJsonInternal(o, recorded)); } return jsonArray; } else if (Set.class.isAssignableFrom(obj.getClass())) { diff --git a/src/main/java/org/bdware/doip/cluster/util/ResultMergerUtil.java b/src/main/java/org/bdware/doip/cluster/util/ResultMergerUtil.java deleted file mode 100644 index bfd715e..0000000 --- a/src/main/java/org/bdware/doip/cluster/util/ResultMergerUtil.java +++ /dev/null @@ -1,152 +0,0 @@ -package org.bdware.doip.cluster.util; - -import com.google.gson.JsonElement; -import com.google.gson.JsonObject; -import com.google.gson.JsonPrimitive; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.bdware.doip.codec.doipMessage.DoipMessage; -import org.bdware.doip.endpoint.client.DoipMessageCallback; -import org.bdware.sc.ContractResult; -import org.bdware.sc.bean.JoinInfo; -import org.bdware.sc.util.JsonUtil; -import wrp.jdk.nashorn.api.scripting.NashornScriptEngineUtil; - -import java.nio.charset.StandardCharsets; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.atomic.AtomicInteger; - -public class ResultMergerUtil implements DoipMessageCallback { - CompositeDoipResult compositeDoipResult; - AtomicInteger order; - int totalCount; // 记录有多少个节点 - DoipMessageCallback originalCallback; - JoinInfo joinInfo; - static Logger Logger = LogManager.getLogger(ResultMergerUtil.class); - int THRESHOLD; // >=的结果可返回 - NashornScriptEngineUtil engineUtil; - - public ResultMergerUtil(DoipMessageCallback originalCb, int totalCount, final JoinInfo joinInfo, NashornScriptEngineUtil nashornScriptEngineUtil) { - originalCallback = originalCb; - this.totalCount = totalCount; - compositeDoipResult = new CompositeDoipResult(totalCount); - order = new AtomicInteger(0); - this.joinInfo = joinInfo; - this.engineUtil = nashornScriptEngineUtil; - } - - public String getInfo() { - return " 收到第 " + order.get() + " 个节点回复 : "; - } - - @Override - public void onResult(DoipMessage doipMessage) { - // TODO 必须在这里聚合。 - // 返回值 - try { - Logger.info(getInfo() + doipMessage); - - //LOGGER.info("contractID=" + contractID + " 收到第 " + order + " 个节点回复 : " + str + " order=" + order + " count=" + count); - final int curCount = order.incrementAndGet(); - compositeDoipResult.addDoipResult(curCount, doipMessage); - - // 收集到所有结果 - if (curCount == totalCount) { - // todo merge后的结果很奇怪,ContractResult是可以的,但是DOIPMessage不知道怎么merge合适啊。。。 - ContractResult finalResult = compositeDoipResult.mergeFinalResult(); - - if (joinInfo != null) { - handleJoinInfo(finalResult, joinInfo); - } - // new return result - DoipMessage doipMsgResult = new DoipMessage("", ""); - doipMsgResult.body.encodedData = finalResult.result.toString().getBytes(StandardCharsets.UTF_8); - originalCallback.onResult(doipMsgResult); - - // old return result - // originalCallback.onResult(JsonUtil.fromJson(finalResult.result.getAsJsonObject(), DoipMessage.class)); - - // recover,其中无状态合约CP出错无需恢复 - Set problemResults = compositeDoipResult.getProblemNodes(); - for (String problemResult : problemResults) { - Logger.warn(problemResult); - } - } - } catch (Exception e) { - e.printStackTrace(); - Logger.info("本次执行最终结果为有异常"); - } - } - - private void handleJoinInfo(ContractResult finalResult, JoinInfo joinInfo) { - JsonObject jo = finalResult.result.getAsJsonObject(); - try { - if (joinInfo != null) { - if (joinInfo.useDefault == null) { - if (joinInfo.joinFuncName != null) { - finalResult.result = engineUtil.invokeFunction(joinInfo.joinFuncName, JsonElement.class, jo); - } - return; - } - switch (joinInfo.useDefault) { - case add: - double val = 0; - for (String key : jo.keySet()) { - val += jo.get(key).getAsDouble(); - } - finalResult.result = new JsonPrimitive(val); - break; - case multiply: - val = 1; - for (String key : jo.keySet()) { - val *= jo.get(key).getAsDouble(); - } - finalResult.result = new JsonPrimitive(val); - break; - default: - } - } - } catch (Exception e) { - e.printStackTrace(); - } - } -} - -class CompositeDoipResult { - private static final Logger LOGGER = LogManager.getLogger(CompositeDoipResult.class); - Map successResultMap = new HashMap<>(); // 只有状态是Success的返回结果会存在这个map中 - Set problemResult = new HashSet<>(); - int THRESHOLD; // >=的结果可返回 - - public CompositeDoipResult(int totalCount) { - THRESHOLD = (int) Math.ceil((double) totalCount / 2); - } - - public synchronized void addDoipResult(final int count, DoipMessage doipMessage) { - successResultMap.put(Integer.valueOf(count).toString(), doipMessage); - } - - public Set getProblemNodes() { - return problemResult; - } - - public ContractResult mergeFinalResult() { - LOGGER.info("mergeFinalResult"); - - // JsonParser - JsonObject finalResult = new JsonObject(); - ContractResult finalContractResult = new ContractResult(ContractResult.Status.Success, finalResult); - for (String count : successResultMap.keySet()) { - // 取出 - DoipMessage singleResult = successResultMap.get(count); // 取出结果 - if (singleResult != null) { - finalResult.add(count, JsonUtil.fromJson(JsonUtil.toJson(singleResult), JsonObject.class)); - } - } - - return finalContractResult; - } -} \ No newline at end of file diff --git a/src/main/java/org/bdware/doip/cluster/util/RouteJoinUtil.java b/src/main/java/org/bdware/doip/cluster/util/RouteJoinUtil.java index fedf16a..1541315 100644 --- a/src/main/java/org/bdware/doip/cluster/util/RouteJoinUtil.java +++ b/src/main/java/org/bdware/doip/cluster/util/RouteJoinUtil.java @@ -10,10 +10,9 @@ import org.bdware.doip.audit.EndpointConfig; import org.bdware.doip.audit.client.AuditDoipClient; import org.bdware.doip.cluster.callback.BDODelegateDoipMessageCallback; import org.bdware.doip.cluster.callback.DDOClusterDoipMessageCallback; +import org.bdware.doip.cluster.callback.RouteResultCallback; import org.bdware.doip.cluster.client.DoaClusterClient; import org.bdware.doip.cluster.client.DoipClusterClient; -import org.bdware.doip.cluster.callback.RouteResultCallback; -import org.bdware.doip.codec.JsonDoipMessage; import org.bdware.doip.codec.doipMessage.DoipMessage; import org.bdware.doip.codec.doipMessage.DoipMessageSigner; import org.bdware.doip.endpoint.client.DoipMessageCallback; @@ -75,7 +74,6 @@ public class RouteJoinUtil { e.printStackTrace(); } routeResultCallback.onResult(null); - return; } public static JsonElement tryLoadJsonProp(JsonObject routeInfoArg, String param) { diff --git a/src/test/java/org/bdware/doip/cluster/DoaClusterClientTest.java b/src/test/java/org/bdware/doip/cluster/DoaClusterClientTest.java index 66ea55b..512a199 100644 --- a/src/test/java/org/bdware/doip/cluster/DoaClusterClientTest.java +++ b/src/test/java/org/bdware/doip/cluster/DoaClusterClientTest.java @@ -38,7 +38,7 @@ public class DoaClusterClientTest { EndpointConfig config = new EndpointConfig(); config.routerURI = "tcp://8.130.41.205:21041"; DoaClusterClient client = new DoaClusterClient(config); - int totalCount = 500000; + int totalCount = 10; AtomicInteger count = new AtomicInteger(0); AtomicInteger correct = new AtomicInteger(0); printAsync(correct, count, totalCount); -- Gitee From 0e3bb1bd7a667be78db50d23aebeab0347e0adc5 Mon Sep 17 00:00:00 2001 From: "haoeliu@foxmail.com" Date: Wed, 26 Jul 2023 21:33:28 +0800 Subject: [PATCH 2/2] optimize code --- .../doip/cluster/client/DoaClusterClient.java | 11 ++++----- .../doip/cluster/engine/DigestUtil.java | 3 ++- .../doip/cluster/engine/DoaClientUtil.java | 3 +-- .../doip/cluster/engine/DoipMessageUtil.java | 23 ++++++++----------- .../doip/cluster/engine/IdentifierUtil.java | 1 - .../bdware/doip/cluster/engine/JavaUtil.java | 2 -- .../bdware/doip/cluster/entity/BDOEntity.java | 20 +++++----------- .../bdware/doip/cluster/entity/DDOEntity.java | 16 ++++++------- .../bdware/doip/cluster/entity/DOEntity.java | 2 +- .../{RequestPack.java => SendMsgReqPack.java} | 6 ++--- .../doip/cluster/flowcontrol/FlowControl.java | 4 ++-- .../util/AuditDoipClientCacheUtil.java | 2 +- 12 files changed, 38 insertions(+), 55 deletions(-) rename src/main/java/org/bdware/doip/cluster/entity/{RequestPack.java => SendMsgReqPack.java} (83%) diff --git a/src/main/java/org/bdware/doip/cluster/client/DoaClusterClient.java b/src/main/java/org/bdware/doip/cluster/client/DoaClusterClient.java index 1a5923c..cd38678 100644 --- a/src/main/java/org/bdware/doip/cluster/client/DoaClusterClient.java +++ b/src/main/java/org/bdware/doip/cluster/client/DoaClusterClient.java @@ -34,7 +34,7 @@ public class DoaClusterClient extends DoipClientImpl { private static final Logger LOGGER = LogManager.getLogger(DoaClusterClient.class); private final FlowControl flowControl; // todo optimize thread pool - public static ExecutorService sendMessagePool = + public static ExecutorService taskPool = new ThreadPoolExecutor( 8, // corePoolSize (number of threads to keep in the pool) 16, // maximumPoolSize (maximum number of threads allowed in the pool) @@ -59,7 +59,6 @@ public class DoaClusterClient extends DoipClientImpl { if (routerConfig.extraConfig != null && routerConfig.extraConfig.has("rateThresholdFlowControl")) threshold = routerConfig.extraConfig.get("rateThresholdFlowControl").getAsInt(); flowControl = new RateThresholdFlowControl(threshold); - // new Thread(this::consumeSendMessageTask).start(); LOGGER.info("The SmartClusterClient has been initialized"); } @@ -119,8 +118,8 @@ public class DoaClusterClient extends DoipClientImpl { cb.onResult(msg); flowControl.maintainFlowControl(); }; - RequestPack requestPack = new RequestPack(this, cachedDO, doipMessage, flowControlCallback, needFlowControl); - produceSendMessageTask(requestPack); + SendMsgReqPack sendMsgReqPack = new SendMsgReqPack(this, cachedDO, doipMessage, flowControlCallback, needFlowControl); + executeTask(sendMsgReqPack); } public DDOEntity ddoInfo2DDOEntity(String ddoID, JsonObject ddoInfo) { @@ -139,7 +138,7 @@ public class DoaClusterClient extends DoipClientImpl { return (BDOEntity) doCache.get(bdoID); } - private void produceSendMessageTask(RequestPack pack) { + private void executeTask(SendMsgReqPack pack) { if (pack.needFlowControl()) for (int i = 0; i < 10 && !flowControl.enableRequestPass(); i++) { try { @@ -149,7 +148,7 @@ public class DoaClusterClient extends DoipClientImpl { e.printStackTrace(); } } - sendMessagePool.execute(pack); + taskPool.execute(pack); } public void signMessage(DoipMessage message) { diff --git a/src/main/java/org/bdware/doip/cluster/engine/DigestUtil.java b/src/main/java/org/bdware/doip/cluster/engine/DigestUtil.java index f720156..1a7516b 100644 --- a/src/main/java/org/bdware/doip/cluster/engine/DigestUtil.java +++ b/src/main/java/org/bdware/doip/cluster/engine/DigestUtil.java @@ -49,7 +49,7 @@ public class DigestUtil { int index = 0; while (treeNode.level != 0) { double hash = md5Double(address + treeNode.label); - if (hash < (double) treeNode.leftChild.weight / treeNode.weight) { + if (hash < treeNode.leftChild.weight / treeNode.weight) { treeNode = treeNode.leftChild; } else { treeNode = treeNode.rightChild; @@ -79,6 +79,7 @@ public class DigestUtil { label.append('1'); for (int i = 0; i < level; ++i) label.append('0'); + assert leftTree != null && rightTree != null; TreeNode thisNode = new TreeNode(label.toString(), leftTree.weight + rightTree.weight, level, leftTree.leafCount + rightTree.leafCount); thisNode.leftChild = leftTree; diff --git a/src/main/java/org/bdware/doip/cluster/engine/DoaClientUtil.java b/src/main/java/org/bdware/doip/cluster/engine/DoaClientUtil.java index 78f097a..f01f5d4 100644 --- a/src/main/java/org/bdware/doip/cluster/engine/DoaClientUtil.java +++ b/src/main/java/org/bdware/doip/cluster/engine/DoaClientUtil.java @@ -24,8 +24,7 @@ public class DoaClientUtil { } public DoipMessage sendMessageSync(DoipMessage doipMessage) { - DoipMessage result = client.sendMessageSync(doipMessage, 10000, false); - return result; + return client.sendMessageSync(doipMessage, 10000, false); } public void sendMessage(DoipMessage arg, DoipMessageCallback callback) { client.sendMessage(arg, callback, false); diff --git a/src/main/java/org/bdware/doip/cluster/engine/DoipMessageUtil.java b/src/main/java/org/bdware/doip/cluster/engine/DoipMessageUtil.java index 8e34794..2a29388 100644 --- a/src/main/java/org/bdware/doip/cluster/engine/DoipMessageUtil.java +++ b/src/main/java/org/bdware/doip/cluster/engine/DoipMessageUtil.java @@ -44,20 +44,17 @@ public class DoipMessageUtil { } public DoipMessageCallback wrapCallback(ScriptFunction som, Object... appendArgs) { - return new DoipMessageCallback() { - @Override - public void onResult(DoipMessage result) { - Object[] newArr; - if (appendArgs != null && appendArgs.length > 0) { - newArr = new Object[appendArgs.length + 1]; - newArr[0] = result; - System.arraycopy(appendArgs, 0, newArr, 1, appendArgs.length); - } else { - newArr = new Object[1]; - newArr[0] = result; - } - engine.invokeFunctionWithObject(som, Object.class, newArr); + return result -> { + Object[] newArr; + if (appendArgs != null && appendArgs.length > 0) { + newArr = new Object[appendArgs.length + 1]; + newArr[0] = result; + System.arraycopy(appendArgs, 0, newArr, 1, appendArgs.length); + } else { + newArr = new Object[1]; + newArr[0] = result; } + engine.invokeFunctionWithObject(som, Object.class, newArr); }; } public DoipResponseCode getResponseCode(String respCode){ diff --git a/src/main/java/org/bdware/doip/cluster/engine/IdentifierUtil.java b/src/main/java/org/bdware/doip/cluster/engine/IdentifierUtil.java index d253aae..1c68bbc 100644 --- a/src/main/java/org/bdware/doip/cluster/engine/IdentifierUtil.java +++ b/src/main/java/org/bdware/doip/cluster/engine/IdentifierUtil.java @@ -1,6 +1,5 @@ package org.bdware.doip.cluster.engine; -import com.google.gson.JsonArray; import org.bdware.doip.cluster.entity.DDOEntity; import java.util.List; diff --git a/src/main/java/org/bdware/doip/cluster/engine/JavaUtil.java b/src/main/java/org/bdware/doip/cluster/engine/JavaUtil.java index 99c2cfb..94956a7 100644 --- a/src/main/java/org/bdware/doip/cluster/engine/JavaUtil.java +++ b/src/main/java/org/bdware/doip/cluster/engine/JavaUtil.java @@ -1,7 +1,5 @@ package org.bdware.doip.cluster.engine; -import com.google.gson.JsonElement; -import com.google.gson.JsonPrimitive; import jdk.nashorn.api.scripting.ScriptObjectMirror; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; diff --git a/src/main/java/org/bdware/doip/cluster/entity/BDOEntity.java b/src/main/java/org/bdware/doip/cluster/entity/BDOEntity.java index b31b379..678273c 100644 --- a/src/main/java/org/bdware/doip/cluster/entity/BDOEntity.java +++ b/src/main/java/org/bdware/doip/cluster/entity/BDOEntity.java @@ -3,19 +3,16 @@ package org.bdware.doip.cluster.entity; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.bdware.doip.audit.EndpointConfig; -import org.bdware.doip.audit.client.AuditDoipClient; import org.bdware.doip.cluster.callback.BDODelegateDoipMessageCallback; import org.bdware.doip.cluster.client.DoaClusterClient; -import org.bdware.doip.cluster.flowcontrol.core.RateThresholdFlowControl; import org.bdware.doip.cluster.util.AuditDoipClientCacheUtil; -import org.bdware.doip.cluster.util.ClientReadyCallback; import org.bdware.doip.codec.doipMessage.DoipMessage; import org.bdware.doip.endpoint.client.DoipMessageCallback; public class BDOEntity implements DOEntity { static Logger LOGGER = LogManager.getLogger(BDOEntity.class); - private String address; - private String version; + private final String address; + private final String version; public BDOEntity(String address, String version) { this.address = address; @@ -37,17 +34,12 @@ public class BDOEntity implements DOEntity { } - RateThresholdFlowControl flowControl = new RateThresholdFlowControl(200); - @Override public void sendMessage(DoaClusterClient client, DoipMessage doipMessage, DoipMessageCallback doipMessageCallback, EndpointConfig routerConfig) { - AuditDoipClientCacheUtil.getClientByUrl(address, version, new ClientReadyCallback() { - @Override - public void onReady(AuditDoipClient doipClient) { - client.signMessage(doipMessage); - BDODelegateDoipMessageCallback BDODelegateDoipMessageCallback = new BDODelegateDoipMessageCallback(doipClient, doipMessageCallback, doipMessage); - doipClient.sendMessage(doipMessage, BDODelegateDoipMessageCallback); - } + AuditDoipClientCacheUtil.getClientByUrl(address, version, doipClient -> { + client.signMessage(doipMessage); + BDODelegateDoipMessageCallback BDODelegateDoipMessageCallback = new BDODelegateDoipMessageCallback(doipClient, doipMessageCallback, doipMessage); + doipClient.sendMessage(doipMessage, BDODelegateDoipMessageCallback); }); } diff --git a/src/main/java/org/bdware/doip/cluster/entity/DDOEntity.java b/src/main/java/org/bdware/doip/cluster/entity/DDOEntity.java index 2cfc16c..33a2f94 100644 --- a/src/main/java/org/bdware/doip/cluster/entity/DDOEntity.java +++ b/src/main/java/org/bdware/doip/cluster/entity/DDOEntity.java @@ -24,18 +24,16 @@ import java.util.Map; public class DDOEntity implements DOEntity { // hello -> routeInfo - private Map ddoRouteInfo; + private final Map ddoRouteInfo; // hello -> joinInfo - private Map ddoJoinInfo; - private String[] bdoList; - private JsonObject bcoAccessRules; - private NashornScriptEngineUtil engineUtil; - private JsonObject appendixes; - private JsonObject ddoInfo; - private DoaClusterClient client; + private final Map ddoJoinInfo; + private final String[] bdoList; + private final JsonObject bcoAccessRules; + private final NashornScriptEngineUtil engineUtil; + private final JsonObject appendixes; + private final DoaClusterClient client; public DDOEntity(JsonObject ddoInfo, DoaClusterClient client) { - this.ddoInfo = ddoInfo; String bcoId = ddoInfo.get("bcoId").getAsString(); appendixes = ddoInfo.get("appendixes").getAsJsonObject(); bcoAccessRules = appendixes.get(bcoId).getAsJsonObject().get("accessRules").getAsJsonObject(); diff --git a/src/main/java/org/bdware/doip/cluster/entity/DOEntity.java b/src/main/java/org/bdware/doip/cluster/entity/DOEntity.java index b5ef7e9..13bf79a 100644 --- a/src/main/java/org/bdware/doip/cluster/entity/DOEntity.java +++ b/src/main/java/org/bdware/doip/cluster/entity/DOEntity.java @@ -6,7 +6,7 @@ import org.bdware.doip.codec.doipMessage.DoipMessage; import org.bdware.doip.endpoint.client.DoipMessageCallback; public interface DOEntity { - public BDWType getType(); + BDWType getType(); void sendMessage(DoaClusterClient client, DoipMessage doipMessage, DoipMessageCallback doipMessageCallback, EndpointConfig routerConfig); } diff --git a/src/main/java/org/bdware/doip/cluster/entity/RequestPack.java b/src/main/java/org/bdware/doip/cluster/entity/SendMsgReqPack.java similarity index 83% rename from src/main/java/org/bdware/doip/cluster/entity/RequestPack.java rename to src/main/java/org/bdware/doip/cluster/entity/SendMsgReqPack.java index e320192..391a9e5 100644 --- a/src/main/java/org/bdware/doip/cluster/entity/RequestPack.java +++ b/src/main/java/org/bdware/doip/cluster/entity/SendMsgReqPack.java @@ -6,8 +6,8 @@ import org.bdware.doip.cluster.client.DoaClusterClient; import org.bdware.doip.codec.doipMessage.DoipMessage; import org.bdware.doip.endpoint.client.DoipMessageCallback; -public class RequestPack extends RequestPackIntf { - static Logger LOGGER = LogManager.getLogger(RequestPack.class); +public class SendMsgReqPack extends RequestPackIntf { + static Logger LOGGER = LogManager.getLogger(SendMsgReqPack.class); private final long createTime; DOEntity doEntity; DoipMessage doipMessage; @@ -15,7 +15,7 @@ public class RequestPack extends RequestPackIntf { boolean needFlowControl; DoaClusterClient doaClusterClient; - public RequestPack(DoaClusterClient doaClusterClient, DOEntity cachedDO, DoipMessage message, DoipMessageCallback callback, boolean needFlowControl) { + public SendMsgReqPack(DoaClusterClient doaClusterClient, DOEntity cachedDO, DoipMessage message, DoipMessageCallback callback, boolean needFlowControl) { this.doaClusterClient = doaClusterClient; this.doEntity = cachedDO; this.doipMessage = message; diff --git a/src/main/java/org/bdware/doip/cluster/flowcontrol/FlowControl.java b/src/main/java/org/bdware/doip/cluster/flowcontrol/FlowControl.java index d2e3415..fef7937 100644 --- a/src/main/java/org/bdware/doip/cluster/flowcontrol/FlowControl.java +++ b/src/main/java/org/bdware/doip/cluster/flowcontrol/FlowControl.java @@ -1,7 +1,7 @@ package org.bdware.doip.cluster.flowcontrol; public interface FlowControl { - public boolean enableRequestPass(); + boolean enableRequestPass(); - public void maintainFlowControl(); + void maintainFlowControl(); } diff --git a/src/main/java/org/bdware/doip/cluster/util/AuditDoipClientCacheUtil.java b/src/main/java/org/bdware/doip/cluster/util/AuditDoipClientCacheUtil.java index 1edb9d7..5fe33c5 100644 --- a/src/main/java/org/bdware/doip/cluster/util/AuditDoipClientCacheUtil.java +++ b/src/main/java/org/bdware/doip/cluster/util/AuditDoipClientCacheUtil.java @@ -41,7 +41,7 @@ public class AuditDoipClientCacheUtil { public static void getClientByUrl(String address, String version, ClientReadyCallback callback) { ReconnectReqPack pack = new ReconnectReqPack(address, version, callback); - DoaClusterClient.sendMessagePool.execute(pack); + DoaClusterClient.taskPool.execute(pack); } public static AuditDoipClient createWithoutConnect(String address) { -- Gitee