diff --git a/nifi-huawei-processors/src/main/java/org/apache/nifi/processors/huawei/abstractprocessor/AbstractOBSProcessor.java b/nifi-huawei-processors/src/main/java/org/apache/nifi/processors/huawei/abstractprocessor/AbstractOBSProcessor.java index 7913f004a2fbd521515858e703c47703ca912e06..ab10ba0a163f98b3f41299f9772974ad3ef7e06a 100644 --- a/nifi-huawei-processors/src/main/java/org/apache/nifi/processors/huawei/abstractprocessor/AbstractOBSProcessor.java +++ b/nifi-huawei-processors/src/main/java/org/apache/nifi/processors/huawei/abstractprocessor/AbstractOBSProcessor.java @@ -183,16 +183,7 @@ public abstract class AbstractOBSProcessor extends AbstractHuaweiProcessor imple return null; } - protected String getUrl(String region, String bucket, String objectKey) { - if (StringUtils.isBlank(region) || StringUtils.isBlank(bucket) || StringUtils.isBlank(objectKey)) { - return null; - } - if (objectKey.startsWith("/")) { - objectKey = objectKey.substring(1); - } - return String.format("http://%s.%s/%s", bucket, OBSRegions.fromName(region).getEndpoint(), objectKey); - } public ObsClient getClient() { diff --git a/nifi-huawei-processors/src/main/java/org/apache/nifi/processors/huawei/common/PropertyDescriptors.java b/nifi-huawei-processors/src/main/java/org/apache/nifi/processors/huawei/common/PropertyDescriptors.java index 85a55be9b4388ec4288318e3f1382962720bd3cd..fbe7a4326135a0324f5679ec0996ce53c2ed2ebe 100644 --- a/nifi-huawei-processors/src/main/java/org/apache/nifi/processors/huawei/common/PropertyDescriptors.java +++ b/nifi-huawei-processors/src/main/java/org/apache/nifi/processors/huawei/common/PropertyDescriptors.java @@ -80,10 +80,17 @@ public interface PropertyDescriptors { PropertyDescriptor OBS_REGION = new PropertyDescriptor.Builder() .name("OBS Region") + .description("If the destination Region is Not found, select Not Found and enter the Endpoint address of the destination Region in the Endpoint Override URL box") .required(true) .allowableValues(OBSRegions.getAvailableOBSRegions()) .defaultValue(OBSRegions.DEFAULT_REGION.getName()) .build(); + PropertyDescriptor ENDPOINT_OVERRIDE_URL = new PropertyDescriptor.Builder() + .name("Endpoint Override URL") + .description("The endpoint specified in this option box takes effect only when the OBS Region option is Not found") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); PropertyDescriptor TIMEOUT = new PropertyDescriptor.Builder() .name("Communications Timeout") .required(true) @@ -99,7 +106,6 @@ public interface PropertyDescriptors { .build(); /** * Huawei credentials provider service - * */ PropertyDescriptor HUAWEI_CREDENTIALS_PROVIDER_SERVICE = new PropertyDescriptor.Builder() .name("Huawei Credentials Provider service") diff --git a/nifi-huawei-processors/src/main/java/org/apache/nifi/processors/huawei/obs/DeleteOBSObject.java b/nifi-huawei-processors/src/main/java/org/apache/nifi/processors/huawei/obs/DeleteOBSObject.java index 4f48a6d489776f7b5fe423a306fdc9d7c4d70c29..3c305061fb43ada1b49d2cac7f624da90bf51038 100644 --- a/nifi-huawei-processors/src/main/java/org/apache/nifi/processors/huawei/obs/DeleteOBSObject.java +++ b/nifi-huawei-processors/src/main/java/org/apache/nifi/processors/huawei/obs/DeleteOBSObject.java @@ -52,7 +52,7 @@ import static org.apache.nifi.processors.huawei.common.PropertyDescriptors.*; "If attempting to delete a file that does not exist, FlowFile is routed to success.") public class DeleteOBSObject extends AbstractOBSProcessor { public static final List properties = Collections.unmodifiableList( - Arrays.asList(OBS_REGION, BUCKET, ACCESS_KEY, SECRET_KEY, HUAWEI_CREDENTIALS_PROVIDER_SERVICE, KEY, TIMEOUT, + Arrays.asList(OBS_REGION, ENDPOINT_OVERRIDE_URL, BUCKET, ACCESS_KEY, SECRET_KEY, HUAWEI_CREDENTIALS_PROVIDER_SERVICE, KEY, TIMEOUT, // SSL_CONTEXT_SERVICE, PROXY_CONFIGURATION_SERVICE)); @@ -71,8 +71,9 @@ public class DeleteOBSObject extends AbstractOBSProcessor { final long startNanos = System.nanoTime(); final String bucket = context.getProperty(BUCKET).evaluateAttributeExpressions(flowFile).getValue(); - final String region = context.getProperty(OBS_REGION).getValue(); final String key = context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue(); + final String region = context.getProperty(OBS_REGION).getValue(); + final String endpoint = context.getProperty(ENDPOINT_OVERRIDE_URL).getValue(); final ObsClient obsClient = getClient(); // Deletes a key @@ -89,6 +90,6 @@ public class DeleteOBSObject extends AbstractOBSProcessor { session.transfer(flowFile, REL_SUCCESS); final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos); getLogger().info("Successfully delete OBS Object for {} in {} millis; routing to success", flowFile, transferMillis); - session.getProvenanceReporter().invokeRemoteProcess(flowFile, getUrl(region, bucket, key), "Object deleted"); + session.getProvenanceReporter().invokeRemoteProcess(flowFile, OBSUtils.getUrl(region,endpoint, bucket, key), "Object deleted"); } } diff --git a/nifi-huawei-processors/src/main/java/org/apache/nifi/processors/huawei/obs/FetchOBSObject.java b/nifi-huawei-processors/src/main/java/org/apache/nifi/processors/huawei/obs/FetchOBSObject.java index 0d96e5e3731511f535b0b210a8f189d419db290b..d8a6fe5cef8eed6d4a79cb38e34a2352308fa39a 100644 --- a/nifi-huawei-processors/src/main/java/org/apache/nifi/processors/huawei/obs/FetchOBSObject.java +++ b/nifi-huawei-processors/src/main/java/org/apache/nifi/processors/huawei/obs/FetchOBSObject.java @@ -97,6 +97,7 @@ public class FetchOBSObject extends AbstractOBSProcessor { public static final List properties = Collections.unmodifiableList( Arrays.asList( OBS_REGION, + ENDPOINT_OVERRIDE_URL, BUCKET, KEY, ACCESS_KEY, @@ -156,6 +157,8 @@ public class FetchOBSObject extends AbstractOBSProcessor { final String bucket = context.getProperty(BUCKET).evaluateAttributeExpressions(flowFile).getValue(); final String key = context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue(); + final String region = context.getProperty(OBS_REGION).getValue(); + final String endpoint = context.getProperty(ENDPOINT_OVERRIDE_URL).getValue(); final ObsClient client = getClient(); final GetObjectRequest request = createGetObjectRequest(context, flowFile.getAttributes(), attributes); @@ -210,7 +213,7 @@ public class FetchOBSObject extends AbstractOBSProcessor { session.transfer(flowFile, REL_SUCCESS); final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos); getLogger().info("Successfully retrieved OBS Object for {} in {} millis; routing to success", new Object[]{flowFile, transferMillis}); - session.getProvenanceReporter().fetch(flowFile, getUrl(context.getProperty(OBS_REGION).getValue(), bucket, key), transferMillis); + session.getProvenanceReporter().fetch(flowFile, OBSUtils.getUrl(region, endpoint, bucket, key), transferMillis); } private GetObjectMetadataRequest createGetObjectMetadataRequest(final ProcessContext context, final Map attributes) { diff --git a/nifi-huawei-processors/src/main/java/org/apache/nifi/processors/huawei/obs/ListOBSObject.java b/nifi-huawei-processors/src/main/java/org/apache/nifi/processors/huawei/obs/ListOBSObject.java index 262087ce6af39b6e2778fd6dccf57b2dea09a919..3bf50e8e25aa3561c02154ca504e092c3a94eddf 100644 --- a/nifi-huawei-processors/src/main/java/org/apache/nifi/processors/huawei/obs/ListOBSObject.java +++ b/nifi-huawei-processors/src/main/java/org/apache/nifi/processors/huawei/obs/ListOBSObject.java @@ -75,6 +75,7 @@ import static org.apache.nifi.processors.huawei.common.PropertyDescriptors.*; public class ListOBSObject extends AbstractListHuaweiProcessor { public static final List properties = Collections.unmodifiableList(Arrays.asList( OBS_REGION, + ENDPOINT_OVERRIDE_URL, BUCKET, ACCESS_KEY, SECRET_KEY, @@ -143,6 +144,7 @@ public class ListOBSObject extends AbstractListHuaweiProcessor { protected List performListing(ProcessContext context, Long minTimestamp, ListingMode listingMode) throws IOException { final long minimumTimestamp = minTimestamp == null ? 0 : minTimestamp; final String region = context.getProperty(OBS_REGION).getValue(); + final String endpoint = context.getProperty(ENDPOINT_OVERRIDE_URL).getValue(); List list = new ArrayList<>(); int batchCount = 0; ObsBucketLister bucketLister = new OBSObjectBucketLister(client, context); @@ -157,6 +159,7 @@ public class ListOBSObject extends AbstractListHuaweiProcessor { OBSRecord record = new OBSRecord(); record.setObjectMetadata(objectMetadata); record.setRegion(region); + record.setEndpoint(endpoint); record.setVersionSummary(versionSummary); list.add(record); batchCount++; diff --git a/nifi-huawei-processors/src/main/java/org/apache/nifi/processors/huawei/obs/OBSRegions.java b/nifi-huawei-processors/src/main/java/org/apache/nifi/processors/huawei/obs/OBSRegions.java index 19cfb5e1ca109140f6fa1cadc4e006dab30e7d22..211ad06b81da2a700180b25be2a60d841949546d 100644 --- a/nifi-huawei-processors/src/main/java/org/apache/nifi/processors/huawei/obs/OBSRegions.java +++ b/nifi-huawei-processors/src/main/java/org/apache/nifi/processors/huawei/obs/OBSRegions.java @@ -23,6 +23,7 @@ import java.util.ArrayList; import java.util.List; public enum OBSRegions { + NOT_FOUND("NotFound", "NotFound", "Not found"), OF_SOUTH_1("af-south-1", "obs.af-south-1.myhuaweicloud.com", "Africa - Johannesburg"), CN_NORTH_4("cn-north-4", "obs.cn-north-4.myhuaweicloud.com", "North China - Beijing IV"), CN_NORTH_1("cn-north-1", "obs.cn-north-1.myhuaweicloud.com", "North China - Beijing I"), diff --git a/nifi-huawei-processors/src/main/java/org/apache/nifi/processors/huawei/obs/OBSUtils.java b/nifi-huawei-processors/src/main/java/org/apache/nifi/processors/huawei/obs/OBSUtils.java index c5026949cfae63ecceb740eb68b6af54da464cdf..e63a2bc732963b1918bf6566ee3f2e3cac11dab5 100644 --- a/nifi-huawei-processors/src/main/java/org/apache/nifi/processors/huawei/obs/OBSUtils.java +++ b/nifi-huawei-processors/src/main/java/org/apache/nifi/processors/huawei/obs/OBSUtils.java @@ -35,8 +35,7 @@ import java.util.concurrent.TimeUnit; import static org.apache.nifi.processors.huawei.common.PropertyDescriptors.*; public class OBSUtils { - - public static String getUrl(String region, String bucket, String objectKey) { + public static String getUrl(String region,String endpoint, String bucket, String objectKey) { if (StringUtils.isBlank(region) || StringUtils.isBlank(bucket) || StringUtils.isBlank(objectKey)) { return null; } @@ -44,7 +43,11 @@ public class OBSUtils { if (objectKey.startsWith("/")) { objectKey = objectKey.substring(1); } - return String.format("http://%s.%s/%s", bucket, OBSRegions.fromName(region).getEndpoint(), objectKey); + + if (!OBSRegions.NOT_FOUND.equals(OBSRegions.fromName(region))) { + endpoint = OBSRegions.fromName(region).getEndpoint(); + } + return String.format("http://%s.%s/%s", bucket, endpoint, objectKey); } public static ObsClient createObsClientWithAkSk(ProcessContext context, ComponentLog logger) { @@ -105,8 +108,14 @@ public class OBSUtils { if (Proxy.Type.HTTP.equals(proxyConfig.getProxyType())) { config.setHttpProxy(proxyConfig.getProxyServerHost(), proxyConfig.getProxyServerPort(), proxyConfig.getProxyUserName(), proxyConfig.getProxyUserPassword()); } + String name = context.getProperty(OBS_REGION).getValue(); - config.setEndPoint(OBSRegions.fromName(name).getEndpoint()); + OBSRegions region = OBSRegions.fromName(name); + String endpoint = context.getProperty(ENDPOINT_OVERRIDE_URL).getValue(); + if (!OBSRegions.NOT_FOUND.equals(region)) { + endpoint = region.getEndpoint(); + } + config.setEndPoint(endpoint); return config; } } diff --git a/nifi-huawei-processors/src/main/java/org/apache/nifi/processors/huawei/obs/PutOBSObject.java b/nifi-huawei-processors/src/main/java/org/apache/nifi/processors/huawei/obs/PutOBSObject.java index 6bac6cf080152e44d7eb215b7644f8e539b3dc86..475289322894efe444d979f0359feef1a7ab50d4 100644 --- a/nifi-huawei-processors/src/main/java/org/apache/nifi/processors/huawei/obs/PutOBSObject.java +++ b/nifi-huawei-processors/src/main/java/org/apache/nifi/processors/huawei/obs/PutOBSObject.java @@ -210,6 +210,7 @@ public class PutOBSObject extends AbstractOBSProcessor { public static final List properties = Collections.unmodifiableList( Arrays.asList( OBS_REGION, + ENDPOINT_OVERRIDE_URL, BUCKET, ACCESS_KEY, SECRET_KEY, diff --git a/nifi-huawei-processors/src/main/java/org/apache/nifi/processors/huawei/obs/model/OBSRecord.java b/nifi-huawei-processors/src/main/java/org/apache/nifi/processors/huawei/obs/model/OBSRecord.java index 48c0e01075af89029eba5b3994ddaa3ad4718510..8beff7f961706590cf303166788bda2dca37a709 100644 --- a/nifi-huawei-processors/src/main/java/org/apache/nifi/processors/huawei/obs/model/OBSRecord.java +++ b/nifi-huawei-processors/src/main/java/org/apache/nifi/processors/huawei/obs/model/OBSRecord.java @@ -38,6 +38,8 @@ public class OBSRecord implements Comparable, Serializable, Listable private static final RecordSchema SCHEMA; private String region; + private String endpoint; + private VersionOrDeleteMarker versionSummary; private ObjectMetadata objectMetadata; @@ -98,7 +100,7 @@ public class OBSRecord implements Comparable, Serializable, Listable if (versionSummary == null) { return null; } - return OBSUtils.getUrl(region, versionSummary.getBucketName(), versionSummary.getObjectKey()); + return OBSUtils.getUrl(region, endpoint, versionSummary.getBucketName(), versionSummary.getObjectKey()); } @Override @@ -140,4 +142,8 @@ public class OBSRecord implements Comparable, Serializable, Listable public void setObjectMetadata(ObjectMetadata objectMetadata) { this.objectMetadata = objectMetadata; } + + public void setEndpoint(String endpoint) { + this.endpoint = endpoint; + } } diff --git a/nifi-huawei-processors/src/test/java/org/apache/nifi/processors/obs/AbstractOBSIT.java b/nifi-huawei-processors/src/test/java/org/apache/nifi/processors/obs/AbstractOBSIT.java index 3adc08cb59d80106e691cd7f55b5104483ff1b19..eed1a0b8b8b3267d07ae0db1f5385456239c3e91 100644 --- a/nifi-huawei-processors/src/test/java/org/apache/nifi/processors/obs/AbstractOBSIT.java +++ b/nifi-huawei-processors/src/test/java/org/apache/nifi/processors/obs/AbstractOBSIT.java @@ -55,6 +55,10 @@ public abstract class AbstractOBSIT { protected final static String sk = "your sk"; protected final static String SAMPLE_FILE_RESOURCE_NAME = "/hello.txt"; protected final static String REGION_NAME = "cn-south-1"; + + // the endpoint of REGION_NAME + protected final static String ENDPOINT = "obs." + REGION_NAME +".myhuaweicloud.com"; + // Static so multiple Tests can use same client, you may need to clean up uploaded files at the end of the method protected static ObsClient client; @@ -207,6 +211,16 @@ public abstract class AbstractOBSIT { return runner; } + protected static TestRunner initRunner(Processor processor, String endpoint) { + final TestRunner runner = TestRunners.newTestRunner(processor); + runner.setProperty(OBS_REGION, "NotFound"); + runner.setProperty(ENDPOINT_OVERRIDE_URL, endpoint); + runner.setProperty(BUCKET, bucketName); + runner.setProperty(ACCESS_KEY, ak); + runner.setProperty(SECRET_KEY, sk); + return runner; + } + protected static TestRunner initCredentialsRunner(Processor processor) throws InitializationException { final TestRunner runner = TestRunners.newTestRunner(processor); runner.setProperty(OBS_REGION, REGION_NAME); diff --git a/nifi-huawei-processors/src/test/java/org/apache/nifi/processors/obs/ITListOBSObject.java b/nifi-huawei-processors/src/test/java/org/apache/nifi/processors/obs/ITListOBSObject.java index 3032ccc12d728ad1ff5bef1eb794631835f41371..8098aeb5d93a348b4f172fa7f1d63696e4978597 100644 --- a/nifi-huawei-processors/src/test/java/org/apache/nifi/processors/obs/ITListOBSObject.java +++ b/nifi-huawei-processors/src/test/java/org/apache/nifi/processors/obs/ITListOBSObject.java @@ -68,6 +68,35 @@ public class ITListOBSObject extends AbstractOBSIT{ clearObjects(putFiles); } + @Test + public void testRegionNotFound() throws IOException, InterruptedException { + String[] putFiles = new String[]{"a","b/c","c/e","c/f","d/e"}; + + putTestFile(putFiles); + final TestRunner runner = initRunner(new ListOBSObject(), ENDPOINT); + runner.setValidateExpressionUsage(false); + sleep(); + runner.run(); + runner.assertAllFlowFilesTransferred(ListOBSObject.REL_SUCCESS, 5); + sleep(); + // update + putTestFile(putFiles); + runner.run(); + + // repeat the cycle to ensure that all new objects are listed + // AbstractListProcessor.listByTrackingTimestamps + sleep(); + runner.run(); + runner.assertAllFlowFilesTransferred(ListOBSObject.REL_SUCCESS, 10); + List flowFiles = runner.getFlowFilesForRelationship(ListOBSObject.REL_SUCCESS); + flowFiles.get(0).assertAttributeEquals("filename", "a"); + flowFiles.get(1).assertAttributeEquals("filename", "b/c"); + flowFiles.get(2).assertAttributeEquals("filename", "c/e"); + flowFiles.get(3).assertAttributeEquals("filename", "c/f"); + flowFiles.get(4).assertAttributeEquals("filename", "d/e"); + clearObjects(putFiles); + } + @Test public void testSimpleListUsingCredentialsProviderService() throws Throwable { String[] putFiles = new String[]{"a","b/c","d/e"};