Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 22 additions & 14 deletions tez-dag/src/main/java/org/apache/tez/client/LocalClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@
import org.apache.tez.dag.app.DAGAppMaster;
import org.apache.tez.dag.app.DAGAppMasterState;
import org.apache.tez.dag.app.LocalDAGAppMaster;
import org.apache.tez.dag.app.LocalNodeContext;
import org.apache.tez.dag.app.NodeContext;
import org.apache.tez.dag.app.dag.DAG;

import com.google.common.annotations.VisibleForTesting;
Expand Down Expand Up @@ -369,10 +371,11 @@ public void run() {
long appSubmitTime = System.currentTimeMillis();

dagAppMaster =
createDAGAppMaster(applicationAttemptId, cId, currentHost, nmPort, nmHttpPort,
createDAGAppMaster(applicationAttemptId, cId,
SystemClock.getInstance(), appSubmitTime, isSession, userDir.toUri().getPath(),
new String[] {localDir.toUri().getPath()}, new String[] {logDir.toUri().getPath()},
amCredentials, UserGroupInformation.getCurrentUser().getShortUserName());
amCredentials, UserGroupInformation.getCurrentUser().getShortUserName(),
new LocalNodeContext(currentHost, nmPort, nmHttpPort));
DAGAppMaster.initAndStartAppMaster(dagAppMaster, conf);
clientHandler = new DAGClientHandler(dagAppMaster);
((AsyncDispatcher)dagAppMaster.getDispatcher()).setDrainEventsOnStop();
Expand All @@ -395,27 +398,32 @@ public void run() {

// this can be overridden by test code to create a mock app
@VisibleForTesting
protected DAGAppMaster createDAGAppMaster(ApplicationAttemptId applicationAttemptId,
ContainerId cId, String currentHost, int nmPort,
int nmHttpPort,
Clock clock, long appSubmitTime, boolean isSession,
String userDir,
String[] localDirs, String[] logDirs,
Credentials credentials, String jobUserName) throws
IOException {
protected DAGAppMaster createDAGAppMaster(
ApplicationAttemptId applicationAttemptId,
ContainerId cId,
Clock clock,
long appSubmitTime,
boolean isSession,
String userDir,
String[] localDirs,
String[] logDirs,
Credentials credentials,
String jobUserName,
NodeContext nodeContext)
throws IOException {

// Read in additional information about external services
AMPluginDescriptorProto amPluginDescriptorProto =
TezUtilsInternal.readUserSpecifiedTezConfiguration(userDir)
.getAmPluginDescriptor();

return isLocalWithoutNetwork
? new LocalDAGAppMaster(applicationAttemptId, cId, currentHost, nmPort, nmHttpPort,
? new LocalDAGAppMaster(applicationAttemptId, cId,
SystemClock.getInstance(), appSubmitTime, isSession, userDir, localDirs, logDirs,
versionInfo.getVersion(), credentials, jobUserName, amPluginDescriptorProto)
: new DAGAppMaster(applicationAttemptId, cId, currentHost, nmPort, nmHttpPort,
versionInfo.getVersion(), credentials, jobUserName, amPluginDescriptorProto, nodeContext)
: new DAGAppMaster(applicationAttemptId, cId,
SystemClock.getInstance(), appSubmitTime, isSession, userDir, localDirs, logDirs,
versionInfo.getVersion(), credentials, jobUserName, amPluginDescriptorProto);
versionInfo.getVersion(), credentials, jobUserName, amPluginDescriptorProto, nodeContext);
}

@Override
Expand Down
80 changes: 46 additions & 34 deletions tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
Original file line number Diff line number Diff line change
Expand Up @@ -209,16 +209,16 @@
* The state machine is encapsulated in the implementation of Job interface.
* All state changes happens via Job interface. Each event
* results in a Finite State Transition in Job.
*
* <p>
* Tez DAG AppMaster is the composition of loosely coupled services. The services
* interact with each other via events. The components resembles the
* Actors model. The component acts on received event and send out the
* events to other components.
* This keeps it highly concurrent with no or minimal synchronization needs.
*
* <p>
* The events are dispatched by a central Dispatch mechanism. All components
* register to the Dispatcher.
*
* <p>
* The information is shared across different components using AppContext.
*/

Expand All @@ -245,9 +245,7 @@ public class DAGAppMaster extends AbstractService {
private String appName;
private final ApplicationAttemptId appAttemptID;
private final ContainerId containerID;
private final String nmHost;
private final int nmPort;
private final int nmHttpPort;
private String nmHost;
private final String workingDirectory;
private final String[] localDirs;
private final String[] logDirs;
Expand Down Expand Up @@ -309,6 +307,7 @@ public class DAGAppMaster extends AbstractService {

private ListeningExecutorService execService;
private final PluginManager pluginManager;
private final NodeContext nodeContext;


/**
Expand Down Expand Up @@ -344,20 +343,18 @@ public class DAGAppMaster extends AbstractService {
private TezDAGHook[] hooks = {};

public DAGAppMaster(ApplicationAttemptId applicationAttemptId,
ContainerId containerId, String nmHost, int nmPort, int nmHttpPort,
ContainerId containerId,
Clock clock, long appSubmitTime, boolean isSession, String workingDirectory,
String [] localDirs, String[] logDirs, String clientVersion,
Credentials credentials, String jobUserName, AMPluginDescriptorProto pluginDescriptorProto) {
Credentials credentials, String jobUserName, AMPluginDescriptorProto pluginDescriptorProto, NodeContext nodeContext) {
super(DAGAppMaster.class.getName());
this.mdcContext = LoggingUtils.setupLog4j();
this.clock = clock;
this.startTime = clock.getTime();
this.appSubmitTime = appSubmitTime;
this.appAttemptID = applicationAttemptId;
this.containerID = containerId;
this.nmHost = nmHost;
this.nmPort = nmPort;
this.nmHttpPort = nmHttpPort;
this.nodeContext = nodeContext;
this.state = DAGAppMasterState.NEW;
this.isSession = isSession;
this.workingDirectory = workingDirectory;
Expand All @@ -371,9 +368,6 @@ public DAGAppMaster(ApplicationAttemptId applicationAttemptId,
.createRemoteUser(jobUserName);
this.appMasterUgi.addCredentials(amCredentials);

this.containerLogs = getRunningLogURL(this.nmHost + ":" + this.nmHttpPort,
this.containerID.toString(), this.appMasterUgi.getShortUserName());

LOG.info("Created DAGAppMaster for application " + applicationAttemptId
+ ", versionInfo=" + dagVersionInfo);
TezCommonUtils.logCredentials(LOG, this.appMasterUgi.getCredentials(), "am");
Expand Down Expand Up @@ -443,6 +437,16 @@ protected void serviceInit(final Configuration conf) throws Exception {
this.isLocal = conf.getBoolean(TezConfiguration.TEZ_LOCAL_MODE,
TezConfiguration.TEZ_LOCAL_MODE_DEFAULT);

if (!isLocal) {
this.nmHost = nodeContext.getNodeHostString();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change is causing UT failues in TestMockDAGAppMaster.java because

report.setHost(dagAppMaster.getAppNMHost());
is null post this change i.e. moving nmHost outside constructor. Working on fix...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed. tested on local.

int nmHttpPort = Integer.parseInt(nodeContext.getNodeHttpPortString());
this.containerLogs =
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this.containerLogs is making use of nmHttpPort and nmHost, moved it from constructor to serviceInit() and made nmHttpPort as local variable

getRunningLogURL(
this.nmHost + ":" + nmHttpPort,
this.containerID.toString(),
this.appMasterUgi.getShortUserName());
}

UserPayload defaultPayload = TezUtils.createUserPayloadFromConf(amConf);

PluginManager.PluginDescriptors pluginDescriptors = pluginManager.parseAllPlugins(isLocal, defaultPayload);
Expand Down Expand Up @@ -1207,15 +1211,15 @@ public ContainerId getAppContainerId() {
}

public String getAppNMHost() {
return nmHost;
return nodeContext.getNodeHostString();
}

public int getAppNMPort() {
return nmPort;
return Integer.parseInt(nodeContext.getNodePortString());
}

public int getAppNMHttpPort() {
return nmHttpPort;
return Integer.parseInt(nodeContext.getNodeHttpPortString());
}

public int getRpcPort() {
Expand Down Expand Up @@ -2415,13 +2419,15 @@ public static void main(String[] args) {
// Install the tez class loader, which can be used add new resources
TezClassLoader.setupTezClassLoader();
Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
final String pid = System.getenv().get("JVM_PID");

String nodeHostString = System.getenv(ApplicationConstants.Environment.NM_HOST.name());
String nodePortString = System.getenv(ApplicationConstants.Environment.NM_PORT.name());
String nodeHttpPortString = System.getenv(ApplicationConstants.Environment.NM_HTTP_PORT.name());
final long pid = ProcessHandle.current().pid();
String appSubmitTimeStr = System.getenv(ApplicationConstants.APP_SUBMIT_TIME_ENV);
String clientVersion = System.getenv(TezConstants.TEZ_CLIENT_VERSION_ENV);
String jobUserName = System.getenv(ApplicationConstants.Environment.USER.name());
String pwd = System.getenv(ApplicationConstants.Environment.PWD.name());
String localDirs = System.getenv(ApplicationConstants.Environment.LOCAL_DIRS.name());
String logDirs = System.getenv(ApplicationConstants.Environment.LOG_DIRS.name());

if (clientVersion == null) {
clientVersion = VersionInfo.UNKNOWN;
}
Expand All @@ -2435,14 +2441,14 @@ public static void main(String[] args) {
DAGProtos.ConfigurationProto confProto = amExtensions.loadConfigurationProto();
TezUtilsInternal.addUserSpecifiedTezConfiguration(conf, confProto.getConfKeyValuesList());

NodeContext nodeContext = new YarnNodeManagerContext();
Copy link
Contributor Author

@Aggarwal-Raghav Aggarwal-Raghav Feb 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we need to wrap in if statement for checking if framework is yarn. As the YarnNodeManagerContext is using supplier it won't be evaluated immediately and evaluation will happen only it !isLocal.
NOTE: in tez-am docker image tez.local.mode = true

ContainerId containerId = amExtensions.allocateContainerId(conf);

ApplicationAttemptId applicationAttemptId = containerId.getApplicationAttemptId();
org.apache.hadoop.ipc.CallerContext.setCurrent(new org.apache.hadoop.ipc.CallerContext
.Builder("tez_appmaster_" + containerId.getApplicationAttemptId()
).build());
long appSubmitTime = Long.parseLong(appSubmitTimeStr);
String jobUserName = System.getenv(ApplicationConstants.Environment.USER.name());

// Command line options
Option option = Option.builder()
Expand All @@ -2462,9 +2468,9 @@ public static void main(String[] args) {
+ ", jvmPid=" + pid
+ ", userFromEnv=" + jobUserName
+ ", cliSessionOption=" + sessionModeCliOption
+ ", pwd=" + System.getenv(ApplicationConstants.Environment.PWD.name())
+ ", localDirs=" + System.getenv(ApplicationConstants.Environment.LOCAL_DIRS.name())
+ ", logDirs=" + System.getenv(ApplicationConstants.Environment.LOG_DIRS.name()));
+ ", pwd=" + pwd
+ ", localDirs=" + localDirs
+ ", logDirs=" + logDirs);

AMPluginDescriptorProto amPluginDescriptorProto = null;
if (confProto.hasAmPluginDescriptor()) {
Expand All @@ -2477,20 +2483,26 @@ public static void main(String[] args) {
TezUtilsInternal.setSecurityUtilConfigration(LOG, conf);

DAGAppMaster appMaster =
new DAGAppMaster(applicationAttemptId, containerId, nodeHostString, Integer.parseInt(nodePortString),
Integer.parseInt(nodeHttpPortString), new SystemClock(), appSubmitTime, sessionModeCliOption,
System.getenv(ApplicationConstants.Environment.PWD.name()),
TezCommonUtils.getTrimmedStrings(System.getenv(ApplicationConstants.Environment.LOCAL_DIRS.name())),
TezCommonUtils.getTrimmedStrings(System.getenv(ApplicationConstants.Environment.LOG_DIRS.name())),
clientVersion, credentials, jobUserName, amPluginDescriptorProto);
new DAGAppMaster(
applicationAttemptId,
containerId,
new SystemClock(),
appSubmitTime,
sessionModeCliOption,
pwd,
TezCommonUtils.getTrimmedStrings(localDirs),
TezCommonUtils.getTrimmedStrings(logDirs),
clientVersion,
credentials,
jobUserName,
amPluginDescriptorProto,
nodeContext);
ShutdownHookManager.get().addShutdownHook(new DAGAppMasterShutdownHook(appMaster), SHUTDOWN_HOOK_PRIORITY);

// log the system properties
if (LOG.isInfoEnabled()) {
String systemPropsToLog = TezCommonUtils.getSystemPropertiesToLog(conf);
if (systemPropsToLog != null) {
LOG.info(systemPropsToLog);
}
LOG.info(systemPropsToLog);
}

initAndStartAppMaster(appMaster, conf);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,12 @@
public class LocalDAGAppMaster extends DAGAppMaster {

public LocalDAGAppMaster(ApplicationAttemptId applicationAttemptId, ContainerId containerId,
String nmHost, int nmPort, int nmHttpPort, Clock clock, long appSubmitTime, boolean isSession,
Clock clock, long appSubmitTime, boolean isSession,
String workingDirectory, String[] localDirs, String[] logDirs, String clientVersion,
Credentials credentials, String jobUserName, AMPluginDescriptorProto pluginDescriptorProto) {
super(applicationAttemptId, containerId, nmHost, nmPort, nmHttpPort, clock, appSubmitTime,
Credentials credentials, String jobUserName, AMPluginDescriptorProto pluginDescriptorProto, NodeContext nodeContext) {
super(applicationAttemptId, containerId, clock, appSubmitTime,
isSession, workingDirectory, localDirs, logDirs, clientVersion, credentials, jobUserName,
pluginDescriptorProto);
pluginDescriptorProto, nodeContext);
}

@Override
Expand Down
44 changes: 44 additions & 0 deletions tez-dag/src/main/java/org/apache/tez/dag/app/LocalNodeContext.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* <p>http://www.apache.org/licenses/LICENSE-2.0
*
* <p>Unless required by applicable law or agreed to in writing, software distributed under the
* License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tez.dag.app;

/** Local implementation of NodeContext. */
public final class LocalNodeContext implements NodeContext {

private final String nodeHostString;
private final String nodePortString;
private final String nodeHttpPortString;

public LocalNodeContext(String nodeHostString, int nodePortString, int nmHttpPort) {
this.nodeHostString = nodeHostString;
this.nodePortString = String.valueOf(nodePortString);
this.nodeHttpPortString = String.valueOf(nmHttpPort);
}

@Override
public String getNodeHostString() {
return nodeHostString;
}

@Override
public String getNodePortString() {
return nodePortString;
}

@Override
public String getNodeHttpPortString() {
return nodeHttpPortString;
}
}
34 changes: 34 additions & 0 deletions tez-dag/src/main/java/org/apache/tez/dag/app/NodeContext.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* <p>http://www.apache.org/licenses/LICENSE-2.0
*
* <p>Unless required by applicable law or agreed to in writing, software distributed under the
* License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tez.dag.app;

/** Provides context information about the node on which the DAGAppMaster is running. */
public sealed interface NodeContext permits YarnNodeManagerContext, LocalNodeContext {

/**
* @return The node host string
*/
String getNodeHostString();

/**
* @return The node port string
*/
String getNodePortString();

/**
* @return The node HTTP port string
*/
String getNodeHttpPortString();
}
Loading