diff --git a/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java b/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java index 9b65f2f452..5fd2da63a4 100644 --- a/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java +++ b/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java @@ -67,6 +67,8 @@ import org.apache.tez.dag.app.DAGAppMaster; import org.apache.tez.dag.app.DAGAppMasterState; import org.apache.tez.dag.app.LocalDAGAppMaster; +import org.apache.tez.dag.app.LocalNodeContext; +import org.apache.tez.dag.app.NodeContext; import org.apache.tez.dag.app.dag.DAG; import com.google.common.annotations.VisibleForTesting; @@ -369,10 +371,11 @@ public void run() { long appSubmitTime = System.currentTimeMillis(); dagAppMaster = - createDAGAppMaster(applicationAttemptId, cId, currentHost, nmPort, nmHttpPort, + createDAGAppMaster(applicationAttemptId, cId, SystemClock.getInstance(), appSubmitTime, isSession, userDir.toUri().getPath(), new String[] {localDir.toUri().getPath()}, new String[] {logDir.toUri().getPath()}, - amCredentials, UserGroupInformation.getCurrentUser().getShortUserName()); + amCredentials, UserGroupInformation.getCurrentUser().getShortUserName(), + new LocalNodeContext(currentHost, nmPort, nmHttpPort)); DAGAppMaster.initAndStartAppMaster(dagAppMaster, conf); clientHandler = new DAGClientHandler(dagAppMaster); ((AsyncDispatcher)dagAppMaster.getDispatcher()).setDrainEventsOnStop(); @@ -395,14 +398,19 @@ public void run() { // this can be overridden by test code to create a mock app @VisibleForTesting - protected DAGAppMaster createDAGAppMaster(ApplicationAttemptId applicationAttemptId, - ContainerId cId, String currentHost, int nmPort, - int nmHttpPort, - Clock clock, long appSubmitTime, boolean isSession, - String userDir, - String[] localDirs, String[] logDirs, - Credentials credentials, String jobUserName) throws - IOException { + protected DAGAppMaster createDAGAppMaster( + ApplicationAttemptId applicationAttemptId, + ContainerId cId, + Clock clock, + long appSubmitTime, + boolean isSession, + String userDir, + String[] localDirs, + String[] logDirs, + Credentials credentials, + String jobUserName, + NodeContext nodeContext) + throws IOException { // Read in additional information about external services AMPluginDescriptorProto amPluginDescriptorProto = @@ -410,12 +418,12 @@ protected DAGAppMaster createDAGAppMaster(ApplicationAttemptId applicationAttemp .getAmPluginDescriptor(); return isLocalWithoutNetwork - ? new LocalDAGAppMaster(applicationAttemptId, cId, currentHost, nmPort, nmHttpPort, + ? new LocalDAGAppMaster(applicationAttemptId, cId, SystemClock.getInstance(), appSubmitTime, isSession, userDir, localDirs, logDirs, - versionInfo.getVersion(), credentials, jobUserName, amPluginDescriptorProto) - : new DAGAppMaster(applicationAttemptId, cId, currentHost, nmPort, nmHttpPort, + versionInfo.getVersion(), credentials, jobUserName, amPluginDescriptorProto, nodeContext) + : new DAGAppMaster(applicationAttemptId, cId, SystemClock.getInstance(), appSubmitTime, isSession, userDir, localDirs, logDirs, - versionInfo.getVersion(), credentials, jobUserName, amPluginDescriptorProto); + versionInfo.getVersion(), credentials, jobUserName, amPluginDescriptorProto, nodeContext); } @Override diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java index a8b76204bd..569c5dfb9c 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java @@ -209,16 +209,16 @@ * The state machine is encapsulated in the implementation of Job interface. * All state changes happens via Job interface. Each event * results in a Finite State Transition in Job. - * + *
* Tez DAG AppMaster is the composition of loosely coupled services. The services * interact with each other via events. The components resembles the * Actors model. The component acts on received event and send out the * events to other components. * This keeps it highly concurrent with no or minimal synchronization needs. - * + *
* The events are dispatched by a central Dispatch mechanism. All components * register to the Dispatcher. - * + *
* The information is shared across different components using AppContext. */ @@ -245,9 +245,7 @@ public class DAGAppMaster extends AbstractService { private String appName; private final ApplicationAttemptId appAttemptID; private final ContainerId containerID; - private final String nmHost; - private final int nmPort; - private final int nmHttpPort; + private String nmHost; private final String workingDirectory; private final String[] localDirs; private final String[] logDirs; @@ -309,6 +307,7 @@ public class DAGAppMaster extends AbstractService { private ListeningExecutorService execService; private final PluginManager pluginManager; + private final NodeContext nodeContext; /** @@ -344,10 +343,10 @@ public class DAGAppMaster extends AbstractService { private TezDAGHook[] hooks = {}; public DAGAppMaster(ApplicationAttemptId applicationAttemptId, - ContainerId containerId, String nmHost, int nmPort, int nmHttpPort, + ContainerId containerId, Clock clock, long appSubmitTime, boolean isSession, String workingDirectory, String [] localDirs, String[] logDirs, String clientVersion, - Credentials credentials, String jobUserName, AMPluginDescriptorProto pluginDescriptorProto) { + Credentials credentials, String jobUserName, AMPluginDescriptorProto pluginDescriptorProto, NodeContext nodeContext) { super(DAGAppMaster.class.getName()); this.mdcContext = LoggingUtils.setupLog4j(); this.clock = clock; @@ -355,9 +354,7 @@ public DAGAppMaster(ApplicationAttemptId applicationAttemptId, this.appSubmitTime = appSubmitTime; this.appAttemptID = applicationAttemptId; this.containerID = containerId; - this.nmHost = nmHost; - this.nmPort = nmPort; - this.nmHttpPort = nmHttpPort; + this.nodeContext = nodeContext; this.state = DAGAppMasterState.NEW; this.isSession = isSession; this.workingDirectory = workingDirectory; @@ -371,9 +368,6 @@ public DAGAppMaster(ApplicationAttemptId applicationAttemptId, .createRemoteUser(jobUserName); this.appMasterUgi.addCredentials(amCredentials); - this.containerLogs = getRunningLogURL(this.nmHost + ":" + this.nmHttpPort, - this.containerID.toString(), this.appMasterUgi.getShortUserName()); - LOG.info("Created DAGAppMaster for application " + applicationAttemptId + ", versionInfo=" + dagVersionInfo); TezCommonUtils.logCredentials(LOG, this.appMasterUgi.getCredentials(), "am"); @@ -443,6 +437,16 @@ protected void serviceInit(final Configuration conf) throws Exception { this.isLocal = conf.getBoolean(TezConfiguration.TEZ_LOCAL_MODE, TezConfiguration.TEZ_LOCAL_MODE_DEFAULT); + if (!isLocal) { + this.nmHost = nodeContext.getNodeHostString(); + int nmHttpPort = Integer.parseInt(nodeContext.getNodeHttpPortString()); + this.containerLogs = + getRunningLogURL( + this.nmHost + ":" + nmHttpPort, + this.containerID.toString(), + this.appMasterUgi.getShortUserName()); + } + UserPayload defaultPayload = TezUtils.createUserPayloadFromConf(amConf); PluginManager.PluginDescriptors pluginDescriptors = pluginManager.parseAllPlugins(isLocal, defaultPayload); @@ -1207,15 +1211,15 @@ public ContainerId getAppContainerId() { } public String getAppNMHost() { - return nmHost; + return nodeContext.getNodeHostString(); } public int getAppNMPort() { - return nmPort; + return Integer.parseInt(nodeContext.getNodePortString()); } public int getAppNMHttpPort() { - return nmHttpPort; + return Integer.parseInt(nodeContext.getNodeHttpPortString()); } public int getRpcPort() { @@ -2415,13 +2419,15 @@ public static void main(String[] args) { // Install the tez class loader, which can be used add new resources TezClassLoader.setupTezClassLoader(); Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler()); - final String pid = System.getenv().get("JVM_PID"); - String nodeHostString = System.getenv(ApplicationConstants.Environment.NM_HOST.name()); - String nodePortString = System.getenv(ApplicationConstants.Environment.NM_PORT.name()); - String nodeHttpPortString = System.getenv(ApplicationConstants.Environment.NM_HTTP_PORT.name()); + final long pid = ProcessHandle.current().pid(); String appSubmitTimeStr = System.getenv(ApplicationConstants.APP_SUBMIT_TIME_ENV); String clientVersion = System.getenv(TezConstants.TEZ_CLIENT_VERSION_ENV); + String jobUserName = System.getenv(ApplicationConstants.Environment.USER.name()); + String pwd = System.getenv(ApplicationConstants.Environment.PWD.name()); + String localDirs = System.getenv(ApplicationConstants.Environment.LOCAL_DIRS.name()); + String logDirs = System.getenv(ApplicationConstants.Environment.LOG_DIRS.name()); + if (clientVersion == null) { clientVersion = VersionInfo.UNKNOWN; } @@ -2435,6 +2441,7 @@ public static void main(String[] args) { DAGProtos.ConfigurationProto confProto = amExtensions.loadConfigurationProto(); TezUtilsInternal.addUserSpecifiedTezConfiguration(conf, confProto.getConfKeyValuesList()); + NodeContext nodeContext = new YarnNodeManagerContext(); ContainerId containerId = amExtensions.allocateContainerId(conf); ApplicationAttemptId applicationAttemptId = containerId.getApplicationAttemptId(); @@ -2442,7 +2449,6 @@ public static void main(String[] args) { .Builder("tez_appmaster_" + containerId.getApplicationAttemptId() ).build()); long appSubmitTime = Long.parseLong(appSubmitTimeStr); - String jobUserName = System.getenv(ApplicationConstants.Environment.USER.name()); // Command line options Option option = Option.builder() @@ -2462,9 +2468,9 @@ public static void main(String[] args) { + ", jvmPid=" + pid + ", userFromEnv=" + jobUserName + ", cliSessionOption=" + sessionModeCliOption - + ", pwd=" + System.getenv(ApplicationConstants.Environment.PWD.name()) - + ", localDirs=" + System.getenv(ApplicationConstants.Environment.LOCAL_DIRS.name()) - + ", logDirs=" + System.getenv(ApplicationConstants.Environment.LOG_DIRS.name())); + + ", pwd=" + pwd + + ", localDirs=" + localDirs + + ", logDirs=" + logDirs); AMPluginDescriptorProto amPluginDescriptorProto = null; if (confProto.hasAmPluginDescriptor()) { @@ -2477,20 +2483,26 @@ public static void main(String[] args) { TezUtilsInternal.setSecurityUtilConfigration(LOG, conf); DAGAppMaster appMaster = - new DAGAppMaster(applicationAttemptId, containerId, nodeHostString, Integer.parseInt(nodePortString), - Integer.parseInt(nodeHttpPortString), new SystemClock(), appSubmitTime, sessionModeCliOption, - System.getenv(ApplicationConstants.Environment.PWD.name()), - TezCommonUtils.getTrimmedStrings(System.getenv(ApplicationConstants.Environment.LOCAL_DIRS.name())), - TezCommonUtils.getTrimmedStrings(System.getenv(ApplicationConstants.Environment.LOG_DIRS.name())), - clientVersion, credentials, jobUserName, amPluginDescriptorProto); + new DAGAppMaster( + applicationAttemptId, + containerId, + new SystemClock(), + appSubmitTime, + sessionModeCliOption, + pwd, + TezCommonUtils.getTrimmedStrings(localDirs), + TezCommonUtils.getTrimmedStrings(logDirs), + clientVersion, + credentials, + jobUserName, + amPluginDescriptorProto, + nodeContext); ShutdownHookManager.get().addShutdownHook(new DAGAppMasterShutdownHook(appMaster), SHUTDOWN_HOOK_PRIORITY); // log the system properties if (LOG.isInfoEnabled()) { String systemPropsToLog = TezCommonUtils.getSystemPropertiesToLog(conf); - if (systemPropsToLog != null) { - LOG.info(systemPropsToLog); - } + LOG.info(systemPropsToLog); } initAndStartAppMaster(appMaster, conf); diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/LocalDAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/LocalDAGAppMaster.java index e0c8443577..67c78aecbb 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/LocalDAGAppMaster.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/LocalDAGAppMaster.java @@ -32,12 +32,12 @@ public class LocalDAGAppMaster extends DAGAppMaster { public LocalDAGAppMaster(ApplicationAttemptId applicationAttemptId, ContainerId containerId, - String nmHost, int nmPort, int nmHttpPort, Clock clock, long appSubmitTime, boolean isSession, + Clock clock, long appSubmitTime, boolean isSession, String workingDirectory, String[] localDirs, String[] logDirs, String clientVersion, - Credentials credentials, String jobUserName, AMPluginDescriptorProto pluginDescriptorProto) { - super(applicationAttemptId, containerId, nmHost, nmPort, nmHttpPort, clock, appSubmitTime, + Credentials credentials, String jobUserName, AMPluginDescriptorProto pluginDescriptorProto, NodeContext nodeContext) { + super(applicationAttemptId, containerId, clock, appSubmitTime, isSession, workingDirectory, localDirs, logDirs, clientVersion, credentials, jobUserName, - pluginDescriptorProto); + pluginDescriptorProto, nodeContext); } @Override diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/LocalNodeContext.java b/tez-dag/src/main/java/org/apache/tez/dag/app/LocalNodeContext.java new file mode 100644 index 0000000000..414e54953e --- /dev/null +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/LocalNodeContext.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + *
http://www.apache.org/licenses/LICENSE-2.0 + * + *
Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tez.dag.app; + +/** Local implementation of NodeContext. */ +public final class LocalNodeContext implements NodeContext { + + private final String nodeHostString; + private final String nodePortString; + private final String nodeHttpPortString; + + public LocalNodeContext(String nodeHostString, int nodePortString, int nmHttpPort) { + this.nodeHostString = nodeHostString; + this.nodePortString = String.valueOf(nodePortString); + this.nodeHttpPortString = String.valueOf(nmHttpPort); + } + + @Override + public String getNodeHostString() { + return nodeHostString; + } + + @Override + public String getNodePortString() { + return nodePortString; + } + + @Override + public String getNodeHttpPortString() { + return nodeHttpPortString; + } +} diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/NodeContext.java b/tez-dag/src/main/java/org/apache/tez/dag/app/NodeContext.java new file mode 100644 index 0000000000..b96073f37e --- /dev/null +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/NodeContext.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + *
http://www.apache.org/licenses/LICENSE-2.0 + * + *
Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tez.dag.app; + +/** Provides context information about the node on which the DAGAppMaster is running. */ +public sealed interface NodeContext permits YarnNodeManagerContext, LocalNodeContext { + + /** + * @return The node host string + */ + String getNodeHostString(); + + /** + * @return The node port string + */ + String getNodePortString(); + + /** + * @return The node HTTP port string + */ + String getNodeHttpPortString(); +} diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/YarnNodeManagerContext.java b/tez-dag/src/main/java/org/apache/tez/dag/app/YarnNodeManagerContext.java new file mode 100644 index 0000000000..b703b4d681 --- /dev/null +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/YarnNodeManagerContext.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + *
http://www.apache.org/licenses/LICENSE-2.0 + * + *
Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.dag.app;
+
+import java.util.function.Supplier;
+
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+
+/**
+ * YARN specific implementation of NodeContext. Uses Suppliers to lazily resolve YARN environment
+ * variables only when they are first requested, avoiding eager resolution during DAGAppMaster
+ * startup.
+ */
+public final class YarnNodeManagerContext implements NodeContext {
+
+ // Preserving original variable names conceptually inside the suppliers
+ private final Supplier