From 9c2b825d11d32fc92c79db667fbfba12929b3716 Mon Sep 17 00:00:00 2001 From: "hanhan.zhang" Date: Thu, 17 Oct 2024 15:02:54 +0800 Subject: [PATCH] option parameter support placeholder substitution --- .../plugin/task/flink/FlinkArgsUtils.java | 6 +++-- .../plugin/task/flink/FlinkArgsUtilsTest.java | 23 ++++++++++++++----- 2 files changed, 21 insertions(+), 8 deletions(-) diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtils.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtils.java index fe374c9d8fed..6f02b54c8a5a 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtils.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtils.java @@ -112,7 +112,8 @@ private static List buildRunCommandLineForSql(TaskExecutionContext taskE String others = flinkParameters.getOthers(); if (StringUtils.isNotEmpty(others)) { - args.add(others); + Map paramsMap = taskExecutionContext.getPrepareParamsMap(); + args.add(ParameterUtils.convertParameterPlaceholders(others, ParameterUtils.convert(paramsMap))); } return args; } @@ -271,7 +272,8 @@ private static List buildRunCommandLineForOthers(TaskExecutionContext ta // -s -yqu -yat -yD -D if (StringUtils.isNotEmpty(others)) { - args.add(others); + Map paramsMap = taskExecutionContext.getPrepareParamsMap(); + args.add(ParameterUtils.convertParameterPlaceholders(others, ParameterUtils.convert(paramsMap))); } // determine yarn queue diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtilsTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtilsTest.java index b53260bb87eb..e4ff2ec375b5 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtilsTest.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtilsTest.java @@ -18,10 +18,14 @@ package org.apache.dolphinscheduler.plugin.task.flink; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; +import org.apache.dolphinscheduler.plugin.task.api.enums.DataType; +import org.apache.dolphinscheduler.plugin.task.api.model.Property; import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo; import org.apache.dolphinscheduler.plugin.task.api.resource.ResourceContext; +import java.util.HashMap; import java.util.List; +import java.util.Map; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -45,6 +49,7 @@ private FlinkParameters buildTestFlinkParametersWithDeployMode(FlinkDeployMode f flinkParameters.setAppName("demo-app-name"); flinkParameters.setJobManagerMemory("1024m"); flinkParameters.setTaskManagerMemory("1024m"); + flinkParameters.setOthers("-Dyarn.application.name=${job-name}"); return flinkParameters; } @@ -60,6 +65,12 @@ private TaskExecutionContext buildTestTaskExecutionContext() { ResourceContext resourceContext = new ResourceContext(); resourceContext.addResourceItem(resourceItem); taskExecutionContext.setResourceContext(resourceContext); + + Map parameters = new HashMap<>(); + parameters.put("job-name", + Property.builder().type(DataType.VARCHAR).prop("job-name").value("demo-app-name").build()); + taskExecutionContext.setPrepareParamsMap(parameters); + return taskExecutionContext; } @@ -69,7 +80,7 @@ public void testRunJarInApplicationMode() throws Exception { List commandLine = FlinkArgsUtils.buildRunCommandLine(buildTestTaskExecutionContext(), flinkParameters); Assertions.assertEquals( - "${FLINK_HOME}/bin/flink run-application -t yarn-application -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -sae -c org.example.Main /opt/job.jar", + "${FLINK_HOME}/bin/flink run-application -t yarn-application -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -sae -Dyarn.application.name=demo-app-name -c org.example.Main /opt/job.jar", joinStringListWithSpace(commandLine)); } @@ -81,7 +92,7 @@ public void testRunJarInClusterMode() { FlinkArgsUtils.buildRunCommandLine(buildTestTaskExecutionContext(), flinkParameters); Assertions.assertEquals( - "${FLINK_HOME}/bin/flink run -m yarn-cluster -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -sae -c org.example.Main /opt/job.jar", + "${FLINK_HOME}/bin/flink run -m yarn-cluster -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -sae -Dyarn.application.name=demo-app-name -c org.example.Main /opt/job.jar", joinStringListWithSpace(commandLine1)); flinkParameters.setFlinkVersion("<1.10"); @@ -89,7 +100,7 @@ public void testRunJarInClusterMode() { FlinkArgsUtils.buildRunCommandLine(buildTestTaskExecutionContext(), flinkParameters); Assertions.assertEquals( - "${FLINK_HOME}/bin/flink run -m yarn-cluster -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -sae -c org.example.Main /opt/job.jar", + "${FLINK_HOME}/bin/flink run -m yarn-cluster -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -sae -Dyarn.application.name=demo-app-name -c org.example.Main /opt/job.jar", joinStringListWithSpace(commandLine2)); flinkParameters.setFlinkVersion(">=1.12"); @@ -97,7 +108,7 @@ public void testRunJarInClusterMode() { FlinkArgsUtils.buildRunCommandLine(buildTestTaskExecutionContext(), flinkParameters); Assertions.assertEquals( - "${FLINK_HOME}/bin/flink run -t yarn-per-job -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -sae -c org.example.Main /opt/job.jar", + "${FLINK_HOME}/bin/flink run -t yarn-per-job -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -sae -Dyarn.application.name=demo-app-name -c org.example.Main /opt/job.jar", joinStringListWithSpace(commandLine3)); } @@ -107,7 +118,7 @@ public void testRunJarInLocalMode() { List commandLine = FlinkArgsUtils.buildRunCommandLine(buildTestTaskExecutionContext(), flinkParameters); Assertions.assertEquals( - "${FLINK_HOME}/bin/flink run -p 4 -sae -c org.example.Main /opt/job.jar", + "${FLINK_HOME}/bin/flink run -p 4 -sae -Dyarn.application.name=demo-app-name -c org.example.Main /opt/job.jar", joinStringListWithSpace(commandLine)); } @@ -118,7 +129,7 @@ public void testRunSql() { List commandLine = FlinkArgsUtils.buildRunCommandLine(buildTestTaskExecutionContext(), flinkParameters); Assertions.assertEquals( - "${FLINK_HOME}/bin/sql-client.sh -i /tmp/execution/app-id_init.sql -f /tmp/execution/app-id_node.sql", + "${FLINK_HOME}/bin/sql-client.sh -i /tmp/execution/app-id_init.sql -f /tmp/execution/app-id_node.sql -Dyarn.application.name=demo-app-name", joinStringListWithSpace(commandLine)); }