Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions src/main/java/io/cdap/plugin/http/common/http/OAuthUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,13 @@ public static AccessToken getAccessToken(BaseHttpConfig config) throws IOExcepti
// get accessToken from service account
return OAuthUtil.getAccessTokenByServiceAccount(config);
case OAUTH2:
if (config instanceof BaseHttpSourceConfig) {
try (CloseableHttpClient client = HttpClients.custom()
.setSSLSocketFactory(new SSLConnectionSocketFactoryCreator((BaseHttpSourceConfig) config).create())
.build()) {
return getAccessToken(client, config);
}
}
try (CloseableHttpClient client = HttpClients.createDefault()) {
return getAccessToken(client, config);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@
import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.plugin.http.common.http.AuthType;
import io.cdap.plugin.http.common.http.HttpClient;
import io.cdap.plugin.http.common.http.KeyStoreType;
import io.cdap.plugin.http.common.http.OAuthUtil;
import io.cdap.plugin.http.common.http.SSLConnectionSocketFactoryCreator;
import io.cdap.plugin.http.source.common.BaseHttpSourceConfig;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHost;
Expand Down Expand Up @@ -70,7 +72,8 @@ private void validateOAuth2Credentials(FailureCollector collector) {
!containsMacro(PROPERTY_PROXY_PASSWORD) && !containsMacro(PROPERTY_PROXY_USERNAME) &&
!containsMacro(PROPERTY_PROXY_URL) && !containsMacro(PROPERTY_OAUTH2_CLIENT_AUTHENTICATION) &&
!containsMacro(PROPERTY_OAUTH2_GRANT_TYPE)) {
HttpClientBuilder httpclientBuilder = HttpClients.custom();
HttpClientBuilder httpclientBuilder = HttpClients.custom()
.setSSLSocketFactory(new SSLConnectionSocketFactoryCreator(this).create());
if (!Strings.isNullOrEmpty(getProxyUrl())) {
HttpHost proxyHost = HttpHost.create(getProxyUrl());
if (!Strings.isNullOrEmpty(getProxyUsername()) && !Strings.isNullOrEmpty(getProxyPassword())) {
Expand Down Expand Up @@ -140,6 +143,7 @@ private HttpBatchSourceConfig(HttpBatchSourceConfigBuilder builder) {
this.readTimeout = builder.readTimeout;
this.paginationType = builder.paginationType;
this.verifyHttps = builder.verifyHttps;
this.keystoreType = builder.keystoreType;
this.authType = builder.authType;
this.authUrl = builder.authUrl;
this.clientId = builder.clientId;
Expand Down Expand Up @@ -180,6 +184,7 @@ public static class HttpBatchSourceConfigBuilder {
private Integer readTimeout;
private String paginationType;
private String verifyHttps;
private String keystoreType;
private String authType;
private String authUrl;
private String tokenUrl;
Expand Down Expand Up @@ -345,6 +350,11 @@ public HttpBatchSourceConfigBuilder setAuthType(String authType) {
return this;
}

public HttpBatchSourceConfigBuilder setKeystoreType(KeyStoreType keystoreTypeObj) {
this.keystoreType = keystoreTypeObj.getValue();
return this;
}

public HttpBatchSourceConfig build() {
return new HttpBatchSourceConfig(this);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ public class HttpStreamingSourceETLTest extends HttpSourceETLTest {
private static final Logger LOG = LoggerFactory.getLogger(HttpStreamingSourceETLTest.class);
private static final ArtifactId APP_ARTIFACT_ID = NamespaceId.DEFAULT.artifact("data-streams", "1.0.0");
private static final ArtifactSummary APP_ARTIFACT = new ArtifactSummary("data-streams", "1.0.0");
private static final int WAIT_FOR_RECORDS_TIMEOUT_SECONDS = 60;
private static final long WAIT_FOR_RECORDS_POLLING_INTERVAL_MS = 100;
private static final int WAIT_FOR_RECORDS_TIMEOUT_SECONDS = 120;
private static final long WAIT_FOR_RECORDS_POLLING_INTERVAL_MS = 200;

@BeforeClass
public static void setupTest() throws Exception {
Expand Down Expand Up @@ -113,6 +113,7 @@ private List<StructuredRecord> waitForRecords(ProgramManager programManager,
.atMost(WAIT_FOR_RECORDS_TIMEOUT_SECONDS, TimeUnit.SECONDS)
.pollInterval(WAIT_FOR_RECORDS_POLLING_INTERVAL_MS, TimeUnit.MILLISECONDS)
.untilAsserted((() -> {
outputManager.get();
int recordsCount = MockSink.readOutput(outputManager).size();
Assert.assertTrue(
String.format("At least %d records expected, but %d found", exceptedNumberOfRecords, recordsCount),
Expand All @@ -122,6 +123,7 @@ private List<StructuredRecord> waitForRecords(ProgramManager programManager,
programManager.stop();
programManager.waitForStopped(10, TimeUnit.SECONDS);

outputManager.get();
return MockSink.readOutput(outputManager);
}

Expand Down
Loading
Loading