diff --git a/nifi-huawei-nar/pom.xml b/nifi-huawei-nar/pom.xml
new file mode 100644
index 0000000000000000000000000000000000000000..4f4300d46692fddc339d3a741108dbd956cbc08b
--- /dev/null
+++ b/nifi-huawei-nar/pom.xml
@@ -0,0 +1,49 @@
+
+
+
+ 4.0.0
+
+
+ org.apache.nifi
+ nifi-huawei-bundle
+ 1.18.0
+
+
+ nifi-huawei-nar
+ nar
+
+ true
+ true
+
+
+
+
+ org.apache.nifi
+ nifi-huawei-service-api-nar
+ 1.18.0
+ nar
+
+
+ org.apache.nifi
+ nifi-huawei-processors
+ 1.18.0
+
+
+ org.slf4j
+ jcl-over-slf4j
+
+
+
diff --git a/nifi-huawei-nar/src/main/resources/META-INF/LICENSE b/nifi-huawei-nar/src/main/resources/META-INF/LICENSE
new file mode 100644
index 0000000000000000000000000000000000000000..293a59c46bf56c8a2718e23ff1fe632191bc320e
--- /dev/null
+++ b/nifi-huawei-nar/src/main/resources/META-INF/LICENSE
@@ -0,0 +1,232 @@
+
+ Apache License
+ Version 2.0, January 2004
+ http://www.apache.org/licenses/
+
+ TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+ 1. Definitions.
+
+ "License" shall mean the terms and conditions for use, reproduction,
+ and distribution as defined by Sections 1 through 9 of this document.
+
+ "Licensor" shall mean the copyright owner or entity authorized by
+ the copyright owner that is granting the License.
+
+ "Legal Entity" shall mean the union of the acting entity and all
+ other entities that control, are controlled by, or are under common
+ control with that entity. For the purposes of this definition,
+ "control" means (i) the power, direct or indirect, to cause the
+ direction or management of such entity, whether by contract or
+ otherwise, or (ii) ownership of fifty percent (50%) or more of the
+ outstanding shares, or (iii) beneficial ownership of such entity.
+
+ "You" (or "Your") shall mean an individual or Legal Entity
+ exercising permissions granted by this License.
+
+ "Source" form shall mean the preferred form for making modifications,
+ including but not limited to software source code, documentation
+ source, and configuration files.
+
+ "Object" form shall mean any form resulting from mechanical
+ transformation or translation of a Source form, including but
+ not limited to compiled object code, generated documentation,
+ and conversions to other media types.
+
+ "Work" shall mean the work of authorship, whether in Source or
+ Object form, made available under the License, as indicated by a
+ copyright notice that is included in or attached to the work
+ (an example is provided in the Appendix below).
+
+ "Derivative Works" shall mean any work, whether in Source or Object
+ form, that is based on (or derived from) the Work and for which the
+ editorial revisions, annotations, elaborations, or other modifications
+ represent, as a whole, an original work of authorship. For the purposes
+ of this License, Derivative Works shall not include works that remain
+ separable from, or merely link (or bind by name) to the interfaces of,
+ the Work and Derivative Works thereof.
+
+ "Contribution" shall mean any work of authorship, including
+ the original version of the Work and any modifications or additions
+ to that Work or Derivative Works thereof, that is intentionally
+ submitted to Licensor for inclusion in the Work by the copyright owner
+ or by an individual or Legal Entity authorized to submit on behalf of
+ the copyright owner. For the purposes of this definition, "submitted"
+ means any form of electronic, verbal, or written communication sent
+ to the Licensor or its representatives, including but not limited to
+ communication on electronic mailing lists, source code control systems,
+ and issue tracking systems that are managed by, or on behalf of, the
+ Licensor for the purpose of discussing and improving the Work, but
+ excluding communication that is conspicuously marked or otherwise
+ designated in writing by the copyright owner as "Not a Contribution."
+
+ "Contributor" shall mean Licensor and any individual or Legal Entity
+ on behalf of whom a Contribution has been received by Licensor and
+ subsequently incorporated within the Work.
+
+ 2. Grant of Copyright License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ copyright license to reproduce, prepare Derivative Works of,
+ publicly display, publicly perform, sublicense, and distribute the
+ Work and such Derivative Works in Source or Object form.
+
+ 3. Grant of Patent License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ (except as stated in this section) patent license to make, have made,
+ use, offer to sell, sell, import, and otherwise transfer the Work,
+ where such license applies only to those patent claims licensable
+ by such Contributor that are necessarily infringed by their
+ Contribution(s) alone or by combination of their Contribution(s)
+ with the Work to which such Contribution(s) was submitted. If You
+ institute patent litigation against any entity (including a
+ cross-claim or counterclaim in a lawsuit) alleging that the Work
+ or a Contribution incorporated within the Work constitutes direct
+ or contributory patent infringement, then any patent licenses
+ granted to You under this License for that Work shall terminate
+ as of the date such litigation is filed.
+
+ 4. Redistribution. You may reproduce and distribute copies of the
+ Work or Derivative Works thereof in any medium, with or without
+ modifications, and in Source or Object form, provided that You
+ meet the following conditions:
+
+ (a) You must give any other recipients of the Work or
+ Derivative Works a copy of this License; and
+
+ (b) You must cause any modified files to carry prominent notices
+ stating that You changed the files; and
+
+ (c) You must retain, in the Source form of any Derivative Works
+ that You distribute, all copyright, patent, trademark, and
+ attribution notices from the Source form of the Work,
+ excluding those notices that do not pertain to any part of
+ the Derivative Works; and
+
+ (d) If the Work includes a "NOTICE" text file as part of its
+ distribution, then any Derivative Works that You distribute must
+ include a readable copy of the attribution notices contained
+ within such NOTICE file, excluding those notices that do not
+ pertain to any part of the Derivative Works, in at least one
+ of the following places: within a NOTICE text file distributed
+ as part of the Derivative Works; within the Source form or
+ documentation, if provided along with the Derivative Works; or,
+ within a display generated by the Derivative Works, if and
+ wherever such third-party notices normally appear. The contents
+ of the NOTICE file are for informational purposes only and
+ do not modify the License. You may add Your own attribution
+ notices within Derivative Works that You distribute, alongside
+ or as an addendum to the NOTICE text from the Work, provided
+ that such additional attribution notices cannot be construed
+ as modifying the License.
+
+ You may add Your own copyright statement to Your modifications and
+ may provide additional or different license terms and conditions
+ for use, reproduction, or distribution of Your modifications, or
+ for any such Derivative Works as a whole, provided Your use,
+ reproduction, and distribution of the Work otherwise complies with
+ the conditions stated in this License.
+
+ 5. Submission of Contributions. Unless You explicitly state otherwise,
+ any Contribution intentionally submitted for inclusion in the Work
+ by You to the Licensor shall be under the terms and conditions of
+ this License, without any additional terms or conditions.
+ Notwithstanding the above, nothing herein shall supersede or modify
+ the terms of any separate license agreement you may have executed
+ with Licensor regarding such Contributions.
+
+ 6. Trademarks. This License does not grant permission to use the trade
+ names, trademarks, service marks, or product names of the Licensor,
+ except as required for reasonable and customary use in describing the
+ origin of the Work and reproducing the content of the NOTICE file.
+
+ 7. Disclaimer of Warranty. Unless required by applicable law or
+ agreed to in writing, Licensor provides the Work (and each
+ Contributor provides its Contributions) on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ implied, including, without limitation, any warranties or conditions
+ of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+ PARTICULAR PURPOSE. You are solely responsible for determining the
+ appropriateness of using or redistributing the Work and assume any
+ risks associated with Your exercise of permissions under this License.
+
+ 8. Limitation of Liability. In no event and under no legal theory,
+ whether in tort (including negligence), contract, or otherwise,
+ unless required by applicable law (such as deliberate and grossly
+ negligent acts) or agreed to in writing, shall any Contributor be
+ liable to You for damages, including any direct, indirect, special,
+ incidental, or consequential damages of any character arising as a
+ result of this License or out of the use or inability to use the
+ Work (including but not limited to damages for loss of goodwill,
+ work stoppage, computer failure or malfunction, or any and all
+ other commercial damages or losses), even if such Contributor
+ has been advised of the possibility of such damages.
+
+ 9. Accepting Warranty or Additional Liability. While redistributing
+ the Work or Derivative Works thereof, You may choose to offer,
+ and charge a fee for, acceptance of support, warranty, indemnity,
+ or other liability obligations and/or rights consistent with this
+ License. However, in accepting such obligations, You may act only
+ on Your own behalf and on Your sole responsibility, not on behalf
+ of any other Contributor, and only if You agree to indemnify,
+ defend, and hold each Contributor harmless for any liability
+ incurred by, or claims asserted against, such Contributor by reason
+ of your accepting any such warranty or additional liability.
+
+ END OF TERMS AND CONDITIONS
+
+ APPENDIX: How to apply the Apache License to your work.
+
+ To apply the Apache License to your work, attach the following
+ boilerplate notice, with the fields enclosed by brackets "[]"
+ replaced with your own identifying information. (Don't include
+ the brackets!) The text should be enclosed in the appropriate
+ comment syntax for the file format. We also recommend that a
+ file or class name and description of purpose be included on the
+ same "printed page" as the copyright notice for easier
+ identification within third-party archives.
+
+ Copyright [yyyy] [name of copyright owner]
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+APACHE NIFI SUBCOMPONENTS:
+
+The Apache NiFi project contains subcomponents with separate copyright
+notices and license terms. Your use of the source code for the these
+subcomponents is subject to the terms and conditions of the following
+licenses.
+
+ The binary distribution of this product bundles 'Bouncy Castle JDK 1.5'
+ under an MIT style license.
+
+ Copyright (c) 2000 - 2015 The Legion of the Bouncy Castle Inc. (http://www.bouncycastle.org)
+
+ Permission is hereby granted, free of charge, to any person obtaining a copy
+ of this software and associated documentation files (the "Software"), to deal
+ in the Software without restriction, including without limitation the rights
+ to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ copies of the Software, and to permit persons to whom the Software is
+ furnished to do so, subject to the following conditions:
+
+ The above copyright notice and this permission notice shall be included in
+ all copies or substantial portions of the Software.
+
+ THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+ THE SOFTWARE.
diff --git a/nifi-huawei-nar/src/main/resources/META-INF/NOTICE b/nifi-huawei-nar/src/main/resources/META-INF/NOTICE
new file mode 100644
index 0000000000000000000000000000000000000000..c4ff9a52e3021669e45df6f5c5b6f85b42136d84
--- /dev/null
+++ b/nifi-huawei-nar/src/main/resources/META-INF/NOTICE
@@ -0,0 +1,122 @@
+nifi-huawei-nar
+Copyright 2015-2020 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
+
+******************
+Apache Software License v2
+******************
+
+The following binary components are provided under the Apache Software License v2
+
+ (ASLv2) Apache HttpComponents
+ The following NOTICE information applies:
+ Apache HttpClient
+ Copyright 1999-2014 The Apache Software Foundation
+
+ Apache HttpCore
+ Copyright 2005-2014 The Apache Software Foundation
+
+ This project contains annotations derived from JCIP-ANNOTATIONS
+ Copyright (c) 2005 Brian Goetz and Tim Peierls. See http://www.jcip.net
+
+ (ASLv2) Joda Time
+ The following NOTICE information applies:
+ This product includes software developed by
+ Joda.org (http://www.joda.org/).
+
+ (ASLv2) Apache Commons Codec
+ The following NOTICE information applies:
+ Apache Commons Codec
+ Copyright 2002-2014 The Apache Software Foundation
+
+ src/test/org/apache/commons/codec/language/DoubleMetaphoneTest.java
+ contains test data from http://aspell.net/test/orig/batch0.tab.
+ Copyright (C) 2002 Kevin Atkinson (kevina@gnu.org)
+
+ ===============================================================================
+
+ The content of package org.apache.commons.codec.language.bm has been translated
+ from the original php source code available at http://stevemorse.org/phoneticinfo.htm
+ with permission from the original authors.
+ Original source copyright:
+ Copyright (c) 2008 Alexander Beider & Stephen P. Morse.
+
+ (ASLv2) Apache Commons Lang
+ The following NOTICE information applies:
+ Apache Commons Lang
+ Copyright 2001-2014 The Apache Software Foundation
+
+ This product includes software from the Spring Framework,
+ under the Apache License 2.0 (see: StringUtils.containsWhitespace())
+
+ (ASLv2) Apache Commons BeanUtils
+ The following NOTICE information applies:
+ Apache Commons BeanUtils
+ Copyright 2000-2016 The Apache Software Foundation
+
+ This product includes software developed at
+ The Apache Software Foundation (http://www.apache.org/).
+
+ (ASLv2) Amazon Web Services SDK
+ The following NOTICE information applies:
+ Copyright 2010-2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+
+ This product includes software developed by
+ Amazon Technologies, Inc (http://www.amazon.com/).
+
+ **********************
+ THIRD PARTY COMPONENTS
+ **********************
+ This software includes third party software subject to the following copyrights:
+ - XML parsing and utility functions from JetS3t - Copyright 2006-2009 James Murty.
+ - PKCS#1 PEM encoded private key parsing and utility functions from oauth.googlecode.com - Copyright 1998-2010 AOL Inc.
+
+ (ASLv2) Jackson JSON processor
+ The following NOTICE information applies:
+ # Jackson JSON processor
+
+ Jackson is a high-performance, Free/Open Source JSON processing library.
+ It was originally written by Tatu Saloranta (tatu.saloranta@iki.fi), and has
+ been in development since 2007.
+ It is currently developed by a community of developers, as well as supported
+ commercially by FasterXML.com.
+
+ ## Licensing
+
+ Jackson core and extension components may licensed under different licenses.
+ To find the details that apply to this artifact see the accompanying LICENSE file.
+ For more information, including possible other licensing options, contact
+ FasterXML.com (http://fasterxml.com).
+
+ ## Credits
+
+ A list of contributors may be found from CREDITS file, which is included
+ in some artifacts (usually source distributions); but is always available
+ from the source code management (SCM) system project uses.
+
+
+ (ASLv2) This includes derived works from apigateway-generic-java-sdk project (https://github.com/rpgreen/apigateway-generic-java-sdk)
+ https://github.com/rpgreen/apigateway-generic-java-sdk/commit/32eea44cc855a530c9b4a28b9f3601a41bc85618 as the point reference:
+ The derived work is adapted from
+ main/ca/ryangreen/apigateway/generic/
+ GenericApiGatewayClient.java
+ GenericApiGatewayClientBuilder.java
+ GenericApiGatewayException.java
+ GenericApiGatewayRequest.java
+ GenericApiGatewayRequestBuilder.java
+ test/ca/ryangreen/apigateway/generic/
+ GenericApiGatewayClientTest.java
+ LambdaMatcher.java
+ and can be found in the directories:
+ nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/../wag/client/
+ GenericApiGatewayClient.java
+ GenericApiGatewayClientBuilder.java
+ GenericApiGatewayException.java
+ GenericApiGatewayRequest.java
+ GenericApiGatewayRequestBuilder.java
+ Validate.java
+ nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/../wag/
+ RequestMatcher.java
+ GetAWSGatewayApiTest.java
\ No newline at end of file
diff --git a/nifi-huawei-processors/pom.xml b/nifi-huawei-processors/pom.xml
new file mode 100644
index 0000000000000000000000000000000000000000..491d86add4107bdee1d4f1c534cc63c25ca26a55
--- /dev/null
+++ b/nifi-huawei-processors/pom.xml
@@ -0,0 +1,116 @@
+
+
+
+ 4.0.0
+
+ org.apache.nifi
+ nifi-huawei-bundle
+ 1.18.0
+
+
+ nifi-huawei-processors
+ jar
+
+
+
+ org.apache.nifi
+ nifi-api
+
+
+ org.apache.nifi
+ nifi-distributed-cache-client-service-api
+
+
+ org.apache.nifi
+ nifi-listed-entity
+ 1.18.0
+
+
+ org.apache.nifi
+ nifi-huawei-service-api
+ 1.18.0
+ provided
+
+
+ org.apache.nifi
+ nifi-record-serialization-service-api
+
+
+ org.apache.nifi
+ nifi-record
+
+
+ org.apache.nifi
+ nifi-standard-record-utils
+ 1.18.0
+
+
+ org.apache.nifi
+ nifi-ssl-context-service-api
+
+
+ com.huaweicloud
+ esdk-obs-java-bundle
+
+
+ commons-beanutils
+ commons-beanutils
+ 1.9.4
+
+
+ commons-logging
+ commons-logging
+
+
+
+
+ org.slf4j
+ jcl-over-slf4j
+
+
+ org.apache.nifi
+ nifi-mock
+ 1.18.0
+ test
+
+
+ com.squareup.okhttp3
+ mockwebserver
+ test
+
+
+ org.apache.nifi
+ nifi-mock-record-utils
+ test
+
+
+ org.apache.nifi
+ nifi-record-serialization-services
+ 1.18.0
+ test
+
+
+ org.apache.nifi
+ nifi-schema-registry-service-api
+ 1.18.0
+ provided
+
+
+ org.apache.nifi
+ nifi-proxy-configuration-api
+
+
+
diff --git a/nifi-huawei-processors/src/main/java/org/apache/nifi/processors/huawei/abstractprocessor/AbstractHuaweiProcessor.java b/nifi-huawei-processors/src/main/java/org/apache/nifi/processors/huawei/abstractprocessor/AbstractHuaweiProcessor.java
new file mode 100644
index 0000000000000000000000000000000000000000..5ad676934bea867d1617d694b69e49c0397529cd
--- /dev/null
+++ b/nifi-huawei-processors/src/main/java/org/apache/nifi/processors/huawei/abstractprocessor/AbstractHuaweiProcessor.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.huawei.abstractprocessor;
+
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.processor.*;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.proxy.ProxyConfiguration;
+import org.apache.nifi.proxy.ProxyConfigurationService;
+import org.apache.nifi.proxy.ProxySpec;
+
+import java.util.*;
+
+import static org.apache.nifi.processors.huawei.common.PropertyDescriptors.*;
+
+/**
+ * Abstract base class for huawei processors. This class uses huawei credentials for creating obs clients
+ */
+public abstract class AbstractHuaweiProcessor extends AbstractSessionFactoryProcessor {
+
+ public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
+ .description("FlowFiles are routed to success relationship").build();
+ public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
+ .description("FlowFiles are routed to failure relationship").build();
+
+ public static final Set relationships = Collections.unmodifiableSet(
+ new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE)));
+
+ private static final ProxySpec[] PROXY_SPECS = {ProxySpec.HTTP_AUTH};
+
+ @Override
+ public Set getRelationships() {
+ return relationships;
+ }
+
+ @Override
+ protected Collection customValidate(final ValidationContext validationContext) {
+ final List validationResults = new ArrayList<>(super.customValidate(validationContext));
+
+ final boolean accessKeySet = validationContext.getProperty(ACCESS_KEY).isSet();
+ final boolean secretKeySet = validationContext.getProperty(SECRET_KEY).isSet();
+ final boolean credentialsSet = validationContext.getProperty(HUAWEI_CREDENTIALS_PROVIDER_SERVICE).isSet();
+ if (!(credentialsSet || (accessKeySet && secretKeySet))) {
+ validationResults.add(new ValidationResult.Builder().input("Ak/sk/credentials").valid(false).explanation("credentials and one of the Ak/SK entries must be set").build());
+ }
+
+ final boolean proxyHostSet = validationContext.getProperty(PROXY_HOST).isSet();
+ final boolean proxyPortSet = validationContext.getProperty(PROXY_HOST_PORT).isSet();
+ final boolean proxyConfigServiceSet = validationContext.getProperty(ProxyConfigurationService.PROXY_CONFIGURATION_SERVICE).isSet();
+
+ if ((proxyHostSet && !proxyPortSet) || (!proxyHostSet && proxyPortSet)) {
+ validationResults.add(new ValidationResult.Builder().subject("Proxy Host and Port").valid(false).explanation("If Proxy Host or Proxy Port is set, both must be set").build());
+ }
+
+ final boolean proxyUserSet = validationContext.getProperty(PROXY_USERNAME).isSet();
+ final boolean proxyPwdSet = validationContext.getProperty(PROXY_PASSWORD).isSet();
+
+ if ((proxyUserSet && !proxyPwdSet) || (!proxyUserSet && proxyPwdSet)) {
+ validationResults.add(new ValidationResult.Builder().subject("Proxy User and Password").valid(false).explanation("If Proxy Username or Proxy Password is set, both must be set").build());
+ }
+
+ if (proxyUserSet && !proxyHostSet) {
+ validationResults.add(new ValidationResult.Builder().subject("Proxy").valid(false).explanation("If Proxy Username or Proxy Password").build());
+ }
+
+ ProxyConfiguration.validateProxySpec(validationContext, validationResults, PROXY_SPECS);
+
+ if (proxyHostSet && proxyConfigServiceSet) {
+ validationResults.add(new ValidationResult.Builder().subject("Proxy Configuration Service").valid(false)
+ .explanation("Either Proxy Username and Proxy Password must be set or Proxy Configuration Service but not both").build());
+ }
+
+ return validationResults;
+ }
+
+ /*
+ * Allow optional override of onTrigger with the ProcessSessionFactory where required for huawei processors
+ *
+ * @see AbstractProcessor
+ */
+ @Override
+ public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException {
+ final ProcessSession session = sessionFactory.createSession();
+ try {
+ onTrigger(context, session);
+ session.commitAsync();
+ } catch (final Throwable t) {
+ session.rollback(true);
+ throw t;
+ }
+ }
+
+ /*
+ * Default to requiring the "standard" onTrigger with a single ProcessSession
+ */
+ public abstract void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException;
+}
diff --git a/nifi-huawei-processors/src/main/java/org/apache/nifi/processors/huawei/abstractprocessor/AbstractOBSProcessor.java b/nifi-huawei-processors/src/main/java/org/apache/nifi/processors/huawei/abstractprocessor/AbstractOBSProcessor.java
new file mode 100644
index 0000000000000000000000000000000000000000..ab10ba0a163f98b3f41299f9772974ad3ef7e06a
--- /dev/null
+++ b/nifi-huawei-processors/src/main/java/org/apache/nifi/processors/huawei/abstractprocessor/AbstractOBSProcessor.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.huawei.abstractprocessor;
+
+import com.obs.services.ObsClient;
+import com.obs.services.exception.ObsException;
+import com.obs.services.model.*;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.ConfigVerificationResult;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.VerifiableProcessor;
+import org.apache.nifi.processors.huawei.obs.OBSRegions;
+import org.apache.nifi.processors.huawei.obs.OBSUtils;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.nifi.processors.huawei.common.PropertyDescriptors.*;
+import static org.apache.nifi.processors.huawei.obs.Constants.*;
+
+public abstract class AbstractOBSProcessor extends AbstractHuaweiProcessor implements VerifiableProcessor {
+ protected volatile ObsClient client;
+ @OnScheduled
+ public void onScheduled(final ProcessContext context) {
+ client = OBSUtils.createClient(context, getLogger());
+ }
+
+ @Override
+ public List verify(final ProcessContext context, final ComponentLog verificationLogger, final Map attributes) {
+ final List results = new ArrayList<>();
+
+ try {
+ results.add(new ConfigVerificationResult.Builder()
+ .outcome(ConfigVerificationResult.Outcome.SUCCESSFUL)
+ .verificationStepName("Create Client and Configure Region")
+ .explanation("Successfully created OBS Client and configured Region")
+ .build());
+ } catch (final Exception e) {
+ verificationLogger.error("Failed to create OBS Client", e);
+ results.add(new ConfigVerificationResult.Builder()
+ .outcome(ConfigVerificationResult.Outcome.FAILED)
+ .verificationStepName("Create Client and Configure Region")
+ .explanation("Failed to crete OBS Client or configure Region: " + e.getMessage())
+ .build());
+ }
+ return results;
+ }
+
+ protected GranteeInterface createGrantee(final String value) {
+ if (StringUtils.isBlank(value)) {
+ return null;
+ }
+ return new CanonicalGrantee(value);
+ }
+
+ protected final List createGrantees(final String value) {
+ if (StringUtils.isBlank(value)) {
+ return Collections.emptyList();
+ }
+
+ final List grantees = new ArrayList<>();
+ final String[] values = value.split(",");
+ for (final String val : values) {
+ final String identifier = val.trim();
+ final GranteeInterface grantee = createGrantee(identifier);
+ if (grantee != null) {
+ grantees.add(grantee);
+ }
+ }
+ return grantees;
+ }
+
+ /**
+ * Create AccessControlList if appropriate properties are configured.
+ *
+ * @param context ProcessContext
+ * @param flowFile FlowFile
+ * @return AccessControlList or null if no ACL properties were specified
+ */
+ protected final AccessControlList createACL(final ProcessContext context, final FlowFile flowFile) {
+ AccessControlList acl = null;
+
+ final String ownerId = context.getProperty(OWNER).evaluateAttributeExpressions(flowFile).getValue();
+ if (StringUtils.isNotBlank(ownerId)) {
+ final Owner owner = new Owner();
+ owner.setId(ownerId);
+ if (acl == null) {
+ acl = new AccessControlList();
+ }
+ acl.setOwner(owner);
+ acl.grantPermission(new CanonicalGrantee(ownerId), Permission.PERMISSION_FULL_CONTROL);
+ }
+
+ for (final GranteeInterface grantee : createGrantees(context.getProperty(FULL_CONTROL_USER_LIST).evaluateAttributeExpressions(flowFile).getValue())) {
+ if (acl == null) {
+ acl = new AccessControlList();
+ }
+ acl.grantPermission(grantee, Permission.PERMISSION_FULL_CONTROL);
+ }
+
+ for (final GranteeInterface grantee : createGrantees(context.getProperty(READ_USER_LIST).evaluateAttributeExpressions(flowFile).getValue())) {
+ if (acl == null) {
+ acl = new AccessControlList();
+ }
+ acl.grantPermission(grantee, Permission.PERMISSION_READ);
+ }
+
+ for (final GranteeInterface grantee : createGrantees(context.getProperty(READ_ACL_LIST).evaluateAttributeExpressions(flowFile).getValue())) {
+ if (acl == null) {
+ acl = new AccessControlList();
+ }
+ acl.grantPermission(grantee, Permission.PERMISSION_READ_ACP);
+ }
+
+ for (final GranteeInterface grantee : createGrantees(context.getProperty(WRITE_ACL_LIST).evaluateAttributeExpressions(flowFile).getValue())) {
+ if (acl == null) {
+ acl = new AccessControlList();
+ }
+ acl.grantPermission(grantee, Permission.PERMISSION_WRITE_ACP);
+ }
+
+ return acl;
+ }
+
+ protected FlowFile extractExceptionDetails(final Exception e, final ProcessSession session, FlowFile flowFile) {
+ flowFile = session.putAttribute(flowFile, OBS_EXCEPTION, e.getClass().getName());
+ if (e instanceof ObsException) {
+ flowFile = putAttribute(session, flowFile, OBS_ERROR_CODE, ((ObsException) e).getErrorCode());
+ flowFile = putAttribute(session, flowFile, OBS_ERROR_Message, ((ObsException) e).getErrorMessage());
+ flowFile = putAttribute(session, flowFile, OBS_ADDITIONAL_DETAILS, ((ObsException) e).getXmlMessage());
+ return flowFile;
+
+ }
+ return putAttribute(session, flowFile, OBS_ERROR_Message, e.getMessage());
+ }
+
+ private FlowFile putAttribute(final ProcessSession session, final FlowFile flowFile, final String key, final Object value) {
+ return (value == null) ? flowFile : session.putAttribute(flowFile, key, value.toString());
+ }
+
+ /**
+ * Create CannedAccessControlList if CANNED_ACL property specified.
+ *
+ * @param context ProcessContext
+ * @param flowFile FlowFile
+ * @return CannedAccessControlList or null if not specified
+ */
+ protected final AccessControlList createCannedACL(final ProcessContext context, final FlowFile flowFile) {
+ final String cannedAclString = context.getProperty(CANNED_ACL).evaluateAttributeExpressions(flowFile).getValue();
+ if (StringUtils.isNoneBlank(cannedAclString)) {
+ switch (cannedAclString) {
+ case "Private":
+ return AccessControlList.REST_CANNED_PRIVATE;
+ case "PublicRead":
+ return AccessControlList.REST_CANNED_PUBLIC_READ;
+ case "PublicReadWrite":
+ return AccessControlList.REST_CANNED_PUBLIC_READ_WRITE;
+ case "PublicReadWriteDelivered":
+ return AccessControlList.REST_CANNED_PUBLIC_READ_WRITE_DELIVERED;
+ default:
+ }
+ }
+ return null;
+ }
+
+
+
+
+ public ObsClient getClient() {
+ return client;
+ }
+}
diff --git a/nifi-huawei-processors/src/main/java/org/apache/nifi/processors/huawei/common/PropertyDescriptors.java b/nifi-huawei-processors/src/main/java/org/apache/nifi/processors/huawei/common/PropertyDescriptors.java
new file mode 100644
index 0000000000000000000000000000000000000000..fbe7a4326135a0324f5679ec0996ce53c2ed2ebe
--- /dev/null
+++ b/nifi-huawei-processors/src/main/java/org/apache/nifi/processors/huawei/common/PropertyDescriptors.java
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.huawei.common;
+
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.huawei.obs.OBSRegions;
+import org.apache.nifi.processors.huawei.credentials.provider.service.HuaweiCredentialsProviderService;
+import org.apache.nifi.processors.huawei.obs.Constants;
+import org.apache.nifi.processors.huawei.obs.ObsServiceEncryptionService;
+import org.apache.nifi.proxy.ProxyConfiguration;
+import org.apache.nifi.ssl.SSLContextService;
+
+public interface PropertyDescriptors {
+ PropertyDescriptor ACCESS_KEY = new PropertyDescriptor.Builder()
+ .name("Access Key")
+ .displayName("Access Key ID")
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .required(false)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .sensitive(true)
+ .build();
+
+ PropertyDescriptor SECRET_KEY = new PropertyDescriptor.Builder()
+ .name("Secret Key")
+ .displayName("Secret Access Key")
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .required(false)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .sensitive(true)
+ .build();
+ PropertyDescriptor PROXY_HOST = new PropertyDescriptor.Builder()
+ .name("Proxy Host")
+ .description("Proxy host name or IP")
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .required(false)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ PropertyDescriptor PROXY_HOST_PORT = new PropertyDescriptor.Builder()
+ .name("Proxy Host Port")
+ .description("Proxy host port")
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .required(false)
+ .addValidator(StandardValidators.PORT_VALIDATOR)
+ .build();
+
+ PropertyDescriptor PROXY_USERNAME = new PropertyDescriptor.Builder()
+ .name("proxy-user-name")
+ .displayName("Proxy Username")
+ .description("Proxy username")
+ .required(false)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ PropertyDescriptor PROXY_PASSWORD = new PropertyDescriptor.Builder()
+ .name("proxy-user-password")
+ .displayName("Proxy Password")
+ .description("Proxy password")
+ .required(false)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .sensitive(true)
+ .build();
+
+ PropertyDescriptor OBS_REGION = new PropertyDescriptor.Builder()
+ .name("OBS Region")
+ .description("If the destination Region is Not found, select Not Found and enter the Endpoint address of the destination Region in the Endpoint Override URL box")
+ .required(true)
+ .allowableValues(OBSRegions.getAvailableOBSRegions())
+ .defaultValue(OBSRegions.DEFAULT_REGION.getName())
+ .build();
+ PropertyDescriptor ENDPOINT_OVERRIDE_URL = new PropertyDescriptor.Builder()
+ .name("Endpoint Override URL")
+ .description("The endpoint specified in this option box takes effect only when the OBS Region option is Not found")
+ .required(false)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+ PropertyDescriptor TIMEOUT = new PropertyDescriptor.Builder()
+ .name("Communications Timeout")
+ .required(true)
+ .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+ .defaultValue("30 secs")
+ .build();
+
+ PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
+ .name("SSL Context Service")
+ .description("Specifies an optional SSL Context Service that, if provided, will be used to create connections")
+ .required(false)
+ .identifiesControllerService(SSLContextService.class)
+ .build();
+ /**
+ * Huawei credentials provider service
+ */
+ PropertyDescriptor HUAWEI_CREDENTIALS_PROVIDER_SERVICE = new PropertyDescriptor.Builder()
+ .name("Huawei Credentials Provider service")
+ .displayName("Huawei Credentials Provider Service")
+ .description("The Controller Service that is used to obtain huawei credentials provider")
+ .required(false)
+ .identifiesControllerService(HuaweiCredentialsProviderService.class)
+ .build();
+ PropertyDescriptor DELIMITER = new PropertyDescriptor.Builder()
+ .name("delimiter")
+ .displayName("Delimiter")
+ .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+ .required(false)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .description("Specifies the character used to group the object name. " +
+ "The grouping steps are as follows: " +
+ "① First select all object names that contain this Delimiter; " +
+ "② Remove the prefix field from the object names. (If the prefix field is not specified in the request, skip this step.) " +
+ "③ Group by the string between the first character and Delimiter (later returned as CommonPrefix).")
+ .build();
+
+ PropertyDescriptor PREFIX = new PropertyDescriptor.Builder()
+ .name("prefix")
+ .displayName("Prefix")
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .required(false)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .description("The prefix used to filter the object list. In most cases, it should end with a forward slash ('/').")
+ .build();
+ PropertyDescriptor WRITE_USER_METADATA = new PropertyDescriptor.Builder()
+ .name("write-obs-user-metadata")
+ .displayName("Write User Metadata")
+ .description("If set to 'True', the user defined metadata associated with the OBS object will be added to FlowFile attributes/records")
+ .required(true)
+ .allowableValues(new AllowableValue("true", "True"), new AllowableValue("false", "False"))
+ .defaultValue("false")
+ .build();
+ PropertyDescriptor FULL_CONTROL_USER_LIST = new PropertyDescriptor.Builder()
+ .name("FullControl User List")
+ .required(false)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .description("A comma-separated list of huaweiCloud account ID's that specifies who should have Full Control for an object")
+ .defaultValue("${obs.permissions.full.users}")
+ .build();
+
+ PropertyDescriptor READ_USER_LIST = new PropertyDescriptor.Builder()
+ .name("Read Permission User List")
+ .required(false)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .description("A comma-separated list of huaweiCloud account ID's that specifies who should have Read Access for an object")
+ .defaultValue("${obs.permissions.read.users}")
+ .build();
+ PropertyDescriptor READ_ACL_LIST = new PropertyDescriptor.Builder()
+ .name("Read ACL User List")
+ .required(false)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .description("A comma-separated list of huaweiCloud account ID's that specifies who should have permissions to read the Access Control List for an object")
+ .defaultValue("${obs.permissions.readacl.users}")
+ .build();
+ PropertyDescriptor WRITE_ACL_LIST = new PropertyDescriptor.Builder()
+ .name("Write ACL User List")
+ .required(false)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .description("A comma-separated list of huaweiCloud account ID's that specifies who should have permissions to change the Access Control List for an object")
+ .defaultValue("${obs.permissions.writeacl.users}")
+ .build();
+ PropertyDescriptor CANNED_ACL = new PropertyDescriptor.Builder()
+ .name("canned-acl")
+ .displayName("Canned ACL")
+ .required(false)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .description("OBS Canned ACL for an object, one of: Private, PublicRead, PublicReadWrite, PublicReadWriteDelivered; " +
+ "will be ignored if any other ACL/permission/owner property is specified; details: https://support.huaweicloud.com/sdk-java-devg-obs/obs_21_0406.html")
+ .defaultValue("${obs.permissions.cannedacl}")
+ .build();
+
+ PropertyDescriptor OWNER = new PropertyDescriptor.Builder()
+ .name("Owner")
+ .required(false)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .description("The huaweiCloud Account ID to use for the object's owner")
+ .defaultValue("${obs.owner}")
+ .build();
+ PropertyDescriptor BUCKET = new PropertyDescriptor.Builder()
+ .name("Bucket")
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .required(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+ PropertyDescriptor KEY = new PropertyDescriptor.Builder()
+ .name("Object Key")
+ .required(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .defaultValue("${filename}")
+ .build();
+ PropertyDescriptor ENCRYPTION_SERVICE = new PropertyDescriptor.Builder()
+ .name("encryption-service")
+ .displayName("Encryption Service")
+ .description("Specifies the Encryption Service Controller used to configure requests. " +
+ "PutOBSObject/FetchOBSObject: Only needs to be configured in case of Server-side SSE_KMS/SSE_C encryption")
+ .required(false)
+ .identifiesControllerService(ObsServiceEncryptionService.class)
+ .build();
+ PropertyDescriptor PROXY_CONFIGURATION_SERVICE = ProxyConfiguration.createProxyConfigPropertyDescriptor(false, Constants.PROXY_SPECS);
+
+}
diff --git a/nifi-huawei-processors/src/main/java/org/apache/nifi/processors/huawei/credentials/provider/factory/CredentialsProviderFactory.java b/nifi-huawei-processors/src/main/java/org/apache/nifi/processors/huawei/credentials/provider/factory/CredentialsProviderFactory.java
new file mode 100644
index 0000000000000000000000000000000000000000..46ec013da2e70d07422e710766ee72150151a337
--- /dev/null
+++ b/nifi-huawei-processors/src/main/java/org/apache/nifi/processors/huawei/credentials/provider/factory/CredentialsProviderFactory.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.huawei.credentials.provider.factory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Map;
+
+import com.obs.services.IObsCredentialsProvider;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.processors.huawei.credentials.provider.factory.strategies.AccessKeyPairCredentialsStrategy;
+
+
+/**
+ * Generates HuaweiCloud credentials in the form of HuaweiCredentialsProvider implementations for processors
+ * and controller services. The factory supports a number of strategies for specifying and validating
+ * HuaweiCloud credentials, interpreted as an ordered list of most-preferred to least-preferred. It also supports
+ * derived credential strategies like Assume Role, which require a primary credential as an input.
+ *
+ * Additional strategies should implement CredentialsStrategy, then be added to the strategies list in the
+ * constructor.
+ *
+ * @see org.apache.nifi.processors.huawei.credentials.provider.factory.strategies
+ */
+public class CredentialsProviderFactory {
+
+ /**
+ * Validates HuaweiCloud credential properties against the configured strategies to report any validation errors.
+ * @return Validation errors
+ */
+ public Collection validate(final ValidationContext validationContext) {
+ final CredentialsStrategy selectedStrategy = new AccessKeyPairCredentialsStrategy();
+ final ArrayList validationFailureResults = new ArrayList();
+ final Collection strategyValidationFailures = selectedStrategy.validate(validationContext);
+ if (strategyValidationFailures != null) {
+ validationFailureResults.addAll(strategyValidationFailures);
+ }
+ return validationFailureResults;
+ }
+
+ /**
+ * Produces the HuaweiCredentialsProvider according to the given property set and the strategies configured in
+ * the factory.
+ * @return HuaweiCredentialsProvider implementation
+ */
+ public IObsCredentialsProvider getCredentialsProvider(final Map properties) {
+ final CredentialsStrategy primaryStrategy = new AccessKeyPairCredentialsStrategy();
+ return primaryStrategy.getCredentialsProvider(properties);
+ }
+}
diff --git a/nifi-huawei-processors/src/main/java/org/apache/nifi/processors/huawei/credentials/provider/factory/CredentialsStrategy.java b/nifi-huawei-processors/src/main/java/org/apache/nifi/processors/huawei/credentials/provider/factory/CredentialsStrategy.java
new file mode 100644
index 0000000000000000000000000000000000000000..410407460ae52cc58c4b7f8256e7034c5b7f7644
--- /dev/null
+++ b/nifi-huawei-processors/src/main/java/org/apache/nifi/processors/huawei/credentials/provider/factory/CredentialsStrategy.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.huawei.credentials.provider.factory;
+
+import java.util.Collection;
+import java.util.Map;
+
+import com.obs.services.IObsCredentialsProvider;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+
+
+/**
+ * Specifies a strategy for validating and creating HuaweiCloud credentials from a list of properties configured on a
+ * Processor, Controller Service, Reporting Service, or other component. Supports both primary credentials like
+ * default credentials or API keys and also derived credentials from Assume Role.
+ */
+public interface CredentialsStrategy {
+
+ /**
+ * Name of the strategy, suitable for displaying to a user in validation messages.
+ * @return strategy name
+ */
+ String getName();
+
+ /**
+ * Validates the properties belonging to this strategy, given the selected primary strategy. Errors may result
+ * from individually malformed properties, invalid combinations of properties, or inappropriate use of properties
+ * not consistent with the primary strategy.
+ * @return validation errors
+ */
+ Collection validate(ValidationContext validationContext);
+
+ /**
+ * Creates an HuaweiCredentialsProvider instance for this strategy, given the properties defined by the user.
+ */
+ IObsCredentialsProvider getCredentialsProvider(Map properties);
+}
diff --git a/nifi-huawei-processors/src/main/java/org/apache/nifi/processors/huawei/credentials/provider/factory/strategies/AbstractCredentialsStrategy.java b/nifi-huawei-processors/src/main/java/org/apache/nifi/processors/huawei/credentials/provider/factory/strategies/AbstractCredentialsStrategy.java
new file mode 100644
index 0000000000000000000000000000000000000000..08fa9f2802d124cbf2a3efdf5ab4029b9a68c837
--- /dev/null
+++ b/nifi-huawei-processors/src/main/java/org/apache/nifi/processors/huawei/credentials/provider/factory/strategies/AbstractCredentialsStrategy.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.huawei.credentials.provider.factory.strategies;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Map;
+
+import com.obs.services.IObsCredentialsProvider;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.processors.huawei.credentials.provider.factory.CredentialsStrategy;
+
+/**
+ * Partial implementation of CredentialsStrategy to support most simple property-based strategies.
+ */
+public abstract class AbstractCredentialsStrategy implements CredentialsStrategy {
+ private final String name;
+ private final PropertyDescriptor[] requiredProperties;
+
+ public AbstractCredentialsStrategy(String name, PropertyDescriptor[] requiredProperties) {
+ this.name = name;
+ this.requiredProperties = requiredProperties;
+ }
+
+ @Override
+ public Collection validate(final ValidationContext validationContext) {
+ String requiredMessageFormat = "property %1$s must be set with %2$s";
+ Collection validationFailureResults = null;
+ for (PropertyDescriptor requiredProperty : requiredProperties) {
+ if (!validationContext.getProperty(requiredProperty).isSet()) {
+ String message = String.format(requiredMessageFormat, requiredProperty.getDisplayName(),
+ getName());
+ if (validationFailureResults == null) {
+ validationFailureResults = new ArrayList();
+ }
+ validationFailureResults.add(new ValidationResult.Builder()
+ .subject(requiredProperty.getDisplayName())
+ .valid(false)
+ .explanation(message).build());
+ }
+ }
+
+ return validationFailureResults;
+ }
+
+ public abstract IObsCredentialsProvider getCredentialsProvider(Map properties);
+
+ public String getName() {
+ return name;
+ }
+}
diff --git a/nifi-huawei-processors/src/main/java/org/apache/nifi/processors/huawei/credentials/provider/factory/strategies/AccessKeyPairCredentialsStrategy.java b/nifi-huawei-processors/src/main/java/org/apache/nifi/processors/huawei/credentials/provider/factory/strategies/AccessKeyPairCredentialsStrategy.java
new file mode 100644
index 0000000000000000000000000000000000000000..80e5d6de95bb1a34aae625a288945130457ec197
--- /dev/null
+++ b/nifi-huawei-processors/src/main/java/org/apache/nifi/processors/huawei/credentials/provider/factory/strategies/AccessKeyPairCredentialsStrategy.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.huawei.credentials.provider.factory.strategies;
+
+import java.util.Map;
+
+import com.obs.services.BasicObsCredentialsProvider;
+import com.obs.services.IObsCredentialsProvider;
+import com.obs.services.internal.security.BasicSecurityKey;
+import com.obs.services.model.ISecurityKey;
+import org.apache.nifi.components.PropertyDescriptor;
+
+import static org.apache.nifi.processors.huawei.common.PropertyDescriptors.*;
+
+/**
+ * Supports HuaweiCloud credentials defined by an Access Key and Secret Key pair.
+ */
+public class AccessKeyPairCredentialsStrategy extends AbstractCredentialsStrategy {
+
+ public AccessKeyPairCredentialsStrategy() {
+ super("Access Key Pair", new PropertyDescriptor[] {
+ ACCESS_KEY,
+ SECRET_KEY
+ });
+ }
+
+ @Override
+ public IObsCredentialsProvider getCredentialsProvider(Map properties) {
+ String accessKey = properties.get(ACCESS_KEY);
+ String secretKey = properties.get(SECRET_KEY);
+ ISecurityKey securityKey = new BasicSecurityKey(accessKey, secretKey);
+ return new BasicObsCredentialsProvider(securityKey);
+ }
+}
diff --git a/nifi-huawei-processors/src/main/java/org/apache/nifi/processors/huawei/credentials/provider/service/HuaweiCredentialsProviderControllerService.java b/nifi-huawei-processors/src/main/java/org/apache/nifi/processors/huawei/credentials/provider/service/HuaweiCredentialsProviderControllerService.java
new file mode 100644
index 0000000000000000000000000000000000000000..2c41374d532248caddda75a5ad2b47d4b60b96aa
--- /dev/null
+++ b/nifi-huawei-processors/src/main/java/org/apache/nifi/processors/huawei/credentials/provider/service/HuaweiCredentialsProviderControllerService.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.huawei.credentials.provider.service;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import com.obs.services.IObsCredentialsProvider;
+import org.apache.nifi.annotation.behavior.Restricted;
+import org.apache.nifi.annotation.behavior.Restriction;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.RequiredPermission;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processors.huawei.credentials.provider.factory.CredentialsProviderFactory;
+
+import static org.apache.nifi.processors.huawei.common.PropertyDescriptors.ACCESS_KEY;
+import static org.apache.nifi.processors.huawei.common.PropertyDescriptors.SECRET_KEY;
+
+
+/**
+ * Implementation of HuaweiCredentialsProviderService interface
+ *
+ * @see HuaweiCredentialsProviderService
+ */
+@CapabilityDescription("Defines credentials for Huawei Web Services processors. " +
+ "Uses default credentials without configuration. " +
+ "Default credentials support ECS instance profile/role, default user profile, environment variables, etc. " +
+ "Additional options include access key / secret key pairs, credentials file, named profile, and assume role credentials.")
+@Tags({ "huawei", "credentials","provider" })
+@Restricted(
+ restrictions = {
+ @Restriction(
+ requiredPermission = RequiredPermission.ACCESS_ENVIRONMENT_CREDENTIALS,
+ explanation = "The default configuration can read environment variables and system properties for credentials"
+ )
+ }
+)
+public class HuaweiCredentialsProviderControllerService extends AbstractControllerService implements HuaweiCredentialsProviderService {
+ private volatile IObsCredentialsProvider credentialsProvider;
+ protected final CredentialsProviderFactory credentialsProviderFactory = new CredentialsProviderFactory();
+
+ @Override
+ protected List getSupportedPropertyDescriptors() {
+ final List props = new ArrayList<>();
+ props.add(ACCESS_KEY);
+ props.add(SECRET_KEY);
+ return Collections.unmodifiableList(props);
+ }
+
+ @Override
+ public IObsCredentialsProvider getCredentialsProvider() throws ProcessException {
+ return credentialsProvider;
+ }
+
+ @Override
+ protected Collection customValidate(final ValidationContext validationContext) {
+ return credentialsProviderFactory.validate(validationContext);
+ }
+
+ @OnEnabled
+ public void onConfigured(final ConfigurationContext context) {
+ final Map evaluatedProperties = new HashMap<>(context.getProperties());
+ evaluatedProperties.keySet().forEach(propertyDescriptor -> {
+ if (propertyDescriptor.isExpressionLanguageSupported()) {
+ evaluatedProperties.put(propertyDescriptor,
+ context.getProperty(propertyDescriptor).evaluateAttributeExpressions().getValue());
+ }
+ });
+ credentialsProvider = credentialsProviderFactory.getCredentialsProvider(evaluatedProperties);
+ getLogger().debug("Using credentials provider: " + credentialsProvider.getClass());
+ }
+
+ @Override
+ public String toString() {
+ return "HuaweiCredentialsProviderService[id=" + getIdentifier() + "]";
+ }
+}
\ No newline at end of file
diff --git a/nifi-huawei-processors/src/main/java/org/apache/nifi/processors/huawei/obs/AbstractListHuaweiProcessor.java b/nifi-huawei-processors/src/main/java/org/apache/nifi/processors/huawei/obs/AbstractListHuaweiProcessor.java
new file mode 100644
index 0000000000000000000000000000000000000000..9597d06e6cf8b05e5dcb95ed1c014fffafc0a2f2
--- /dev/null
+++ b/nifi-huawei-processors/src/main/java/org/apache/nifi/processors/huawei/obs/AbstractListHuaweiProcessor.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.huawei.obs;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processor.util.list.AbstractListProcessor;
+import org.apache.nifi.processor.util.list.ListableEntity;
+
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.nifi.processor.util.StandardValidators.TIME_PERIOD_VALIDATOR;
+
+public abstract class AbstractListHuaweiProcessor extends AbstractListProcessor {
+ public static final PropertyDescriptor MIN_AGE = new PropertyDescriptor.Builder()
+ .name("Minimum File Age")
+ .description("The minimum age that a file must be in order to be pulled; any file younger than this amount of time (according to last modification date) will be ignored")
+ .required(true)
+ .addValidator(TIME_PERIOD_VALIDATOR)
+ .defaultValue("0 sec")
+ .build();
+
+ public static final PropertyDescriptor MAX_AGE = new PropertyDescriptor.Builder()
+ .name("Maximum File Age")
+ .description("The maximum age that a file must be in order to be pulled; any file older than this amount of time (according to last modification date) will be ignored")
+ .required(false)
+ .addValidator(StandardValidators.createTimePeriodValidator(100, TimeUnit.MILLISECONDS, Long.MAX_VALUE, TimeUnit.NANOSECONDS))
+ .defaultValue("100 secs")
+ .build();
+
+ public static final PropertyDescriptor MIN_SIZE = new PropertyDescriptor.Builder()
+ .name("Minimum File Size")
+ .description("The minimum size that a file must be in order to be pulled")
+ .required(true)
+ .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+ .defaultValue("0 B")
+ .build();
+
+ public static final PropertyDescriptor MAX_SIZE = new PropertyDescriptor.Builder()
+ .name("Maximum File Size")
+ .description("The maximum size that a file can be in order to be pulled")
+ .required(false)
+ .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+ .build();
+
+ protected boolean isFileInfoMatchesWithAgeAndSize(final ProcessContext context, final long minimumTimestamp, final long lastModified, final long size) {
+ final long minSize = context.getProperty(MIN_SIZE).asDataSize(DataUnit.B).longValue();
+ final Double maxSize = context.getProperty(MAX_SIZE).asDataSize(DataUnit.B);
+ final long minAge = context.getProperty(MIN_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
+ final Long maxAge = context.getProperty(MAX_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
+
+ if (lastModified < minimumTimestamp) {
+ return false;
+ }
+ final long fileAge = System.currentTimeMillis() - lastModified;
+ if (minAge > fileAge) {
+ return false;
+ }
+ if (maxAge != null && maxAge < fileAge) {
+ return false;
+ }
+ if (minSize > size) {
+ return false;
+ }
+ if (maxSize != null && maxSize < size) {
+ return false;
+ }
+ return true;
+ }
+}
diff --git a/nifi-huawei-processors/src/main/java/org/apache/nifi/processors/huawei/obs/Constants.java b/nifi-huawei-processors/src/main/java/org/apache/nifi/processors/huawei/obs/Constants.java
new file mode 100644
index 0000000000000000000000000000000000000000..5ab9ed83027401d3d6df9b307c0c5884f82e8a23
--- /dev/null
+++ b/nifi-huawei-processors/src/main/java/org/apache/nifi/processors/huawei/obs/Constants.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.huawei.obs;
+
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.proxy.ProxySpec;
+
+public interface Constants {
+ String NULL_VERSION_ID = "null";
+ String OBS_BUCKET = "obs.bucket";
+ String OBJECT_URL = "obs.objectUrl";
+ String OBS_OBJECT = "obs.objectKey";
+ String FILENAME = CoreAttributes.FILENAME.key();
+ String OBS_LAST_MODIFIED = "obs.lastModified";
+ String OBS_OPERATION = "obs.operation";
+ String OBS_CONTENT_TYPE = "obs.contentType";
+ String OBS_CONTENT_DISPOSITION = "obs.contentDisposition";
+ String OBS_UPLOAD_ID = "obs.uploadId";
+ String OBS_VERSION = "obs.version";
+ String OBS_E_TAG = "obs.eTag";
+ String OBS_CACHE_CONTROL = "obs.cacheControl";
+ String OBS_STORAGE_CLASS = "obs.storeClass";
+ String OBS_USER_META = "obs.userMetadata";
+ String OBS_API_METHOD_ATTR_KEY = "obs.apiMethod";
+ String OBS_OWNER = "obs.owner";
+ String OBS_LENGTH = "obs.length";
+ String OBS_IS_LATEST = "obs.latest";
+ String OBS_API_METHOD_PUT_OBJECT = "PutOBSObject";
+ String OBS_API_METHOD_MULTIPART_UPLOAD = "obs.multipartUpload";
+ String OBS_SSE_ALGORITHM = "obs.algorithm";
+ String OBS_ENCRYPTION_STRATEGY = "obs.encryptionStrategy";
+ String OBS_EXPIRATION_TIME = "obs.expirationTime";
+ String OBS_ERROR_Message = "obs.errorMessage";
+ String OBS_ERROR_CODE = "obs.errorCode";
+ String OBS_STATUS_CODE = "obs.statusCode";
+ String OBS_ADDITIONAL_DETAILS = "obs.additionalDetails";
+ String OBS_EXCEPTION = "obs.exception";
+ String OBS_PROCESS_UNSCHEDULED_MESSAGE = "Processor unscheduled, stopping upload";
+ String CONTENT_DISPOSITION_ATTACHMENT = "attachment; filename=";
+ String HASH_ALGORITHM = "hash.algorithm";
+ String HASH_VALUE = "hash.value";
+
+ ProxySpec[] PROXY_SPECS = {ProxySpec.HTTP_AUTH};
+}
diff --git a/nifi-huawei-processors/src/main/java/org/apache/nifi/processors/huawei/obs/DeleteOBSObject.java b/nifi-huawei-processors/src/main/java/org/apache/nifi/processors/huawei/obs/DeleteOBSObject.java
new file mode 100644
index 0000000000000000000000000000000000000000..3c305061fb43ada1b49d2cac7f624da90bf51038
--- /dev/null
+++ b/nifi-huawei-processors/src/main/java/org/apache/nifi/processors/huawei/obs/DeleteOBSObject.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.huawei.obs;
+
+import com.obs.services.ObsClient;
+import com.obs.services.exception.ObsException;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processors.huawei.abstractprocessor.AbstractOBSProcessor;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.nifi.processors.huawei.common.PropertyDescriptors.*;
+
+
+@SupportsBatching
+@WritesAttributes({
+ @WritesAttribute(attribute = Constants.OBS_EXCEPTION, description = "The class name of the exception thrown during processor execution"),
+ @WritesAttribute(attribute = Constants.OBS_ADDITIONAL_DETAILS, description = "The OBS supplied detail from the failed operation"),
+ @WritesAttribute(attribute = Constants.OBS_STATUS_CODE, description = "The HTTP error code (if available) from the failed operation"),
+ @WritesAttribute(attribute = Constants.OBS_ERROR_CODE, description = "The OBS moniker of the failed operation"),
+ @WritesAttribute(attribute = Constants.OBS_ERROR_Message, description = "The OBS exception message from the failed operation")})
+@Tags({"HuaweiCloud", "obs", "Archive", "Delete"})
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@CapabilityDescription("Deletes FlowFiles on an HuaweiCloud OBS Bucket. " +
+ "If attempting to delete a file that does not exist, FlowFile is routed to success.")
+public class DeleteOBSObject extends AbstractOBSProcessor {
+ public static final List properties = Collections.unmodifiableList(
+ Arrays.asList(OBS_REGION, ENDPOINT_OVERRIDE_URL, BUCKET, ACCESS_KEY, SECRET_KEY, HUAWEI_CREDENTIALS_PROVIDER_SERVICE, KEY, TIMEOUT,
+// SSL_CONTEXT_SERVICE,
+ PROXY_CONFIGURATION_SERVICE));
+
+ @Override
+ public List getSupportedPropertyDescriptors() {
+ return properties;
+ }
+
+ @Override
+ public void onTrigger(final ProcessContext context, final ProcessSession session) {
+ FlowFile flowFile = session.get();
+ if (flowFile == null) {
+ return;
+ }
+
+ final long startNanos = System.nanoTime();
+
+ final String bucket = context.getProperty(BUCKET).evaluateAttributeExpressions(flowFile).getValue();
+ final String key = context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue();
+ final String region = context.getProperty(OBS_REGION).getValue();
+ final String endpoint = context.getProperty(ENDPOINT_OVERRIDE_URL).getValue();
+
+ final ObsClient obsClient = getClient();
+ // Deletes a key
+ try {
+ obsClient.deleteObject(bucket, key);
+ } catch (final ObsException e) {
+ flowFile = extractExceptionDetails(e, session, flowFile);
+ getLogger().error("Failed to delete OBS Object for {}; routing to failure", flowFile, e);
+ flowFile = session.penalize(flowFile);
+ session.transfer(flowFile, REL_FAILURE);
+ return;
+ }
+
+ session.transfer(flowFile, REL_SUCCESS);
+ final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
+ getLogger().info("Successfully delete OBS Object for {} in {} millis; routing to success", flowFile, transferMillis);
+ session.getProvenanceReporter().invokeRemoteProcess(flowFile, OBSUtils.getUrl(region,endpoint, bucket, key), "Object deleted");
+ }
+}
diff --git a/nifi-huawei-processors/src/main/java/org/apache/nifi/processors/huawei/obs/FetchOBSObject.java b/nifi-huawei-processors/src/main/java/org/apache/nifi/processors/huawei/obs/FetchOBSObject.java
new file mode 100644
index 0000000000000000000000000000000000000000..d8a6fe5cef8eed6d4a79cb38e34a2352308fa39a
--- /dev/null
+++ b/nifi-huawei-processors/src/main/java/org/apache/nifi/processors/huawei/obs/FetchOBSObject.java
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.huawei.obs;
+
+import com.obs.services.ObsClient;
+import com.obs.services.model.GetObjectMetadataRequest;
+import com.obs.services.model.GetObjectRequest;
+import com.obs.services.model.ObjectMetadata;
+import com.obs.services.model.ObsObject;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.ConfigVerificationResult;
+import org.apache.nifi.components.ConfigVerificationResult.Outcome;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.huawei.abstractprocessor.AbstractOBSProcessor;
+
+import java.io.IOException;
+import java.net.URLDecoder;
+import java.nio.charset.StandardCharsets;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static org.apache.nifi.processors.huawei.common.PropertyDescriptors.*;
+import static org.apache.nifi.processors.huawei.obs.Constants.*;
+
+@SupportsBatching
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@Tags({"HuaweiCloud", "OBS", "Get", "Fetch"})
+@CapabilityDescription("Retrieves the contents of an OBS Object and writes it to the content of a FlowFile")
+@WritesAttributes({
+ @WritesAttribute(attribute = OBS_BUCKET, description = "The name of the OBS bucket"),
+ @WritesAttribute(attribute = OBS_OPERATION, description = "The name of the operator"),
+ @WritesAttribute(attribute = "path", description = "The path of the file"),
+ @WritesAttribute(attribute = "absolute.path", description = "The path of the file"),
+ @WritesAttribute(attribute = "filename", description = "The name of the file"),
+ @WritesAttribute(attribute = "mime.type", description = "If OBS provides the content type/MIME type, this attribute will hold that file"),
+ @WritesAttribute(attribute = OBS_E_TAG, description = "The ETag that can be used to see if the file has changed"),
+ @WritesAttribute(attribute = OBS_EXCEPTION, description = "The class name of the exception thrown during processor execution"),
+ @WritesAttribute(attribute = OBS_ADDITIONAL_DETAILS, description = "The OBS supplied detail from the failed operation"),
+ @WritesAttribute(attribute = OBS_STATUS_CODE, description = "The HTTP error code (if available) from the failed operation"),
+ @WritesAttribute(attribute = OBS_ERROR_CODE, description = "The OBS moniker of the failed operation"),
+ @WritesAttribute(attribute = OBS_ERROR_Message, description = "The OBS exception message from the failed operation"),
+ @WritesAttribute(attribute = OBS_EXPIRATION_TIME, description = "If the file has an expiration date, this attribute will be set, containing the milliseconds since epoch in UTC time"),
+ @WritesAttribute(attribute = OBS_SSE_ALGORITHM, description = "The server side encryption algorithm of the object"),
+ @WritesAttribute(attribute = OBS_VERSION, description = "The version of the OBS object"),
+ @WritesAttribute(attribute = OBS_ENCRYPTION_STRATEGY, description = "The name of the encryption strategy that was used to store the OBS object (if it is encrypted)"),})
+public class FetchOBSObject extends AbstractOBSProcessor {
+ public static final PropertyDescriptor RANGE_START = new PropertyDescriptor.Builder()
+ .name("range-start")
+ .displayName("Range Start")
+ .description("The byte position at which to start reading from the object. An empty value or a value of " +
+ "zero will start reading at the beginning of the object.")
+ .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .required(false)
+ .build();
+
+ public static final PropertyDescriptor RANGE_LENGTH = new PropertyDescriptor.Builder()
+ .name("range-length")
+ .displayName("Range Length")
+ .description("The number of bytes to download from the object, starting from the Range Start. An empty " +
+ "value or a value that extends beyond the end of the object will read to the end of the object.")
+ .addValidator(StandardValidators.createDataSizeBoundsValidator(1, Long.MAX_VALUE))
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .required(false)
+ .build();
+
+ public static final List properties = Collections.unmodifiableList(
+ Arrays.asList(
+ OBS_REGION,
+ ENDPOINT_OVERRIDE_URL,
+ BUCKET,
+ KEY,
+ ACCESS_KEY,
+ SECRET_KEY,
+ HUAWEI_CREDENTIALS_PROVIDER_SERVICE,
+ ENCRYPTION_SERVICE,
+ TIMEOUT,
+ RANGE_START,
+ RANGE_LENGTH,
+ PROXY_CONFIGURATION_SERVICE));
+
+ @Override
+ public List getSupportedPropertyDescriptors() {
+ return properties;
+ }
+
+ @Override
+ public List verify(ProcessContext context, ComponentLog verificationLogger, Map attributes) {
+ final List results = new ArrayList<>(super.verify(context, verificationLogger, attributes));
+
+ final String bucket = context.getProperty(BUCKET).evaluateAttributeExpressions(attributes).getValue();
+ final String key = context.getProperty(KEY).evaluateAttributeExpressions(attributes).getValue();
+
+ final ObsClient client = OBSUtils.createClient(context, getLogger());
+ final GetObjectMetadataRequest request = createGetObjectMetadataRequest(context, attributes);
+
+ try {
+ final ObjectMetadata objectMetadata = client.getObjectMetadata(request);
+ final long byteCount = objectMetadata.getContentLength();
+ results.add(new ConfigVerificationResult.Builder()
+ .verificationStepName("HEAD OBS Object")
+ .outcome(Outcome.SUCCESSFUL)
+ .explanation(String.format("Successfully performed HEAD on [%s] (%s bytes) from Bucket [%s]", key, byteCount, bucket))
+ .build());
+ } catch (final Exception e) {
+ getLogger().error(String.format("Failed to fetch [%s] from Bucket [%s]", key, bucket), e);
+ results.add(new ConfigVerificationResult.Builder()
+ .verificationStepName("HEAD OBS Object")
+ .outcome(Outcome.FAILED)
+ .explanation(String.format("Failed to perform HEAD on [%s] from Bucket [%s]: %s", key, bucket, e.getMessage()))
+ .build());
+ }
+
+ return results;
+ }
+
+ @Override
+ public void onTrigger(final ProcessContext context, final ProcessSession session) {
+ getLogger().warn("FetchObs Trigger {}", System.currentTimeMillis());
+ FlowFile flowFile = session.get();
+ if (flowFile == null) {
+ return;
+ }
+
+ final long startNanos = System.nanoTime();
+ final Map attributes = new HashMap<>();
+
+ final String bucket = context.getProperty(BUCKET).evaluateAttributeExpressions(flowFile).getValue();
+ final String key = context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue();
+ final String region = context.getProperty(OBS_REGION).getValue();
+ final String endpoint = context.getProperty(ENDPOINT_OVERRIDE_URL).getValue();
+
+ final ObsClient client = getClient();
+ final GetObjectRequest request = createGetObjectRequest(context, flowFile.getAttributes(), attributes);
+ try {
+ final ObsObject obsObject = client.getObject(request);
+ if (obsObject == null) {
+ throw new IOException("HuaweiCloud refused to execute this request.");
+ }
+ flowFile = session.importFrom(obsObject.getObjectContent(), flowFile);
+ attributes.put(OBS_OBJECT, key);
+ attributes.put(FILENAME, obsObject.getObjectKey());
+ attributes.put(OBS_OPERATION, "FETCH");
+ attributes.put(OBS_BUCKET, obsObject.getBucketName());
+ final ObjectMetadata metadata = obsObject.getMetadata();
+ if (StringUtils.isNotBlank(metadata.getContentDisposition())) {
+ final String contentDisposition = URLDecoder.decode(metadata.getContentDisposition(), StandardCharsets.UTF_8.name());
+ if (contentDisposition.equals(PutOBSObject.CONTENT_DISPOSITION_INLINE) || contentDisposition.startsWith(CONTENT_DISPOSITION_ATTACHMENT)) {
+ setFilePathAttributes(attributes, key);
+ } else {
+ setFilePathAttributes(attributes, contentDisposition);
+ }
+ }
+ if (metadata.getContentMd5() != null) {
+ attributes.put(HASH_VALUE, metadata.getContentMd5());
+ attributes.put(HASH_ALGORITHM, "MD5");
+ }
+ if (metadata.getContentType() != null) {
+ attributes.put(CoreAttributes.MIME_TYPE.key(), metadata.getContentType());
+ }
+ if (metadata.getEtag() != null) {
+ attributes.put(OBS_E_TAG, metadata.getEtag());
+ }
+ if (metadata.getExpires() != null) {
+ attributes.put(OBS_EXPIRATION_TIME, String.valueOf(metadata.getExpires()));
+ }
+ if (metadata.getAllMetadata() != null) {
+ attributes.putAll(metadata.getAllMetadata());
+ }
+ } catch (Exception ioe) {
+ flowFile = extractExceptionDetails(ioe, session, flowFile);
+ getLogger().error("Failed to retrieve OBS Object for {}; routing to failure", flowFile, ioe);
+ flowFile = session.penalize(flowFile);
+ session.transfer(flowFile, REL_FAILURE);
+ return;
+ }
+ if (!attributes.isEmpty()) {
+ flowFile = session.putAllAttributes(flowFile, attributes.entrySet().stream().
+ collect(Collectors.toMap(item -> String.valueOf(item.getKey()) ,
+ val -> String.valueOf(val.getValue()))));
+ }
+
+ session.transfer(flowFile, REL_SUCCESS);
+ final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
+ getLogger().info("Successfully retrieved OBS Object for {} in {} millis; routing to success", new Object[]{flowFile, transferMillis});
+ session.getProvenanceReporter().fetch(flowFile, OBSUtils.getUrl(region, endpoint, bucket, key), transferMillis);
+ }
+
+ private GetObjectMetadataRequest createGetObjectMetadataRequest(final ProcessContext context, final Map attributes) {
+ final String bucket = context.getProperty(BUCKET).evaluateAttributeExpressions(attributes).getValue();
+ final String key = context.getProperty(KEY).evaluateAttributeExpressions(attributes).getValue();
+ return new GetObjectMetadataRequest(bucket, key);
+ }
+
+ private GetObjectRequest createGetObjectRequest(final ProcessContext context, final Map flowFileAttributes, Map attributes) {
+ final String bucket = context.getProperty(BUCKET).evaluateAttributeExpressions(flowFileAttributes).getValue();
+ final String key = context.getProperty(KEY).evaluateAttributeExpressions(flowFileAttributes).getValue();
+ final long rangeStart = (context.getProperty(RANGE_START).isSet() ? context.getProperty(RANGE_START).evaluateAttributeExpressions(flowFileAttributes).asDataSize(DataUnit.B).longValue() : 0L);
+ final Long rangeLength = (context.getProperty(RANGE_LENGTH).isSet() ? context.getProperty(RANGE_LENGTH).evaluateAttributeExpressions(flowFileAttributes).asDataSize(DataUnit.B).longValue() : null);
+ final GetObjectRequest request = new GetObjectRequest(bucket, key);
+
+ // Since the effect of the byte range 0- is equivalent to not sending a
+ // byte range and works for both zero and non-zero length objects,
+ // the single argument setRange() only needs to be called when the
+ // first byte position is greater than zero.
+ if (rangeLength != null) {
+ request.setRangeStart(rangeStart);
+ request.setRangeEnd(rangeStart + rangeLength - 1);
+ } else if (rangeStart > 0) {
+ request.setRangeStart(rangeStart);
+ }
+
+ final ObsServiceEncryptionService encryptionService = context.getProperty(ENCRYPTION_SERVICE).asControllerService(ObsServiceEncryptionService.class);
+ if (encryptionService != null) {
+ encryptionService.configureGetObjectRequest(request, new ObjectMetadata());
+ }
+
+ if (request.getSseCHeader() !=null && request.getSseCHeader().getSSEAlgorithm() != null) {
+ String sseAlgorithmName = request.getSseCHeader().getSSEAlgorithm().name();
+ attributes.put(OBS_SSE_ALGORITHM, sseAlgorithmName);
+ attributes.put(OBS_ENCRYPTION_STRATEGY, ObsServiceEncryptionService.STRATEGY_NAME_SSE_C);
+ }
+ return request;
+ }
+
+ protected void setFilePathAttributes(Map attributes, String filePathName) {
+ final int lastSlash = filePathName.lastIndexOf("/");
+ if (lastSlash > -1 && lastSlash < filePathName.length() - 1) {
+ attributes.put(CoreAttributes.PATH.key(), filePathName.substring(0, lastSlash));
+ attributes.put(CoreAttributes.ABSOLUTE_PATH.key(), filePathName);
+ attributes.put(CoreAttributes.FILENAME.key(), filePathName.substring(lastSlash + 1));
+ } else {
+ attributes.put(CoreAttributes.FILENAME.key(), filePathName);
+ }
+ }
+}
diff --git a/nifi-huawei-processors/src/main/java/org/apache/nifi/processors/huawei/obs/ListOBSObject.java b/nifi-huawei-processors/src/main/java/org/apache/nifi/processors/huawei/obs/ListOBSObject.java
new file mode 100644
index 0000000000000000000000000000000000000000..3bf50e8e25aa3561c02154ca504e092c3a94eddf
--- /dev/null
+++ b/nifi-huawei-processors/src/main/java/org/apache/nifi/processors/huawei/obs/ListOBSObject.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.huawei.obs;
+
+import com.obs.services.ObsClient;
+import com.obs.services.model.GetObjectMetadataRequest;
+import com.obs.services.model.ListVersionsResult;
+import com.obs.services.model.ObjectMetadata;
+import com.obs.services.model.VersionOrDeleteMarker;
+import org.apache.nifi.annotation.behavior.*;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.context.PropertyContext;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.list.ListableEntity;
+import org.apache.nifi.processor.util.list.ListedEntityTracker;
+import org.apache.nifi.processors.huawei.obs.model.OBSObjectBucketLister;
+import org.apache.nifi.processors.huawei.obs.model.OBSRecord;
+import org.apache.nifi.processors.huawei.obs.model.ObsBucketLister;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.nifi.processors.huawei.obs.Constants.*;
+import static org.apache.nifi.processors.huawei.common.PropertyDescriptors.*;
+
+@PrimaryNodeOnly
+@TriggerSerially
+@TriggerWhenEmpty
+@InputRequirement(Requirement.INPUT_FORBIDDEN)
+@Tags({"Huawei", "OBS", "list"})
+@CapabilityDescription("Retrieves a listing of objects from an obs bucket. For each object that is listed, creates a FlowFile that represents "
+ + "the object so that it can be fetched in conjunction with FetchOBSObject. This Processor is designed to run on Primary Node only "
+ + "in a cluster. If the primary node changes, the new Primary Node will pick up where the previous node left off without duplicating "
+ + "all of the data.")
+@Stateful(scopes = Scope.CLUSTER, description = "After performing a listing of keys, the timestamp of the newest key is stored, "
+ + "along with the keys that share that same timestamp. This allows the Processor to list only keys that have been added or modified after "
+ + "this date the next time that the Processor is run. State is stored across the cluster so that this Processor can be run on Primary Node only and if a new Primary "
+ + "Node is selected, the new node can pick up where the previous node left off, without duplicating the data.")
+@WritesAttributes({
+ @WritesAttribute(attribute = OBS_BUCKET, description = "The name of the OBS bucket"),
+ @WritesAttribute(attribute = "filename", description = "The name of the file"),
+ @WritesAttribute(attribute = OBS_E_TAG, description = "The ETag that can be used to see if the file has changed"),
+ @WritesAttribute(attribute = OBS_OWNER, description = "Object owner"),
+ @WritesAttribute(attribute = OBS_IS_LATEST, description = "A boolean indicating if this is the latest version of the object"),
+ @WritesAttribute(attribute = OBS_LAST_MODIFIED, description = "The last modified time in milliseconds since epoch in UTC time"),
+ @WritesAttribute(attribute = OBS_LENGTH, description = "The size of the object in bytes"),
+ @WritesAttribute(attribute = OBS_STORAGE_CLASS, description = "The storage class of the object"),
+ @WritesAttribute(attribute = OBS_VERSION, description = "The version of the object, if applicable"),
+ @WritesAttribute(attribute = OBS_USER_META + ".__", description = "If 'Write User Metadata' is set to 'True', the user defined metadata associated to the OBS object that is being listed " +
+ "will be written as part of the flowfile attributes")})
+public class ListOBSObject extends AbstractListHuaweiProcessor {
+ public static final List properties = Collections.unmodifiableList(Arrays.asList(
+ OBS_REGION,
+ ENDPOINT_OVERRIDE_URL,
+ BUCKET,
+ ACCESS_KEY,
+ SECRET_KEY,
+ HUAWEI_CREDENTIALS_PROVIDER_SERVICE,
+ DELIMITER,
+ PREFIX,
+ LISTING_STRATEGY,
+ ListedEntityTracker.TRACKING_STATE_CACHE,
+ ListedEntityTracker.TRACKING_TIME_WINDOW,
+ ListedEntityTracker.INITIAL_LISTING_TARGET,
+ RECORD_WRITER,
+ MAX_AGE,
+ MIN_AGE,
+ MAX_SIZE,
+ MIN_SIZE,
+ WRITE_USER_METADATA,
+ TIMEOUT,
+ PROXY_CONFIGURATION_SERVICE
+ ));
+
+ public static final Set relationships = Collections.singleton(REL_SUCCESS);
+
+ protected volatile ObsClient client;
+
+ @OnScheduled
+ public void onScheduled(final ProcessContext context) {
+ client = OBSUtils.createClient(context, getLogger());
+ }
+
+ @Override
+ protected Map createAttributes(ListableEntity entity, ProcessContext context) {
+ OBSRecord record = (OBSRecord) entity;
+ VersionOrDeleteMarker versionSummary = record.getVersionSummary();
+ if (versionSummary == null) {
+ return new HashMap<>();
+ }
+ final Map attributes = new HashMap<>();
+ attributes.put(CoreAttributes.FILENAME.key(), versionSummary.getObjectKey());
+ attributes.put(OBS_BUCKET, versionSummary.getBucketName());
+ if (versionSummary.getOwner() != null) {
+ attributes.put(OBS_OWNER, versionSummary.getOwner().getId());
+ }
+ attributes.put(OBS_E_TAG, versionSummary.getEtag());
+ attributes.put(OBS_LAST_MODIFIED, String.valueOf(versionSummary.getLastModified().getTime()));
+ attributes.put(OBS_LENGTH, String.valueOf(versionSummary.getSize()));
+ attributes.put(OBS_STORAGE_CLASS, versionSummary.getStorageClass());
+ attributes.put(OBS_IS_LATEST, String.valueOf(versionSummary.isLatest()));
+ if (versionSummary.getVersionId() != null) {
+ attributes.put(OBS_VERSION, versionSummary.getVersionId());
+ }
+ // Add user-defined metadata
+ if (record.getObjectMetadata() != null) {
+ for (Map.Entry e : record.getObjectMetadata().getAllMetadata().entrySet()) {
+ attributes.put(OBS_USER_META + "." + e.getKey(), (String) e.getValue());
+ }
+ }
+ return attributes;
+ }
+
+ @Override
+ protected String getPath(ProcessContext context) {
+ return context.getProperty(BUCKET).evaluateAttributeExpressions().getValue();
+ }
+
+ @Override
+ protected List performListing(ProcessContext context, Long minTimestamp, ListingMode listingMode) throws IOException {
+ final long minimumTimestamp = minTimestamp == null ? 0 : minTimestamp;
+ final String region = context.getProperty(OBS_REGION).getValue();
+ final String endpoint = context.getProperty(ENDPOINT_OVERRIDE_URL).getValue();
+ List list = new ArrayList<>();
+ int batchCount = 0;
+ ObsBucketLister bucketLister = new OBSObjectBucketLister(client, context);
+ do {
+ ListVersionsResult versionListing = bucketLister.listVersions();
+ getLogger().warn("ListVersionsResult count : {}", versionListing.getVersions().length);
+ for (VersionOrDeleteMarker versionSummary : versionListing.getVersions()) {
+ if (!isFileInfoMatchesWithAgeAndSize(context, minimumTimestamp, versionSummary.getLastModified().getTime(), versionSummary.getSize())) {
+ continue;
+ }
+ ObjectMetadata objectMetadata = getObjectMetadata(context, client, versionSummary);
+ OBSRecord record = new OBSRecord();
+ record.setObjectMetadata(objectMetadata);
+ record.setRegion(region);
+ record.setEndpoint(endpoint);
+ record.setVersionSummary(versionSummary);
+ list.add(record);
+ batchCount++;
+ }
+ bucketLister.setNextMarker();
+ getLogger().info("Successfully listed {} new files from OBS; routing to success", batchCount);
+ batchCount = 0;
+ // 是否完成
+ } while (bucketLister.isTruncated());
+ return list;
+ }
+
+ protected boolean isFileInfoMatchesWithAgeAndSize(final ProcessContext context, final long minimumTimestamp,
+ final long lastModified, final long size) {
+ final long minSize = context.getProperty(MIN_SIZE).asDataSize(DataUnit.B).longValue();
+ final Double maxSize = context.getProperty(MAX_SIZE).asDataSize(DataUnit.B);
+ final long minAge = context.getProperty(MIN_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
+ final Long maxAge = context.getProperty(MAX_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
+
+ if (lastModified < minimumTimestamp) {
+ return false;
+ }
+ final long fileAge = System.currentTimeMillis() - lastModified;
+ if (minAge > fileAge) {
+ return false;
+ }
+ if (maxAge != null && maxAge < fileAge) {
+ return false;
+ }
+ if (minSize > size) {
+ return false;
+ }
+ return maxSize == null || maxSize >= size;
+ }
+
+ @Override
+ protected boolean isListingResetNecessary(PropertyDescriptor property) {
+ return BUCKET.equals(property)
+ || OBS_REGION.equals(property);
+ }
+
+ @Override
+ protected Scope getStateScope(PropertyContext context) {
+ return null;
+ }
+
+ @Override
+ protected RecordSchema getRecordSchema() {
+ return null;
+ }
+
+ @Override
+ protected Integer countUnfilteredListing(ProcessContext context) throws IOException {
+ return null;
+ }
+
+ @Override
+ protected String getListingContainerName(ProcessContext context) {
+ return null;
+ }
+
+ @Override
+ protected List getSupportedPropertyDescriptors() {
+ return properties;
+ }
+
+ @Override
+ public Set getRelationships() {
+ return relationships;
+ }
+
+ private ObjectMetadata getObjectMetadata(ProcessContext context, ObsClient client, VersionOrDeleteMarker versionSummary) {
+ ObjectMetadata objectMetadata = null;
+ if (context.getProperty(WRITE_USER_METADATA).asBoolean()) {
+ try {
+ GetObjectMetadataRequest request = new GetObjectMetadataRequest(versionSummary.getBucketName(), versionSummary.getKey());
+ objectMetadata = client.getObjectMetadata(request);
+ } catch (final Exception e) {
+ getLogger().warn("Failed to obtain User Metadata for Obs Object {} in bucket {}. Will list Obs Object without the user metadata",
+ new Object[]{versionSummary.getKey(), versionSummary.getBucketName()}, e);
+ }
+ }
+ return objectMetadata;
+ }
+}
+
diff --git a/nifi-huawei-processors/src/main/java/org/apache/nifi/processors/huawei/obs/OBSRegions.java b/nifi-huawei-processors/src/main/java/org/apache/nifi/processors/huawei/obs/OBSRegions.java
new file mode 100644
index 0000000000000000000000000000000000000000..211ad06b81da2a700180b25be2a60d841949546d
--- /dev/null
+++ b/nifi-huawei-processors/src/main/java/org/apache/nifi/processors/huawei/obs/OBSRegions.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.huawei.obs;
+
+import org.apache.nifi.components.AllowableValue;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public enum OBSRegions {
+ NOT_FOUND("NotFound", "NotFound", "Not found"),
+ OF_SOUTH_1("af-south-1", "obs.af-south-1.myhuaweicloud.com", "Africa - Johannesburg"),
+ CN_NORTH_4("cn-north-4", "obs.cn-north-4.myhuaweicloud.com", "North China - Beijing IV"),
+ CN_NORTH_1("cn-north-1", "obs.cn-north-1.myhuaweicloud.com", "North China - Beijing I"),
+ CN_NORTH_9("cn-north-9", "obs.cn-north-9.myhuaweicloud.com", "North China - Ulanqabu I"),
+ CN_EAST_2("cn-east-2", "obs.cn-east-2.myhuaweicloud.com", "East China - Shanghai II"),
+ CN_EAST_3("cn-east-3", "obs.cn-east-3.myhuaweicloud.com", "East China - Shanghai I"),
+ CN_SOUTH_1("cn-south-1", "obs.cn-south-1.myhuaweicloud.com", "South China - Guangzhou"),
+ LA_NORTH_2("la-north-2", "obs.la-north-2.myhuaweicloud.com", "Latin America - Mexico City II"),
+ NA_MEXICO_1("na-mexico-1", "obs.na-mexico-1.myhuaweicloud.com", "Latin America - Mexico City I"),
+ SA_BRAZIL_1("sa-brazil-1", "obs.sa-brazil-1.myhuaweicloud.com", "Latin America - Sao Paulo I"),
+ CN_SOUTHWEST_2("cn-southwest-2", "obs.cn-southwest-2.myhuaweicloud.com", "Southwest - Guiyang I"),
+ AP_SOUTHWEST_2("ap-southeast-2", "obs.ap-southeast-2.myhuaweicloud.com", "Asia Pacific - Bangkok"),
+ AP_SOUTHWEST_3("ap-southeast-3", "obs.ap-southeast-3.myhuaweicloud.com", "Asia Pacific - Singapore"),
+ AP_SOUTHWEST_1("ap-southeast-1", "obs.ap-southeast-1.myhuaweicloud.com", "China - Hong Kong");
+
+ public static final OBSRegions DEFAULT_REGION = CN_NORTH_1;
+ private final String name;
+
+ private final String endpoint;
+
+ private final String description;
+
+ OBSRegions(String name, String endpoint, String description) {
+ this.name = name;
+ this.endpoint = endpoint;
+ this.description = description;
+ }
+
+ public String getName() {
+ return this.name;
+ }
+
+ public String getEndpoint() {
+ return this.endpoint;
+ }
+
+ public String getDescription() {
+ return this.description;
+ }
+
+ public static OBSRegions fromName(String regionName) {
+ OBSRegions[] var1 = values();
+ int var2 = var1.length;
+
+ for (int var3 = 0; var3 < var2; ++var3) {
+ OBSRegions region = var1[var3];
+ if (region.getName().equals(regionName)) {
+ return region;
+ }
+ }
+
+ throw new IllegalArgumentException("Cannot create enum from " + regionName + " value!");
+ }
+
+ public static AllowableValue[] getAvailableOBSRegions() {
+ final List values = new ArrayList<>();
+ for (final OBSRegions region : OBSRegions.values()) {
+ AllowableValue allowableValue = new AllowableValue(region.getName(), region.getDescription(), "Huawei Obs Region Code : " + region.getName());
+ values.add(allowableValue);
+ }
+ return values.toArray(new AllowableValue[0]);
+ }
+}
diff --git a/nifi-huawei-processors/src/main/java/org/apache/nifi/processors/huawei/obs/OBSUtils.java b/nifi-huawei-processors/src/main/java/org/apache/nifi/processors/huawei/obs/OBSUtils.java
new file mode 100644
index 0000000000000000000000000000000000000000..e63a2bc732963b1918bf6566ee3f2e3cac11dab5
--- /dev/null
+++ b/nifi-huawei-processors/src/main/java/org/apache/nifi/processors/huawei/obs/OBSUtils.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.huawei.obs;
+
+import com.obs.services.ObsClient;
+import com.obs.services.ObsConfiguration;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.controller.ControllerService;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processors.huawei.credentials.provider.service.HuaweiCredentialsProviderService;
+import org.apache.nifi.proxy.ProxyConfiguration;
+import org.apache.nifi.proxy.ProxyConfigurationService;
+import org.apache.nifi.ssl.SSLContextService;
+
+import javax.net.ssl.SSLContext;
+import java.net.Proxy;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.nifi.processors.huawei.common.PropertyDescriptors.*;
+
+public class OBSUtils {
+ public static String getUrl(String region,String endpoint, String bucket, String objectKey) {
+ if (StringUtils.isBlank(region) || StringUtils.isBlank(bucket) || StringUtils.isBlank(objectKey)) {
+ return null;
+ }
+
+ if (objectKey.startsWith("/")) {
+ objectKey = objectKey.substring(1);
+ }
+
+ if (!OBSRegions.NOT_FOUND.equals(OBSRegions.fromName(region))) {
+ endpoint = OBSRegions.fromName(region).getEndpoint();
+ }
+ return String.format("http://%s.%s/%s", bucket, endpoint, objectKey);
+ }
+
+ public static ObsClient createObsClientWithAkSk(ProcessContext context, ComponentLog logger) {
+ final String accessKey = context.getProperty(ACCESS_KEY).evaluateAttributeExpressions().getValue();
+ final String secretKey = context.getProperty(SECRET_KEY).evaluateAttributeExpressions().getValue();
+ ObsClient obsClient = new ObsClient(accessKey, secretKey, createConfiguration(context));
+ if (logger != null) {
+ logger.warn("create ObsClient success");
+ }
+ return obsClient;
+ }
+
+ /**
+ * Attempts to create the client using the controller service first before falling back to the standard configuration.
+ *
+ * @param context The process context
+ * @return The created client
+ */
+ public static ObsClient createClient(final ProcessContext context, ComponentLog logger) {
+ final ControllerService service = context.getProperty(HUAWEI_CREDENTIALS_PROVIDER_SERVICE).asControllerService();
+ if (service != null) {
+ logger.debug("Using Huawei credentials provider service for creating client");
+ final HuaweiCredentialsProviderService huaweiCredentialsProviderService =
+ context.getProperty(HUAWEI_CREDENTIALS_PROVIDER_SERVICE).asControllerService(HuaweiCredentialsProviderService.class);
+ return new ObsClient(huaweiCredentialsProviderService.getCredentialsProvider(), createConfiguration(context));
+ } else {
+ logger.debug("Using Huawei credentials for creating client");
+ return createObsClientWithAkSk(context, logger);
+ }
+ }
+
+ private static ObsConfiguration createConfiguration(final ProcessContext context) {
+ final ObsConfiguration config = new ObsConfiguration();
+
+ config.setMaxConnections(context.getMaxConcurrentTasks());
+ config.setMaxErrorRetry(0);
+ // If this is changed to be a property, ensure other uses are also changed
+ final int commsTimeout = context.getProperty(TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
+ config.setConnectionTimeout(commsTimeout);
+ config.setSocketTimeout(commsTimeout);
+ // not set
+ if(context.getProperty(SSL_CONTEXT_SERVICE).isSet()) {
+ final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
+ if (sslContextService != null) {
+ final SSLContext sslContext = sslContextService.createContext();
+ config.setSslProvider(sslContext.getProvider().getName());
+ }
+ }
+
+ final ProxyConfiguration proxyConfig = ProxyConfiguration.getConfiguration(context, () -> {
+ if (context.getProperty(ProxyConfigurationService.PROXY_CONFIGURATION_SERVICE).isSet()) {
+ final ProxyConfigurationService configurationService = context.getProperty(ProxyConfigurationService.PROXY_CONFIGURATION_SERVICE).asControllerService(ProxyConfigurationService.class);
+ return configurationService.getConfiguration();
+ }
+ return ProxyConfiguration.DIRECT_CONFIGURATION;
+ });
+
+ if (Proxy.Type.HTTP.equals(proxyConfig.getProxyType())) {
+ config.setHttpProxy(proxyConfig.getProxyServerHost(), proxyConfig.getProxyServerPort(), proxyConfig.getProxyUserName(), proxyConfig.getProxyUserPassword());
+ }
+
+ String name = context.getProperty(OBS_REGION).getValue();
+ OBSRegions region = OBSRegions.fromName(name);
+ String endpoint = context.getProperty(ENDPOINT_OVERRIDE_URL).getValue();
+ if (!OBSRegions.NOT_FOUND.equals(region)) {
+ endpoint = region.getEndpoint();
+ }
+ config.setEndPoint(endpoint);
+ return config;
+ }
+}
diff --git a/nifi-huawei-processors/src/main/java/org/apache/nifi/processors/huawei/obs/PutOBSObject.java b/nifi-huawei-processors/src/main/java/org/apache/nifi/processors/huawei/obs/PutOBSObject.java
new file mode 100644
index 0000000000000000000000000000000000000000..475289322894efe444d979f0359feef1a7ab50d4
--- /dev/null
+++ b/nifi-huawei-processors/src/main/java/org/apache/nifi/processors/huawei/obs/PutOBSObject.java
@@ -0,0 +1,1158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.huawei.obs;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.json.JsonMapper;
+import com.obs.services.ObsClient;
+import com.obs.services.exception.ObsException;
+import com.obs.services.model.*;
+import org.apache.commons.collections.MapUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.behavior.*;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.huawei.abstractprocessor.AbstractOBSProcessor;
+import org.apache.nifi.processors.huawei.obs.encryption.StandardOBSEncryptionService;
+
+import java.io.*;
+import java.net.URLEncoder;
+import java.nio.file.Files;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.*;
+import java.util.Map.Entry;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Collectors;
+
+import static org.apache.nifi.processors.huawei.common.PropertyDescriptors.*;
+import static org.apache.nifi.processors.huawei.obs.Constants.*;
+
+@SupportsBatching
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@Tags({"HuaweiCloud", "OBS", "Archive", "Put"})
+@CapabilityDescription("Puts FlowFiles to an OBS Bucket.\n" +
+ "The upload uses either the PutOBSObject method or the PutOBSMultipartUpload method. The PutOBSObject method " +
+ "sends the file in a single synchronous call, but it has a 5GB size limit. Larger files are sent using the " +
+ "PutOBSMultipartUpload method. This multipart process " +
+ "saves state after each step so that a large upload can be resumed with minimal loss if the processor or " +
+ "cluster is stopped and restarted.\n" +
+ "A multipart upload consists of three steps:\n" +
+ " 1) initiate upload,\n" +
+ " 2) upload the parts, and\n" +
+ " 3) complete the upload.\n" +
+ "For multipart uploads, the processor saves state locally tracking the upload ID and parts uploaded, which " +
+ "must both be provided to complete the upload.\n" +
+ "The HuaweiCloud libraries select an endpoint URL based on the HuaweiCloud region, but this can be overridden with the " +
+ "'Endpoint Override URL' property for use with other OBS-compatible endpoints.\n" +
+ "The OBS API specifies that the maximum file size for a PutOBSObject upload is 5GB. It also requires that " +
+ "parts in a multipart upload must be at least 5MB in size, except for the last part. These limits " +
+ "establish the bounds for the Multipart Upload Threshold and Part Size properties.")
+@DynamicProperty(name = "The name of a User-Defined Metadata field to add to the OBS Object",
+ value = "The value of a User-Defined Metadata field to add to the OBS Object",
+ description = "Allows user-defined metadata to be added to the OBS object as key/value pairs",
+ expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+@ReadsAttribute(attribute = "filename", description = "Uses the FlowFile's filename as the filename for the OBS object")
+@WritesAttributes({
+ @WritesAttribute(attribute = OBS_BUCKET, description = "The OBS bucket where the Object was put in OBS"),
+ @WritesAttribute(attribute = OBS_OBJECT, description = "The OBS key within where the Object was put in OBS"),
+ @WritesAttribute(attribute = OBS_CONTENT_TYPE, description = "The OBS content type of the OBS Object that put in OBS"),
+ @WritesAttribute(attribute = OBS_VERSION, description = "The version of the OBS Object that was put to OBS"),
+ @WritesAttribute(attribute = OBS_EXCEPTION, description = "The class name of the exception thrown during processor execution"),
+ @WritesAttribute(attribute = OBS_ADDITIONAL_DETAILS, description = "The OBS supplied detail from the failed operation"),
+ @WritesAttribute(attribute = OBS_STATUS_CODE, description = "The HTTP error code (if available) from the failed operation"),
+ @WritesAttribute(attribute = OBS_ERROR_CODE, description = "The OBS moniker of the failed operation"),
+ @WritesAttribute(attribute = OBS_ERROR_Message, description = "The OBS exception message from the failed operation"),
+ @WritesAttribute(attribute = OBS_E_TAG, description = "The ETag of the OBS Object"),
+ @WritesAttribute(attribute = OBS_CONTENT_DISPOSITION, description = "The content disposition of the OBS Object that put in OBS"),
+ @WritesAttribute(attribute = OBS_CACHE_CONTROL, description = "The cache-control header of the OBS Object"),
+ @WritesAttribute(attribute = OBS_UPLOAD_ID, description = "The uploadId used to upload the Object to OBS"),
+ @WritesAttribute(attribute = OBS_EXPIRATION_TIME, description = "A human-readable form of the expiration date of " +
+ "the OBS object, if one is set"),
+ @WritesAttribute(attribute = OBS_SSE_ALGORITHM, description = "The server side encryption algorithm of the object"),
+ @WritesAttribute(attribute = OBS_USER_META, description = "A human-readable form of the User Metadata of " +
+ "the OBS object, if any was set"),
+ @WritesAttribute(attribute = OBS_ENCRYPTION_STRATEGY, description = "The name of the encryption strategy, if any was set"),})
+public class PutOBSObject extends AbstractOBSProcessor {
+ public static final long MIN_PART_SIZE = 50L * 1024L * 1024L;
+ public static final long MAX_PUTOBJECT_SIZE = 5L * 1024L * 1024L * 1024L;
+ public static final String CONTENT_DISPOSITION_INLINE = "inline";
+ public static final String CONTENT_DISPOSITION_ATTACHMENT = "attachment";
+
+ public static final String CONTENT_DISPOSITION_FILENAME = "filename";
+
+ private static final Set STORAGE_CLASSES = Collections.unmodifiableSortedSet(new TreeSet<>(
+ Arrays.stream(StorageClassEnum.values()).map(StorageClassEnum::getCode).collect(Collectors.toSet())
+ ));
+
+ public static final PropertyDescriptor CONTENT_TYPE = new PropertyDescriptor.Builder()
+ .name("Content Type")
+ .displayName("Content Type")
+ .description("Sets the Content-Type HTTP header indicating the type of content stored in the associated " +
+ "object. The value of this header is a standard MIME type.\n" +
+ "OBS Java client will attempt to determine the correct content type if one hasn't been set" +
+ " yet. Users are responsible for ensuring a suitable content type is set when uploading streams. If " +
+ "no content type is provided and cannot be determined by the filename, the default content type " +
+ "\"application/octet-stream\" will be used.")
+ .required(false)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor CONTENT_DISPOSITION = new PropertyDescriptor.Builder()
+ .name("Content Disposition")
+ .displayName("Content Disposition")
+ .description("Sets the Content-Disposition HTTP header indicating if the content is intended to be displayed inline or should be downloaded.\n " +
+ "Possible values are 'inline' or 'attachment' or 'filename'. If this property is not specified, object's content-disposition will be set to null. " +
+ "when filename is selected, object's content-disposition will be set to filename" +
+ "When 'attachment' is selected, '; filename=' plus object key are automatically appended to form final value 'attachment; filename=\"filename.jpg\"'.")
+ .required(false)
+ .allowableValues(CONTENT_DISPOSITION_INLINE, CONTENT_DISPOSITION_ATTACHMENT, CONTENT_DISPOSITION_FILENAME)
+ .build();
+
+ public static final PropertyDescriptor CACHE_CONTROL = new PropertyDescriptor.Builder()
+ .name("Cache Control")
+ .displayName("Cache Control")
+ .description("Sets the Cache-Control HTTP header indicating the caching directives of the associated object. Multiple directives are comma-separated.")
+ .required(false)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor STORAGE_CLASS = new PropertyDescriptor.Builder()
+ .name("Storage Class")
+ .required(true)
+ .allowableValues(STORAGE_CLASSES)
+ .defaultValue(StorageClassEnum.STANDARD.getCode())
+ .build();
+
+ public static final PropertyDescriptor MULTIPART_THRESHOLD = new PropertyDescriptor.Builder()
+ .name("Multipart Threshold")
+ .description("Specifies the file size threshold for switch from the PutOBSObject API to the " +
+ "PutOBSMultipartUpload API. Flow files bigger than this limit will be sent using the stateful " +
+ "multipart process.\n" +
+ "The valid range is 50MB to 5GB.")
+ .required(true)
+ .defaultValue("5 GB")
+ .addValidator(StandardValidators.createDataSizeBoundsValidator(MIN_PART_SIZE, MAX_PUTOBJECT_SIZE))
+ .build();
+
+ public static final PropertyDescriptor MULTIPART_PART_SIZE = new PropertyDescriptor.Builder()
+ .name("Multipart Part Size")
+ .description("Specifies the part size for use when the PutOBSMultipart Upload API is used.\n" +
+ "Flow files will be broken into chunks of this size for the upload process, but the last part " +
+ "sent can be smaller since it is not padded.\n" +
+ "The valid range is 50MB to 5GB.")
+ .required(true)
+ .defaultValue("5 GB")
+ .addValidator(StandardValidators.createDataSizeBoundsValidator(MIN_PART_SIZE, MAX_PUTOBJECT_SIZE))
+ .build();
+
+ public static final PropertyDescriptor MULTIPART_AGEOFF_INTERVAL = new PropertyDescriptor.Builder()
+ .name("Multipart Upload AgeOff Interval")
+ .description("Specifies the interval at which existing multipart uploads in HuaweiCloud OBS will be evaluated " +
+ "for ageoff. When processor is triggered it will initiate the ageoff evaluation if this interval has been " +
+ "exceeded.")
+ .required(true)
+ .defaultValue("60 min")
+ .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor MULTIPART_MAX_AGE = new PropertyDescriptor.Builder()
+ .name("Multipart Upload Max Age Threshold")
+ .description("Specifies the maximum age for existing multipart uploads in HuaweiCloud OBS. When the ageoff " +
+ "process occurs, any upload older than this threshold will be aborted.")
+ .required(true)
+ .defaultValue("7 days")
+ .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor MULTIPART_TEMP_DIR = new PropertyDescriptor.Builder()
+ .name("obs-temporary-directory-multipart")
+ .displayName("Temporary Directory Multipart State")
+ .description("Directory in which, for multipart uploads, the processor will locally save the state tracking the upload ID and parts "
+ + "uploaded which must both be provided to complete the upload.")
+ .required(true)
+ .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
+ .defaultValue("${java.io.tmpdir}")
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .build();
+
+ public static final List properties = Collections.unmodifiableList(
+ Arrays.asList(
+ OBS_REGION,
+ ENDPOINT_OVERRIDE_URL,
+ BUCKET,
+ ACCESS_KEY,
+ SECRET_KEY,
+ HUAWEI_CREDENTIALS_PROVIDER_SERVICE,
+ KEY,
+ STORAGE_CLASS,
+ CONTENT_TYPE,
+ CONTENT_DISPOSITION,
+ CACHE_CONTROL,
+ TIMEOUT,
+ OWNER,
+ READ_USER_LIST,
+ FULL_CONTROL_USER_LIST,
+ READ_ACL_LIST,
+ WRITE_ACL_LIST,
+ CANNED_ACL,
+ MULTIPART_THRESHOLD,
+ MULTIPART_PART_SIZE,
+ MULTIPART_AGEOFF_INTERVAL,
+ MULTIPART_MAX_AGE,
+ MULTIPART_TEMP_DIR,
+ ENCRYPTION_SERVICE,
+ PROXY_CONFIGURATION_SERVICE
+ ));
+
+ private volatile String tempDirMultipart = System.getProperty("java.io.tmpdir");
+
+ @OnScheduled
+ public void setTempDir(final ProcessContext context) {
+ this.tempDirMultipart = context.getProperty(MULTIPART_TEMP_DIR).evaluateAttributeExpressions().getValue();
+ }
+
+ @Override
+ public List getSupportedPropertyDescriptors() {
+ return properties;
+ }
+
+ @Override
+ public PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
+ return new PropertyDescriptor.Builder()
+ .name(propertyDescriptorName)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .dynamic(true)
+ .build();
+ }
+
+ @Override
+ public void onTrigger(final ProcessContext context, final ProcessSession session) {
+ FlowFile flowFile = session.get();
+ if (flowFile == null) {
+ return;
+ }
+ try {
+ final long startNanos = System.nanoTime();
+ CustomProperties customProperties = new CustomProperties();
+ customProperties.setBucket(context.getProperty(BUCKET).evaluateAttributeExpressions(flowFile).getValue());
+ customProperties.setObjectKey(context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue());
+ customProperties.setCacheKey(getIdentifier() + "/" + customProperties.getBucket() + "/" + customProperties.getObjectKey());
+ customProperties.setFileName(flowFile.getAttributes().get(CoreAttributes.FILENAME.key()));
+ customProperties.setMultipartThreshold(context.getProperty(MULTIPART_THRESHOLD).asDataSize(DataUnit.B).longValue());
+ customProperties.setMultipartPartSize(context.getProperty(MULTIPART_PART_SIZE).asDataSize(DataUnit.B).longValue());
+ customProperties.setContentDisposition(context.getProperty(CONTENT_DISPOSITION).getValue());
+ customProperties.setContentType(context.getProperty(CONTENT_TYPE).evaluateAttributeExpressions(flowFile).getValue());
+ customProperties.setCacheControl(context.getProperty(CACHE_CONTROL).evaluateAttributeExpressions(flowFile).getValue());
+ customProperties.setEncryptionService(context.getProperty(ENCRYPTION_SERVICE).asControllerService(ObsServiceEncryptionService.class));
+ if (customProperties.getEncryptionService() == null) {
+ customProperties.setEncryptionService(new StandardOBSEncryptionService());
+ }
+ customProperties.setStorageClass(StorageClassEnum.valueOf(context.getProperty(STORAGE_CLASS).getValue()));
+ customProperties.setFlowFile(flowFile);
+
+ final Map attributes = new HashMap<>();
+ attributes.put(OBS_BUCKET, customProperties.getBucket());
+ attributes.put(OBS_OBJECT, customProperties.getObjectKey());
+
+ final long now = System.currentTimeMillis();
+
+ ageOffUploads(context, client, now, customProperties.getBucket());
+ session.read(customProperties.getFlowFile(), new InputStreamCallback() {
+ @Override
+ public void process(final InputStream rawIn) throws IOException {
+ attributes.putAll(doUpload(customProperties, rawIn, client, context));
+ }
+ });
+
+ if (!attributes.isEmpty()) {
+ flowFile = session.putAllAttributes(flowFile, attributes);
+ }
+ session.transfer(flowFile, REL_SUCCESS);
+ final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
+ session.getProvenanceReporter().send(flowFile, attributes.get(OBJECT_URL), millis);
+ getLogger().info("Successfully put {} to OBS in {} milliseconds", customProperties.getFlowFile(), millis);
+ try {
+ removeLocalState(customProperties.getCacheKey());
+ } catch (IOException e) {
+ getLogger().info("Error trying to delete key {} from cache: {}",
+ customProperties.getCacheKey(), e.getMessage());
+ }
+ } catch (final ProcessException | ObsException pe) {
+ extractExceptionDetails(pe, session, flowFile);
+ getLogger().error("Failed to put {} to Huawei OBS due to {}", new Object[]{flowFile, pe});
+ flowFile = session.penalize(flowFile);
+ session.transfer(flowFile, REL_FAILURE);
+ }
+ }
+
+ public File getPersistenceFile() {
+ return new File(this.tempDirMultipart + File.separator + getIdentifier());
+ }
+
+ protected boolean localUploadExistsInOBS(final ObsClient obsClient, final String bucket, final MultipartState localState) {
+ ListMultipartUploadsRequest listRequest = new ListMultipartUploadsRequest(bucket);
+ MultipartUploadListing listing = obsClient.listMultipartUploads(listRequest);
+ for (MultipartUpload upload : listing.getMultipartTaskList()) {
+ if (upload.getUploadId().equals(localState.getUploadId())) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ public synchronized MultipartState getLocalStateIfInOBS(final ObsClient obsClient, final String bucket,
+ final String objectKey) throws IOException {
+ MultipartState currState = getLocalState(objectKey);
+ if (currState == null) {
+ return null;
+ }
+
+ if (localUploadExistsInOBS(obsClient, bucket, currState)) {
+ getLogger().info("Local state for {} loaded with uploadId {} and {} partETags",
+ objectKey, currState.getUploadId(), currState.getPartETags().size());
+ return currState;
+ } else {
+ getLogger().info("Local state for {} with uploadId {} does not exist in OBS, deleting local state",
+ objectKey, currState.getUploadId());
+ persistLocalState(objectKey, null);
+ return null;
+ }
+ }
+
+ protected synchronized MultipartState getLocalState(final String objectKey) throws IOException {
+ // get local state if it exists
+ final File persistenceFile = getPersistenceFile();
+
+ if (persistenceFile.exists()) {
+ final Properties props = new Properties();
+ try (final FileInputStream fis = new FileInputStream(persistenceFile)) {
+ props.load(fis);
+ } catch (IOException ioe) {
+ getLogger().warn("Failed to recover local state for {} due to {}. Assuming no local state and " +
+ "restarting upload.", objectKey, ioe.getMessage());
+ return null;
+ }
+ if (props.containsKey(objectKey)) {
+ final String localSerialState = props.getProperty(objectKey);
+ if (localSerialState != null) {
+ try {
+ return MultipartState.newMultipartState(localSerialState);
+ } catch (final RuntimeException rte) {
+ getLogger().warn("Failed to recover local state for {} due to corrupt data in state.", objectKey, rte.getMessage());
+ return null;
+ }
+ }
+ }
+ }
+ return null;
+ }
+
+ public synchronized void persistLocalState(final String objectKey, final MultipartState currState) throws IOException {
+ JsonMapper mapper = new JsonMapper();
+ final String currStateStr = (currState == null) ? null : mapper.writeValueAsString(currState);
+ final File persistenceFile = getPersistenceFile();
+ final File parentDir = persistenceFile.getParentFile();
+ if (!parentDir.exists() && !parentDir.mkdirs()) {
+ throw new IOException("Persistence directory (" + parentDir.getAbsolutePath() + ") does not exist and " +
+ "could not be created.");
+ }
+ final Properties props = new Properties();
+ if (persistenceFile.exists()) {
+ try (final FileInputStream fis = new FileInputStream(persistenceFile)) {
+ props.load(fis);
+ }
+ }
+ if (currStateStr != null) {
+ currState.setTimestamp(System.currentTimeMillis());
+ props.setProperty(objectKey, currStateStr);
+ } else {
+ props.remove(objectKey);
+ }
+
+ if (props.size() > 0) {
+ try (final FileOutputStream fos = new FileOutputStream(persistenceFile)) {
+ props.store(fos, null);
+ } catch (IOException ioe) {
+ getLogger().error("Could not store state {} due to {}.",
+ persistenceFile.getAbsolutePath(), ioe.getMessage());
+ }
+ } else {
+ if (persistenceFile.exists()) {
+ try {
+ Files.delete(persistenceFile.toPath());
+ } catch (IOException ioe) {
+ getLogger().error("Could not remove state file {} due to {}.",
+ persistenceFile.getAbsolutePath(), ioe.getMessage());
+ }
+ }
+ }
+ }
+
+ protected synchronized void removeLocalState(final String objectKey) throws IOException {
+ persistLocalState(objectKey, null);
+ }
+
+ // Clear the multi-segment upload status information in the local cache
+ private synchronized void ageOffLocalState(long ageCutoff) {
+ // get local state if it exists
+ final File persistenceFile = getPersistenceFile();
+ if (!persistenceFile.exists()) {
+ return;
+ }
+ Properties props = new Properties();
+ try (final FileInputStream fis = new FileInputStream(persistenceFile)) {
+ props.load(fis);
+ } catch (final IOException ioe) {
+ getLogger().warn("Failed to ageoff remove local state due to {}",
+ ioe.getMessage());
+ return;
+ }
+ for (Entry