From 8df3d4c805c5a1bb5458d6069201a49aa191d9cb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E5=AE=87=E5=A4=A9?= <542675903@qq.com> Date: Tue, 26 Sep 2023 17:32:27 +0800 Subject: [PATCH] src/main/java/org/bdware/datanet/registry/HandleEvent/NaiveEventHandler.java --- .../HandleEvent/NaiveEventHandler.java | 35 +++ .../HandleEvent/verified/Commiter.java | 7 + .../verified/PublisherToTable.java | 31 +++ .../verified/StorageMetaMessage.java | 2 + .../verified/VerifiedEventHandler.java | 2 + .../verified/VerifiedPublisher.java | 198 ++++++++++++++ .../verified/VerifiedSubscriber.java | 104 +++++++ .../verified/merkle/MerkelMatcher.java | 15 ++ .../verified/merkle/NaiveMerkelMatcher.java | 255 ++++++++++++++++++ .../verified/merkle/TreeCounter.java | 80 ++++++ .../registry/merkle/NaiveMerkelMatcher.java | 2 +- src/test/java/test.java | 2 + 12 files changed, 732 insertions(+), 1 deletion(-) create mode 100644 src/main/java/org/bdware/datanet/registry/HandleEvent/NaiveEventHandler.java create mode 100644 src/main/java/org/bdware/datanet/registry/HandleEvent/verified/Commiter.java create mode 100644 src/main/java/org/bdware/datanet/registry/HandleEvent/verified/PublisherToTable.java create mode 100644 src/main/java/org/bdware/datanet/registry/HandleEvent/verified/StorageMetaMessage.java create mode 100644 src/main/java/org/bdware/datanet/registry/HandleEvent/verified/VerifiedEventHandler.java create mode 100644 src/main/java/org/bdware/datanet/registry/HandleEvent/verified/VerifiedPublisher.java create mode 100644 src/main/java/org/bdware/datanet/registry/HandleEvent/verified/VerifiedSubscriber.java create mode 100644 src/main/java/org/bdware/datanet/registry/HandleEvent/verified/merkle/MerkelMatcher.java create mode 100644 src/main/java/org/bdware/datanet/registry/HandleEvent/verified/merkle/NaiveMerkelMatcher.java create mode 100644 src/main/java/org/bdware/datanet/registry/HandleEvent/verified/merkle/TreeCounter.java create mode 100644 src/test/java/test.java diff --git a/src/main/java/org/bdware/datanet/registry/HandleEvent/NaiveEventHandler.java b/src/main/java/org/bdware/datanet/registry/HandleEvent/NaiveEventHandler.java new file mode 100644 index 0000000..089d21e --- /dev/null +++ b/src/main/java/org/bdware/datanet/registry/HandleEvent/NaiveEventHandler.java @@ -0,0 +1,35 @@ +package org.bdware.doip.event; + +import org.bdware.doip.RocksDBUtil; +import org.bdware.doip.audit.AuditDoaClient; +import org.bdware.doip.codec.doipMessage.DoipMessage; +import org.bdware.doip.endpoint.event.EventMessageParser; +import org.bdware.doip.endpoint.event.TopicHandler; + +public class NaiveEventHandler implements TopicHandler { + EventMessageParser messageParser; + + + public NaiveEventHandler(String path, AuditDoaClient client,String myTopic) { + RocksDBUtil storageManager = RocksDBUtil.loadDB(path, false); + NaivePublisher publisher = new NaivePublisher(storageManager, client); + NaiveSubscriber subscriber = new NaiveSubscriber(storageManager, client) { + @Override + public void onReceiveData(String topicId, String publisherId, DoipMessage request) { + //replace Topic + publisher.publish(myTopic, request); + } + }; + messageParser = new EventMessageParser(publisher, subscriber); + } + + @Override + public DoipMessage handlePublish(DoipMessage request) { + return messageParser.handlePublish(request); + } + + @Override + public DoipMessage handleSubscribe(DoipMessage request) { + return messageParser.handleSubscribe(request); + } +} diff --git a/src/main/java/org/bdware/datanet/registry/HandleEvent/verified/Commiter.java b/src/main/java/org/bdware/datanet/registry/HandleEvent/verified/Commiter.java new file mode 100644 index 0000000..0761be4 --- /dev/null +++ b/src/main/java/org/bdware/datanet/registry/HandleEvent/verified/Commiter.java @@ -0,0 +1,7 @@ +package org.bdware.doip.event.verified; + +import org.bdware.doip.codec.doipMessage.DoipMessage; + +public interface Commiter { + void commit(String topicId, String publisherId, DoipMessage doipMessage); +} diff --git a/src/main/java/org/bdware/datanet/registry/HandleEvent/verified/PublisherToTable.java b/src/main/java/org/bdware/datanet/registry/HandleEvent/verified/PublisherToTable.java new file mode 100644 index 0000000..3b5cae0 --- /dev/null +++ b/src/main/java/org/bdware/datanet/registry/HandleEvent/verified/PublisherToTable.java @@ -0,0 +1,31 @@ +package org.bdware.datanet.registry.HandleEvent.verified; + +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import org.bdware.datanet.registry.HandleEvent.verified.merkle.NaiveMerkelMatcher; +import org.bdware.sc.util.JsonUtil; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +public class StorageTable { + public Map> totalTable; + public String toJson() { + return JsonUtil.toJson(this); + } + public static StorageTable fromJson(JsonElement content) { + if (content == null) return null; + return JsonUtil.fromJson(content, StorageTable.class); + } + public Map get(String publisherId) { + if (!totalTable.containsKey(publisherId)) { + totalTable.put(publisherId, new ConcurrentHashMap<>()); + } + Map fromIdTable = totalTable.get(publisherId); + return fromIdTable; + } + + public void put(String publisherId, Map fromIdTable) { + totalTable.put(publisherId, fromIdTable); + } +} diff --git a/src/main/java/org/bdware/datanet/registry/HandleEvent/verified/StorageMetaMessage.java b/src/main/java/org/bdware/datanet/registry/HandleEvent/verified/StorageMetaMessage.java new file mode 100644 index 0000000..b3261b7 --- /dev/null +++ b/src/main/java/org/bdware/datanet/registry/HandleEvent/verified/StorageMetaMessage.java @@ -0,0 +1,2 @@ +package org.bdware.datanet.registry.HandleEvent.verified;public class storageMetaMessage { +} diff --git a/src/main/java/org/bdware/datanet/registry/HandleEvent/verified/VerifiedEventHandler.java b/src/main/java/org/bdware/datanet/registry/HandleEvent/verified/VerifiedEventHandler.java new file mode 100644 index 0000000..7f39839 --- /dev/null +++ b/src/main/java/org/bdware/datanet/registry/HandleEvent/verified/VerifiedEventHandler.java @@ -0,0 +1,2 @@ +package org.bdware.datanet.registry.HandleEvent.verified;public class VerifiedEventHandler { +} diff --git a/src/main/java/org/bdware/datanet/registry/HandleEvent/verified/VerifiedPublisher.java b/src/main/java/org/bdware/datanet/registry/HandleEvent/verified/VerifiedPublisher.java new file mode 100644 index 0000000..95f0048 --- /dev/null +++ b/src/main/java/org/bdware/datanet/registry/HandleEvent/verified/VerifiedPublisher.java @@ -0,0 +1,198 @@ +package org.bdware.doip.event.verified; + +import com.google.gson.*; +import org.bdware.doip.RocksDBUtil; +import org.bdware.doip.audit.AuditDoaClient; +import org.bdware.doip.audit.client.AuditDoipClient; +import org.bdware.doip.codec.doipMessage.DoipMessage; +import org.bdware.doip.codec.doipMessage.DoipMessageFactory; +import org.bdware.doip.codec.doipMessage.DoipResponseCode; +import org.bdware.doip.codec.operations.BasicOperations; +import org.bdware.doip.endpoint.event.PublishMessageType; +import org.bdware.doip.endpoint.event.Publisher; + +import java.nio.charset.StandardCharsets; +import java.util.HashSet; +import java.util.Set; + +public class VerifiedPublisher implements Publisher { + private static final Gson GSON = new Gson(); + // key 是topicId,仅支持1种topicId, value是[subscriber1,subscriber2,...] + // 待发布的内容存储的格式为 + // topicId:pub 保存一个数组,数组内容为[hash1,hash2,...] + // hash1-->待发布的EventDoipMessage的body + // EventDoipMessage的格式为 {header: order:xx,hash:xxx,topic:xx,publisher:xx, body:Json字符串} + RocksDBUtil storageManager; + AuditDoaClient client; + String publisherId; + + + //要有默克尔树? + public VerifiedPublisher(RocksDBUtil storageManager, AuditDoaClient client, String publisherId) { + this.storageManager = storageManager; + this.client = client; + this.publisherId = publisherId; + } + + synchronized private void addSubscribers(String topicId, String subscriberId) { + JsonArray subscribers = getAsJson(topicId, new JsonArray()).getAsJsonArray(); + JsonPrimitive sid = new JsonPrimitive(subscriberId); + if (!subscribers.contains(sid)) { + subscribers.add(sid); + storageManager.put(topicId, subscribers.toString()); + } + } + + synchronized private void removeSubscribers(String topicId, String subscriberId) { + JsonArray subscribers = getAsJson(topicId, new JsonArray()).getAsJsonArray(); + JsonPrimitive sid = new JsonPrimitive(subscriberId); + if (subscribers.contains(sid)) { + subscribers.remove(sid); + storageManager.put(topicId, subscribers.toString()); + } + } + + private JsonElement getAsJson(String key, JsonElement defaultValue) { + String content = storageManager.get(key); + try { + if (null != content && !content.isEmpty()) { + return JsonParser.parseString(content); + } + } catch (Exception e) { + e.printStackTrace(); + } + return defaultValue; + } + + private String getEventListKey(String topicId) { + return topicId + ":pub"; + } + + private AuditDoipClient getOrCreateConnection(String subscriberId) { + return client.convertDoidToRepo(subscriberId); + } + + @Override + public DoipMessage subscribe(String topicId, String subscriberId, boolean needReplay, DoipMessage request) { + addSubscribers(topicId, subscriberId); + if (needReplay) { + new Thread(() -> { + JsonArray publishedEvents = getAsJson(getEventListKey(topicId), new JsonArray()).getAsJsonArray(); + try { + for (int i = 0; i < publishedEvents.size(); i++) { + String hash = publishedEvents.get(i).getAsString(); + String content = storageManager.get(hash); + DoipMessageFactory.DoipMessageBuilder builder = new DoipMessageFactory.DoipMessageBuilder(); + builder.createRequest(subscriberId, BasicOperations.Publish.getName()); + builder.addAttributes("order", i); + builder.addAttributes("hash", hash); + builder.addAttributes("publisherId", publisherId); + builder.addAttributes("topicId", topicId); + builder.addAttributes("publishType", PublishMessageType.DataAndHash.name()); + builder.setBody(content.getBytes()); + client.sendRawMessageSync(builder.create(), 5000); + } + } catch (Exception ignored) { + } + }).start(); + } + DoipMessageFactory.DoipMessageBuilder builder = new DoipMessageFactory.DoipMessageBuilder(); + builder.createResponse(DoipResponseCode.Success, request).setBody("success".getBytes()); + return builder.create(); + } + + @Override + public DoipMessage unSubscribe(String topicId, String subscriberId, DoipMessage request) { + removeSubscribers(topicId, subscriberId); + DoipMessageFactory.DoipMessageBuilder builder = new DoipMessageFactory.DoipMessageBuilder(); + builder.createResponse(DoipResponseCode.Success, request).setBody("success".getBytes(StandardCharsets.UTF_8)); + return builder.create(); + } + + @Override + public DoipMessage sendDataInRange(String topicId, String subscriberId, long offset, long count, DoipMessage request) { + new Thread(() -> { + JsonArray array = getAsJson(topicId + ":pub", new JsonArray()).getAsJsonArray(); + for (int i = (int) offset; i < (int) (offset + count); i++) { + String hash = array.get(i).getAsString(); + int order = i; + String content = storageManager.get(hash); + DoipMessageFactory.DoipMessageBuilder builder = new DoipMessageFactory.DoipMessageBuilder(); + builder.createRequest(subscriberId, BasicOperations.Publish.getName()); + builder.addAttributes("order", order); + builder.addAttributes("hash", hash); + builder.addAttributes("publisherId", publisherId); + builder.addAttributes("topicId", topicId); + builder.addAttributes("publishType", PublishMessageType.DataAndHash.name()); + builder.setBody(content.getBytes()); + client.sendRawMessageSync(builder.create(), 5000); + } + }).start(); + DoipMessageFactory.DoipMessageBuilder builder = new DoipMessageFactory.DoipMessageBuilder(); + builder.createResponse(DoipResponseCode.Success, null); + return builder.create(); + } + + @Override + public DoipMessage sendDataInList(String topicId, String subscriberId, JsonArray indexList, DoipMessage request) { + new Thread(() -> { + JsonArray array = getAsJson(topicId + ":pub", new JsonArray()).getAsJsonArray(); + for (JsonElement index : indexList) { + int i = index.getAsInt(); + String hash = array.get(i).getAsString(); + int order = i; + String content = storageManager.get(hash); + DoipMessageFactory.DoipMessageBuilder builder = new DoipMessageFactory.DoipMessageBuilder(); + builder.createRequest(subscriberId, BasicOperations.Publish.getName()); + builder.addAttributes("order", order); + builder.addAttributes("hash", hash); + builder.addAttributes("publisherId", publisherId); + builder.addAttributes("topicId", topicId); + builder.addAttributes("publishType", PublishMessageType.DataAndHash.name()); + builder.setBody(content.getBytes()); + client.sendRawMessageSync(builder.create(), 5000); + } + }).start(); + DoipMessageFactory.DoipMessageBuilder builder = new DoipMessageFactory.DoipMessageBuilder(); + builder.createResponse(DoipResponseCode.Success, null); + return builder.create(); + + } + + @Override + public DoipMessage verifyMerkelInRange(String topicId, String subscriberId, DoipMessage request) { + throw new IllegalStateException("unsupported message type"); + } + + @Override + public DoipMessage sendMerkelInRange(String topicId, String subscriberId, DoipMessage request) { + throw new IllegalStateException("unsupported message type"); + } + + @Override + public synchronized void publish(String topicId, DoipMessage request) { + String content = request.body.getDataAsJsonString(); + String hashValue = HashUtil.hash(request.body.getDataAsJsonString()); + JsonArray array = getAsJson(topicId + ":pub", new JsonArray()).getAsJsonArray(); + array.add(hashValue); + storageManager.put(topicId + ":pub", array.toString()); + storageManager.put(hashValue, content); + JsonArray subscribers = getAsJson(topicId, new JsonArray()).getAsJsonArray(); + request.header.parameters.attributes.addProperty("topicId", topicId); + request.header.parameters.attributes.addProperty("order", array.size()); + request.header.parameters.attributes.addProperty("hash", hashValue); + request.header.parameters.attributes.addProperty("publisherId", publisherId); + request.header.parameters.attributes.addProperty("publishType", PublishMessageType.DataAndHash.name()); + new Thread(() -> { + Set sent = new HashSet<>(); + subscribers.forEach(je -> { + String id = je.getAsString(); + AuditDoipClient client = getOrCreateConnection(id); + if (!sent.contains(id)) { + sent.add(id); + client.sendMessage(request, null); + } + }); + }).start(); + } +} diff --git a/src/main/java/org/bdware/datanet/registry/HandleEvent/verified/VerifiedSubscriber.java b/src/main/java/org/bdware/datanet/registry/HandleEvent/verified/VerifiedSubscriber.java new file mode 100644 index 0000000..5b4bc09 --- /dev/null +++ b/src/main/java/org/bdware/datanet/registry/HandleEvent/verified/VerifiedSubscriber.java @@ -0,0 +1,104 @@ +package org.bdware.doip.event.verified; + +import com.google.gson.Gson; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; +import org.bdware.doip.RocksDBUtil; +import org.bdware.doip.audit.AuditDoaClient; +import org.bdware.doip.codec.doipMessage.DoipMessage; +import org.bdware.doip.codec.doipMessage.DoipMessageFactory; +import org.bdware.doip.codec.doipMessage.DoipResponseCode; +import org.bdware.doip.endpoint.event.Subscriber; + +import java.io.ByteArrayOutputStream; +import java.io.PrintStream; +import java.util.Map; + +public abstract class VerifiedSubscriber implements Subscriber, Commiter { + private static final Gson GSON = new Gson(); + + private Map topicIdToQueue; + // key 是topicId:pub, value是[hash1,hash2,...] + // key 是hash, value是DoipMessageContent + private final RocksDBUtil storageManager; + ; + AuditDoaClient client; + private String subscriberId; + + public VerifiedSubscriber(RocksDBUtil storageManager, AuditDoaClient client, String subscriberId) { + this.storageManager = storageManager; + this.client = client; + this.subscriberId = subscriberId; + } + + synchronized private void savePublishedEvents(String topicId, int order, String hash, String content) { + VerifiedEventQueue queue = getOrCreateQueue(topicId); + queue.appendEvent(order, hash, content); + } + + private VerifiedEventQueue getOrCreateQueue(String topicId) { + if (!topicIdToQueue.containsKey(topicId)) { + topicIdToQueue.put(topicId, new VerifiedEventQueue()); + } + return topicIdToQueue.get(topicId); + } + + private JsonElement getAsJson(String key, JsonElement defaultValue) { + String content = storageManager.get(key); + try { + if (null != content && !content.isEmpty()) { + return JsonParser.parseString(content); + } + } catch (Exception e) { + e.printStackTrace(); + } + return defaultValue; + } + + @Override + public DoipMessage receiveData(String topicId, String publishId, DoipMessage request) { + throw new IllegalStateException("unsupported data message, missing hash value"); + } + + + @Override + public DoipMessage receiveHash(String topicId, String subscriberId, DoipMessage data) { + throw new IllegalStateException("unsupported message type"); + } + + @Override + public DoipMessage receiveDataAndHash(String topicId, String publisherId, DoipMessage request) { + try { + int order = request.header.parameters.attributes.get("order").getAsInt(); + String hash = request.header.parameters.attributes.get("hash").getAsString(); + String content = request.body.getDataAsJsonString(); + savePublishedEvents(topicId, order, hash, content); + DoipMessageFactory.DoipMessageBuilder builder = new DoipMessageFactory.DoipMessageBuilder(); + builder.createResponse(DoipResponseCode.Success, request).setBody(request.body.getEncodedData()); + return builder.create(); + } catch (Exception e) { + ByteArrayOutputStream bo = new ByteArrayOutputStream(); + e.printStackTrace(new PrintStream(bo)); + DoipMessageFactory.DoipMessageBuilder builder = new DoipMessageFactory.DoipMessageBuilder(); + builder.createResponse(DoipResponseCode.UnKnownError, request); + builder.setBody(bo.toByteArray()); + return builder.create(); + } + } + + @Override + public void receiveMerkelConfiguration(String topicId, JsonObject configuration, DoipMessage request) { + //unsupported message type + return; + } + + @Override + public synchronized void commit(String topicId, String publishId, DoipMessage commitedMessage) { + onReceiveData(topicId, publishId, commitedMessage); + } + + public String getId() { + return subscriberId; + } +} diff --git a/src/main/java/org/bdware/datanet/registry/HandleEvent/verified/merkle/MerkelMatcher.java b/src/main/java/org/bdware/datanet/registry/HandleEvent/verified/merkle/MerkelMatcher.java new file mode 100644 index 0000000..ba121ad --- /dev/null +++ b/src/main/java/org/bdware/datanet/registry/HandleEvent/verified/merkle/MerkelMatcher.java @@ -0,0 +1,15 @@ +package org.bdware.datanet.registry.merkle; + +import java.util.List; + +public interface MerkelMatcher { + List isMatch(long offset, long count, List hash); + + List getHashList(long offset, long count); + + boolean pushContent(String content); + + String generateHash(String content); + + void updateMerkelTree(long index, String content); +} diff --git a/src/main/java/org/bdware/datanet/registry/HandleEvent/verified/merkle/NaiveMerkelMatcher.java b/src/main/java/org/bdware/datanet/registry/HandleEvent/verified/merkle/NaiveMerkelMatcher.java new file mode 100644 index 0000000..d5ae5d8 --- /dev/null +++ b/src/main/java/org/bdware/datanet/registry/HandleEvent/verified/merkle/NaiveMerkelMatcher.java @@ -0,0 +1,255 @@ +package org.bdware.datanet.registry.merkle; + +import com.google.gson.Gson; +import io.netty.util.internal.StringUtil; +import org.bdware.datanet.registry.merkle.type.VerifyType; +import org.bouncycastle.pqc.math.linearalgebra.ByteUtils; +import org.zz.gmhelper.SM3Util; + +import java.nio.charset.StandardCharsets; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; + +public class NaiveMerkelMatcher implements MerkelMatcher { + + Map merkelNodes = new ConcurrentHashMap<>(); + TreeCounter treeCounter; + Map topicNodes = new ConcurrentHashMap<>(); + //期待接收到的下一个数据的编号 + AtomicLong order = new AtomicLong(1); + + public NaiveMerkelMatcher() { + treeCounter = new TreeCounter(2, 10); + } + public NaiveMerkelMatcher(VerifyType verifyType){ + switch (verifyType) { + case One: + case Two: + case Three: + treeCounter = new TreeCounter(100, 2); + case Four: + treeCounter = new TreeCounter(3, 10); + default: + } + } + public void updateTopicNodes(Map topicNodes) { + this.topicNodes.putAll(topicNodes); + } + + public String peekDB() { + return new Gson().toJson(merkelNodes); + } + + @Override + public List isMatch(long offset, long count, List hash) { + List ret = treeCounter.getHashList(offset, count); + List result = new ArrayList<>(); + for (String merkelKey : ret) { + if (!hash.contains(merkelNodes.get(merkelKey))) { + result.addAll(findReSend(merkelKey, hash)); + } + } + Collections.sort(result); + return result; + } + +// public List findReData(int offset, int count, List hash) { +// List ret = treeCounter.getHashList(offset, count); +// List result = new ArrayList<>(); +// for(String merkelKey: ret){ +// if(!hash.contains(merkelNodes.get(merkelKey))) { +// result.addAll(findReSend(merkelKey, hash)); +// } +// } +// Collections.sort(result); +// return result; +// } + + public List findReSend(String key, List hash) { + List result = new ArrayList<>(); + long layer = findIndex(key).get(0); + long index = findIndex(key).get(1); + if (layer == 0) { + result.add(index); + } else { + layer--; + for (long i = 1; i <= 3; i++) { + long j = (index - 1) * 3 + i; + if (!hash.contains(merkelNodes.get(getMerkelKey(layer, j)))) { + result.addAll(findReSend(getMerkelKey(layer, j), hash)); + } + } + } + return result; + } + + public List findIndex(String key) { + List res = new ArrayList<>(); + int index = 0; + for (int i = 2; i < key.length(); i++) { + if (key.charAt(i) == '_') { + index = i; + } + } + res.add(Integer.valueOf(key.substring(2, index))); + res.add(Integer.valueOf(key.substring(index + 1, key.length()))); + return res; + } + + String getMerkelKey(long layer, long count) { + return "l_" + layer + "_" + count; + } + + public List getHashList(long offset, long count) { + List ret = treeCounter.getHashList(offset, count); + List result = new ArrayList<>(); + for (String merkelKey : ret) { + result.add(merkelNodes.get(merkelKey)); + } + return ret; + } + + @Override + public boolean pushContent(String content) { + order.incrementAndGet(); + String hashValue = generateHash(content); + List treeOrder = treeCounter.increment(); + pushHash(hashValue, treeOrder); + return false; + } + + private void pushHash(String hashValue, List treeOrder) { + String currentHash = hashValue; + for (int i = 0; i < treeOrder.size(); i++) { + int orderInCurrentLayer = treeOrder.get(i); + if (i == 0) { + merkelNodes.put(getMerkelKey(i, orderInCurrentLayer), currentHash); + } else { + StringBuilder sb = new StringBuilder(); + int subStart = treeCounter.startChild(orderInCurrentLayer); + int subEnd = treeCounter.endChild(orderInCurrentLayer); + for (int j = subStart; j <= subEnd; j++) { + sb.append(merkelNodes.get(getMerkelKey(i - 1, j))).append("|"); + } + currentHash = generateHash(sb.toString()); + merkelNodes.put(getMerkelKey(i, orderInCurrentLayer), currentHash); + } + } + } + + @Override + public String generateHash(String content) { + if (StringUtil.isNullOrEmpty(content)) return null; + String retValue = ByteUtils.toHexString(SM3Util.hash(content.getBytes(StandardCharsets.UTF_8))); + return retValue; + } + + public Map generateRecord(long index) { + Map record = new HashMap<>(); + long layer = 0; + record.put(layer, index); + while (index >= 3) { + layer++; + index = (index - 1) / 3 + 1; + record.put(layer, index); + } + return record; + } + + @Override + public void updateMerkelTree(long index, String content) { + Map record = generateRecord(index); + String hashValue = generateHash(content); + int j = 0; + long k = index; + merkelNodes.put(getMerkelKey(j, index), hashValue); + boolean isContinue = true; + while (k % 3 == 0 || record.get(j) == k) { + StringBuilder sb = new StringBuilder(); + if (record.get(j) == k) { + j++; + k = (k - 1) / 3 * 3 + 1; + for (long x = k; x < k + 3; x++) { + if (merkelNodes.containsKey(getMerkelKey(j - 1, x))) { + sb.append(merkelNodes.get(getMerkelKey(j - 1, x))).append("|"); + } else { + isContinue = false; + break; + } + } + if (!isContinue) { + break; + } + String currentHash = generateHash(sb.toString()); + k = k / 3 + 1; + merkelNodes.put(getMerkelKey(j, k), currentHash); + } else { + for (long y = k - 2; y <= k; y++) { + if (merkelNodes.containsKey(getMerkelKey(j, y))) { + sb.append(merkelNodes.get(getMerkelKey(j, y))).append("|"); + } else { + isContinue = false; + break; + } + } + if (!isContinue) { + break; + } + String currentHash = generateHash(sb.toString()); + merkelNodes.put(getMerkelKey(j + 1, k / 3), currentHash); + j++; + k = k / 3; + } + } + } + +// public void updateMerkelTree(int[] index, String[] content, Map record){ +// System.out.println(index.length); +// for(int i=0; i orders = new ArrayList<>(); + + public TreeCounter(int degree, int height) { + this.degree = degree; + this.height = height; + } + + public List getHashList(long offset, long count) { + long end = offset + count - 1; + List ret = new ArrayList<>(); + int layer = 0; + while (end - offset >= 2 && layer <= height) { + while(offset % degree != 1){ + ret.add("l_" + layer + "_" + offset); + offset++; + } + if (end % degree == 0) { + offset = offset / degree + 1; + end = end / degree; + } else { + offset = offset / degree + 1; + while(end % degree != 0){ + ret.add("l_" + layer + "_" + end); + end--; + } + end = end / degree; + } + layer++; + } + for(long i = offset; i <= end; i++){ + ret.add("l_" + layer + "_" + i); + } + return ret; + } + + public synchronized List increment() { + List ret = new ArrayList<>(); + boolean needNext = true; + int currentLayer = 0; + for (int i = 0; needNext; i++) { + AtomicInteger current; + if (orders.size() > i) { + current = orders.get(i); + } else { + current = new AtomicInteger(0); + orders.add(current); + } +// System.out.println(orders.size()); +// System.out.println(current); + int currentValue = current.incrementAndGet(); + ret.add(currentValue); +// System.out.println(currentValue); + needNext = currentValue % degree == 0 && ++currentLayer < height; +// System.out.println("-------------"); +// for (Integer value : ret) { +// System.out.println(value); +// } +// System.out.println("-------------"); + } + return ret; + } + + public int startChild(int orderInCurrentLayer) { + return 1 + (orderInCurrentLayer - 1) * degree; + } + + public int endChild(int orderInCurrentLayer) { + return orderInCurrentLayer * degree; + } +} diff --git a/src/main/java/org/bdware/datanet/registry/merkle/NaiveMerkelMatcher.java b/src/main/java/org/bdware/datanet/registry/merkle/NaiveMerkelMatcher.java index d5ae5d8..fdfe137 100644 --- a/src/main/java/org/bdware/datanet/registry/merkle/NaiveMerkelMatcher.java +++ b/src/main/java/org/bdware/datanet/registry/merkle/NaiveMerkelMatcher.java @@ -15,7 +15,7 @@ public class NaiveMerkelMatcher implements MerkelMatcher { Map merkelNodes = new ConcurrentHashMap<>(); TreeCounter treeCounter; - Map topicNodes = new ConcurrentHashMap<>(); + public Map topicNodes = new ConcurrentHashMap<>(); //期待接收到的下一个数据的编号 AtomicLong order = new AtomicLong(1); diff --git a/src/test/java/test.java b/src/test/java/test.java new file mode 100644 index 0000000..5dfe403 --- /dev/null +++ b/src/test/java/test.java @@ -0,0 +1,2 @@ +package PACKAGE_NAME;public class test { +} -- Gitee