From d0042aa8a6e5fd5bdc700491cfaa751bf06b677f Mon Sep 17 00:00:00 2001 From: 18823700357 <329386014@qq.com> Date: Wed, 22 Feb 2023 16:18:52 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E8=87=AA=E5=AE=9A=E4=B9=89en?= =?UTF-8?q?dpoint=EF=BC=8C=E9=98=B2=E6=AD=A2=E6=96=B0=E5=A2=9E=E5=8F=AF?= =?UTF-8?q?=E7=94=A8=E5=8C=BA=E6=9C=AA=E5=8F=8A=E6=97=B6=E6=9B=B4=E6=96=B0?= =?UTF-8?q?=E4=BB=A3=E7=A0=81=E5=AF=BC=E8=87=B4=E6=97=A0=E6=B3=95=E4=BD=BF?= =?UTF-8?q?=E7=94=A8=E7=9A=84=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../AbstractOBSProcessor.java | 9 ------ .../huawei/common/PropertyDescriptors.java | 8 ++++- .../huawei/obs/DeleteOBSObject.java | 7 +++-- .../processors/huawei/obs/FetchOBSObject.java | 5 +++- .../processors/huawei/obs/ListOBSObject.java | 3 ++ .../processors/huawei/obs/OBSRegions.java | 1 + .../nifi/processors/huawei/obs/OBSUtils.java | 17 ++++++++--- .../processors/huawei/obs/PutOBSObject.java | 1 + .../huawei/obs/model/OBSRecord.java | 8 ++++- .../nifi/processors/obs/AbstractOBSIT.java | 14 +++++++++ .../nifi/processors/obs/ITListOBSObject.java | 29 +++++++++++++++++++ 11 files changed, 83 insertions(+), 19 deletions(-) diff --git a/nifi-huawei-processors/src/main/java/org/apache/nifi/processors/huawei/abstractprocessor/AbstractOBSProcessor.java b/nifi-huawei-processors/src/main/java/org/apache/nifi/processors/huawei/abstractprocessor/AbstractOBSProcessor.java index 7913f00..ab10ba0 100644 --- a/nifi-huawei-processors/src/main/java/org/apache/nifi/processors/huawei/abstractprocessor/AbstractOBSProcessor.java +++ b/nifi-huawei-processors/src/main/java/org/apache/nifi/processors/huawei/abstractprocessor/AbstractOBSProcessor.java @@ -183,16 +183,7 @@ public abstract class AbstractOBSProcessor extends AbstractHuaweiProcessor imple return null; } - protected String getUrl(String region, String bucket, String objectKey) { - if (StringUtils.isBlank(region) || StringUtils.isBlank(bucket) || StringUtils.isBlank(objectKey)) { - return null; - } - if (objectKey.startsWith("/")) { - objectKey = objectKey.substring(1); - } - return String.format("http://%s.%s/%s", bucket, OBSRegions.fromName(region).getEndpoint(), objectKey); - } public ObsClient getClient() { diff --git a/nifi-huawei-processors/src/main/java/org/apache/nifi/processors/huawei/common/PropertyDescriptors.java b/nifi-huawei-processors/src/main/java/org/apache/nifi/processors/huawei/common/PropertyDescriptors.java index 85a55be..fbe7a43 100644 --- a/nifi-huawei-processors/src/main/java/org/apache/nifi/processors/huawei/common/PropertyDescriptors.java +++ b/nifi-huawei-processors/src/main/java/org/apache/nifi/processors/huawei/common/PropertyDescriptors.java @@ -80,10 +80,17 @@ public interface PropertyDescriptors { PropertyDescriptor OBS_REGION = new PropertyDescriptor.Builder() .name("OBS Region") + .description("If the destination Region is Not found, select Not Found and enter the Endpoint address of the destination Region in the Endpoint Override URL box") .required(true) .allowableValues(OBSRegions.getAvailableOBSRegions()) .defaultValue(OBSRegions.DEFAULT_REGION.getName()) .build(); + PropertyDescriptor ENDPOINT_OVERRIDE_URL = new PropertyDescriptor.Builder() + .name("Endpoint Override URL") + .description("The endpoint specified in this option box takes effect only when the OBS Region option is Not found") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); PropertyDescriptor TIMEOUT = new PropertyDescriptor.Builder() .name("Communications Timeout") .required(true) @@ -99,7 +106,6 @@ public interface PropertyDescriptors { .build(); /** * Huawei credentials provider service - * */ PropertyDescriptor HUAWEI_CREDENTIALS_PROVIDER_SERVICE = new PropertyDescriptor.Builder() .name("Huawei Credentials Provider service") diff --git a/nifi-huawei-processors/src/main/java/org/apache/nifi/processors/huawei/obs/DeleteOBSObject.java b/nifi-huawei-processors/src/main/java/org/apache/nifi/processors/huawei/obs/DeleteOBSObject.java index 4f48a6d..3c30506 100644 --- a/nifi-huawei-processors/src/main/java/org/apache/nifi/processors/huawei/obs/DeleteOBSObject.java +++ b/nifi-huawei-processors/src/main/java/org/apache/nifi/processors/huawei/obs/DeleteOBSObject.java @@ -52,7 +52,7 @@ import static org.apache.nifi.processors.huawei.common.PropertyDescriptors.*; "If attempting to delete a file that does not exist, FlowFile is routed to success.") public class DeleteOBSObject extends AbstractOBSProcessor { public static final List properties = Collections.unmodifiableList( - Arrays.asList(OBS_REGION, BUCKET, ACCESS_KEY, SECRET_KEY, HUAWEI_CREDENTIALS_PROVIDER_SERVICE, KEY, TIMEOUT, + Arrays.asList(OBS_REGION, ENDPOINT_OVERRIDE_URL, BUCKET, ACCESS_KEY, SECRET_KEY, HUAWEI_CREDENTIALS_PROVIDER_SERVICE, KEY, TIMEOUT, // SSL_CONTEXT_SERVICE, PROXY_CONFIGURATION_SERVICE)); @@ -71,8 +71,9 @@ public class DeleteOBSObject extends AbstractOBSProcessor { final long startNanos = System.nanoTime(); final String bucket = context.getProperty(BUCKET).evaluateAttributeExpressions(flowFile).getValue(); - final String region = context.getProperty(OBS_REGION).getValue(); final String key = context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue(); + final String region = context.getProperty(OBS_REGION).getValue(); + final String endpoint = context.getProperty(ENDPOINT_OVERRIDE_URL).getValue(); final ObsClient obsClient = getClient(); // Deletes a key @@ -89,6 +90,6 @@ public class DeleteOBSObject extends AbstractOBSProcessor { session.transfer(flowFile, REL_SUCCESS); final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos); getLogger().info("Successfully delete OBS Object for {} in {} millis; routing to success", flowFile, transferMillis); - session.getProvenanceReporter().invokeRemoteProcess(flowFile, getUrl(region, bucket, key), "Object deleted"); + session.getProvenanceReporter().invokeRemoteProcess(flowFile, OBSUtils.getUrl(region,endpoint, bucket, key), "Object deleted"); } } diff --git a/nifi-huawei-processors/src/main/java/org/apache/nifi/processors/huawei/obs/FetchOBSObject.java b/nifi-huawei-processors/src/main/java/org/apache/nifi/processors/huawei/obs/FetchOBSObject.java index 0d96e5e..d8a6fe5 100644 --- a/nifi-huawei-processors/src/main/java/org/apache/nifi/processors/huawei/obs/FetchOBSObject.java +++ b/nifi-huawei-processors/src/main/java/org/apache/nifi/processors/huawei/obs/FetchOBSObject.java @@ -97,6 +97,7 @@ public class FetchOBSObject extends AbstractOBSProcessor { public static final List properties = Collections.unmodifiableList( Arrays.asList( OBS_REGION, + ENDPOINT_OVERRIDE_URL, BUCKET, KEY, ACCESS_KEY, @@ -156,6 +157,8 @@ public class FetchOBSObject extends AbstractOBSProcessor { final String bucket = context.getProperty(BUCKET).evaluateAttributeExpressions(flowFile).getValue(); final String key = context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue(); + final String region = context.getProperty(OBS_REGION).getValue(); + final String endpoint = context.getProperty(ENDPOINT_OVERRIDE_URL).getValue(); final ObsClient client = getClient(); final GetObjectRequest request = createGetObjectRequest(context, flowFile.getAttributes(), attributes); @@ -210,7 +213,7 @@ public class FetchOBSObject extends AbstractOBSProcessor { session.transfer(flowFile, REL_SUCCESS); final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos); getLogger().info("Successfully retrieved OBS Object for {} in {} millis; routing to success", new Object[]{flowFile, transferMillis}); - session.getProvenanceReporter().fetch(flowFile, getUrl(context.getProperty(OBS_REGION).getValue(), bucket, key), transferMillis); + session.getProvenanceReporter().fetch(flowFile, OBSUtils.getUrl(region, endpoint, bucket, key), transferMillis); } private GetObjectMetadataRequest createGetObjectMetadataRequest(final ProcessContext context, final Map attributes) { diff --git a/nifi-huawei-processors/src/main/java/org/apache/nifi/processors/huawei/obs/ListOBSObject.java b/nifi-huawei-processors/src/main/java/org/apache/nifi/processors/huawei/obs/ListOBSObject.java index 262087c..3bf50e8 100644 --- a/nifi-huawei-processors/src/main/java/org/apache/nifi/processors/huawei/obs/ListOBSObject.java +++ b/nifi-huawei-processors/src/main/java/org/apache/nifi/processors/huawei/obs/ListOBSObject.java @@ -75,6 +75,7 @@ import static org.apache.nifi.processors.huawei.common.PropertyDescriptors.*; public class ListOBSObject extends AbstractListHuaweiProcessor { public static final List properties = Collections.unmodifiableList(Arrays.asList( OBS_REGION, + ENDPOINT_OVERRIDE_URL, BUCKET, ACCESS_KEY, SECRET_KEY, @@ -143,6 +144,7 @@ public class ListOBSObject extends AbstractListHuaweiProcessor { protected List performListing(ProcessContext context, Long minTimestamp, ListingMode listingMode) throws IOException { final long minimumTimestamp = minTimestamp == null ? 0 : minTimestamp; final String region = context.getProperty(OBS_REGION).getValue(); + final String endpoint = context.getProperty(ENDPOINT_OVERRIDE_URL).getValue(); List list = new ArrayList<>(); int batchCount = 0; ObsBucketLister bucketLister = new OBSObjectBucketLister(client, context); @@ -157,6 +159,7 @@ public class ListOBSObject extends AbstractListHuaweiProcessor { OBSRecord record = new OBSRecord(); record.setObjectMetadata(objectMetadata); record.setRegion(region); + record.setEndpoint(endpoint); record.setVersionSummary(versionSummary); list.add(record); batchCount++; diff --git a/nifi-huawei-processors/src/main/java/org/apache/nifi/processors/huawei/obs/OBSRegions.java b/nifi-huawei-processors/src/main/java/org/apache/nifi/processors/huawei/obs/OBSRegions.java index 19cfb5e..211ad06 100644 --- a/nifi-huawei-processors/src/main/java/org/apache/nifi/processors/huawei/obs/OBSRegions.java +++ b/nifi-huawei-processors/src/main/java/org/apache/nifi/processors/huawei/obs/OBSRegions.java @@ -23,6 +23,7 @@ import java.util.ArrayList; import java.util.List; public enum OBSRegions { + NOT_FOUND("NotFound", "NotFound", "Not found"), OF_SOUTH_1("af-south-1", "obs.af-south-1.myhuaweicloud.com", "Africa - Johannesburg"), CN_NORTH_4("cn-north-4", "obs.cn-north-4.myhuaweicloud.com", "North China - Beijing IV"), CN_NORTH_1("cn-north-1", "obs.cn-north-1.myhuaweicloud.com", "North China - Beijing I"), diff --git a/nifi-huawei-processors/src/main/java/org/apache/nifi/processors/huawei/obs/OBSUtils.java b/nifi-huawei-processors/src/main/java/org/apache/nifi/processors/huawei/obs/OBSUtils.java index c502694..e63a2bc 100644 --- a/nifi-huawei-processors/src/main/java/org/apache/nifi/processors/huawei/obs/OBSUtils.java +++ b/nifi-huawei-processors/src/main/java/org/apache/nifi/processors/huawei/obs/OBSUtils.java @@ -35,8 +35,7 @@ import java.util.concurrent.TimeUnit; import static org.apache.nifi.processors.huawei.common.PropertyDescriptors.*; public class OBSUtils { - - public static String getUrl(String region, String bucket, String objectKey) { + public static String getUrl(String region,String endpoint, String bucket, String objectKey) { if (StringUtils.isBlank(region) || StringUtils.isBlank(bucket) || StringUtils.isBlank(objectKey)) { return null; } @@ -44,7 +43,11 @@ public class OBSUtils { if (objectKey.startsWith("/")) { objectKey = objectKey.substring(1); } - return String.format("http://%s.%s/%s", bucket, OBSRegions.fromName(region).getEndpoint(), objectKey); + + if (!OBSRegions.NOT_FOUND.equals(OBSRegions.fromName(region))) { + endpoint = OBSRegions.fromName(region).getEndpoint(); + } + return String.format("http://%s.%s/%s", bucket, endpoint, objectKey); } public static ObsClient createObsClientWithAkSk(ProcessContext context, ComponentLog logger) { @@ -105,8 +108,14 @@ public class OBSUtils { if (Proxy.Type.HTTP.equals(proxyConfig.getProxyType())) { config.setHttpProxy(proxyConfig.getProxyServerHost(), proxyConfig.getProxyServerPort(), proxyConfig.getProxyUserName(), proxyConfig.getProxyUserPassword()); } + String name = context.getProperty(OBS_REGION).getValue(); - config.setEndPoint(OBSRegions.fromName(name).getEndpoint()); + OBSRegions region = OBSRegions.fromName(name); + String endpoint = context.getProperty(ENDPOINT_OVERRIDE_URL).getValue(); + if (!OBSRegions.NOT_FOUND.equals(region)) { + endpoint = region.getEndpoint(); + } + config.setEndPoint(endpoint); return config; } } diff --git a/nifi-huawei-processors/src/main/java/org/apache/nifi/processors/huawei/obs/PutOBSObject.java b/nifi-huawei-processors/src/main/java/org/apache/nifi/processors/huawei/obs/PutOBSObject.java index 6bac6cf..4752893 100644 --- a/nifi-huawei-processors/src/main/java/org/apache/nifi/processors/huawei/obs/PutOBSObject.java +++ b/nifi-huawei-processors/src/main/java/org/apache/nifi/processors/huawei/obs/PutOBSObject.java @@ -210,6 +210,7 @@ public class PutOBSObject extends AbstractOBSProcessor { public static final List properties = Collections.unmodifiableList( Arrays.asList( OBS_REGION, + ENDPOINT_OVERRIDE_URL, BUCKET, ACCESS_KEY, SECRET_KEY, diff --git a/nifi-huawei-processors/src/main/java/org/apache/nifi/processors/huawei/obs/model/OBSRecord.java b/nifi-huawei-processors/src/main/java/org/apache/nifi/processors/huawei/obs/model/OBSRecord.java index 48c0e01..8beff7f 100644 --- a/nifi-huawei-processors/src/main/java/org/apache/nifi/processors/huawei/obs/model/OBSRecord.java +++ b/nifi-huawei-processors/src/main/java/org/apache/nifi/processors/huawei/obs/model/OBSRecord.java @@ -38,6 +38,8 @@ public class OBSRecord implements Comparable, Serializable, Listable private static final RecordSchema SCHEMA; private String region; + private String endpoint; + private VersionOrDeleteMarker versionSummary; private ObjectMetadata objectMetadata; @@ -98,7 +100,7 @@ public class OBSRecord implements Comparable, Serializable, Listable if (versionSummary == null) { return null; } - return OBSUtils.getUrl(region, versionSummary.getBucketName(), versionSummary.getObjectKey()); + return OBSUtils.getUrl(region, endpoint, versionSummary.getBucketName(), versionSummary.getObjectKey()); } @Override @@ -140,4 +142,8 @@ public class OBSRecord implements Comparable, Serializable, Listable public void setObjectMetadata(ObjectMetadata objectMetadata) { this.objectMetadata = objectMetadata; } + + public void setEndpoint(String endpoint) { + this.endpoint = endpoint; + } } diff --git a/nifi-huawei-processors/src/test/java/org/apache/nifi/processors/obs/AbstractOBSIT.java b/nifi-huawei-processors/src/test/java/org/apache/nifi/processors/obs/AbstractOBSIT.java index 3adc08c..eed1a0b 100644 --- a/nifi-huawei-processors/src/test/java/org/apache/nifi/processors/obs/AbstractOBSIT.java +++ b/nifi-huawei-processors/src/test/java/org/apache/nifi/processors/obs/AbstractOBSIT.java @@ -55,6 +55,10 @@ public abstract class AbstractOBSIT { protected final static String sk = "your sk"; protected final static String SAMPLE_FILE_RESOURCE_NAME = "/hello.txt"; protected final static String REGION_NAME = "cn-south-1"; + + // the endpoint of REGION_NAME + protected final static String ENDPOINT = "obs." + REGION_NAME +".myhuaweicloud.com"; + // Static so multiple Tests can use same client, you may need to clean up uploaded files at the end of the method protected static ObsClient client; @@ -207,6 +211,16 @@ public abstract class AbstractOBSIT { return runner; } + protected static TestRunner initRunner(Processor processor, String endpoint) { + final TestRunner runner = TestRunners.newTestRunner(processor); + runner.setProperty(OBS_REGION, "NotFound"); + runner.setProperty(ENDPOINT_OVERRIDE_URL, endpoint); + runner.setProperty(BUCKET, bucketName); + runner.setProperty(ACCESS_KEY, ak); + runner.setProperty(SECRET_KEY, sk); + return runner; + } + protected static TestRunner initCredentialsRunner(Processor processor) throws InitializationException { final TestRunner runner = TestRunners.newTestRunner(processor); runner.setProperty(OBS_REGION, REGION_NAME); diff --git a/nifi-huawei-processors/src/test/java/org/apache/nifi/processors/obs/ITListOBSObject.java b/nifi-huawei-processors/src/test/java/org/apache/nifi/processors/obs/ITListOBSObject.java index 3032ccc..8098aeb 100644 --- a/nifi-huawei-processors/src/test/java/org/apache/nifi/processors/obs/ITListOBSObject.java +++ b/nifi-huawei-processors/src/test/java/org/apache/nifi/processors/obs/ITListOBSObject.java @@ -68,6 +68,35 @@ public class ITListOBSObject extends AbstractOBSIT{ clearObjects(putFiles); } + @Test + public void testRegionNotFound() throws IOException, InterruptedException { + String[] putFiles = new String[]{"a","b/c","c/e","c/f","d/e"}; + + putTestFile(putFiles); + final TestRunner runner = initRunner(new ListOBSObject(), ENDPOINT); + runner.setValidateExpressionUsage(false); + sleep(); + runner.run(); + runner.assertAllFlowFilesTransferred(ListOBSObject.REL_SUCCESS, 5); + sleep(); + // update + putTestFile(putFiles); + runner.run(); + + // repeat the cycle to ensure that all new objects are listed + // AbstractListProcessor.listByTrackingTimestamps + sleep(); + runner.run(); + runner.assertAllFlowFilesTransferred(ListOBSObject.REL_SUCCESS, 10); + List flowFiles = runner.getFlowFilesForRelationship(ListOBSObject.REL_SUCCESS); + flowFiles.get(0).assertAttributeEquals("filename", "a"); + flowFiles.get(1).assertAttributeEquals("filename", "b/c"); + flowFiles.get(2).assertAttributeEquals("filename", "c/e"); + flowFiles.get(3).assertAttributeEquals("filename", "c/f"); + flowFiles.get(4).assertAttributeEquals("filename", "d/e"); + clearObjects(putFiles); + } + @Test public void testSimpleListUsingCredentialsProviderService() throws Throwable { String[] putFiles = new String[]{"a","b/c","d/e"}; -- Gitee