From fa521d81afa0e1ed4c584383d5db69fb3689d9a2 Mon Sep 17 00:00:00 2001 From: benjobs Date: Wed, 26 May 2021 19:10:01 +0800 Subject: [PATCH] [New Feature] checkPoint Failure Options (sent email or restart) --- README.md | 14 +- README_CN.md | 14 +- .../console/core/entity/Application.java | 32 +++- .../console/core/enums/AppExistsState.java | 4 +- .../console/core/enums/ApplicationType.java | 4 +- .../console/core/enums/CandidateType.java | 3 +- .../console/core/enums/ChangedType.java | 3 +- .../console/core/enums/CheckPointType.java | 5 +- .../console/core/enums/DeployState.java | 3 +- .../console/core/enums/FlinkAppState.java | 4 +- .../core/enums/GitAuthorizedError.java | 4 +- .../console/core/enums/OptionState.java | 3 +- .../core/metrics/flink/CheckPoints.java | 26 ++- .../console/core/service/AlertService.java | 6 +- .../core/service/ApplicationService.java | 2 + .../core/service/impl/AlertServiceImpl.java | 48 +++++- .../service/impl/ApplicationServiceImpl.java | 136 +++++++++------- .../console/core/task/FlinkTrackingTask.java | 96 +++++++---- .../mapper/core/ApplicationMapper.xml | 3 + .../streamx-console-webapp/package.json | 2 +- .../src/views/flink/app/Add.vue | 154 +++++++++++++----- .../src/views/flink/app/EditFlink.vue | 116 ++++++++++--- .../src/views/flink/app/EditStreamX.vue | 132 +++++++++++---- .../src/views/user/SignIn.vue | 4 - 24 files changed, 580 insertions(+), 238 deletions(-) diff --git a/README.md b/README.md index 0bfc4cf863..aaf4a92617 100644 --- a/README.md +++ b/README.md @@ -37,13 +37,12 @@ Make Flink|Spark easier The original intention of `StreamX` is to make the development of `Flink` easier. `StreamX` focuses on the management of development phases and tasks. Our ultimate goal is to build a one-stop big data solution integrating stream processing, batch processing, data warehouse and data laker. - +[![Watch the video](http://assets.streamxhub.com/streamx_player.png)](http://assets.streamxhub.com/streamx.mp4) ![](http://assets.streamxhub.com/streamx-main.png?12345) ![](http://assets.streamxhub.com/streamx-sql.png?12345) -![](http://assets.streamxhub.com/streamx-flameGraph.png?123456) ## 🎉 Features * Scaffolding @@ -104,18 +103,12 @@ Thanks to the above excellent open source projects and many outstanding open sou Thanks to the [fire-spark](https://github.com/GuoNingNing/fire-spark) project for the early inspiration and help. ### 🚀 Quick Start - -Prerequisites for building: - -* Maven 3.6+ -* npm 7.11.2 (https://nodejs.org/en/) -* JDK 1.8+ - ``` git clone https://github.com/streamxhub/streamx.git cd Streamx -mvn clean install -DskipTests +mvn clean install -DskipTests -Denv=prod ``` +click [Document](http://www.streamxhub.com/zh/doc/) for more information ## 👻 Why not...❓ @@ -140,7 +133,6 @@ You can contact us or ask questions via: - [New an issue](https://github.com/streamxhub/streamx/issues/new) - [Join us](#-Join-us) -- [Who uses Streamx](https://github.com/streamxhub/streamx/issues/163) ## 💰 Donation diff --git a/README_CN.md b/README_CN.md index bc7704ebf9..f359c14378 100644 --- a/README_CN.md +++ b/README_CN.md @@ -38,14 +38,13 @@ Make Flink|Spark easier!!! 我们在使用 `Flink` 时发现从编程模型, 启动配置到运维管理都有很多可以抽象共用的地方, 我们将一些好的经验固化下来并结合业内的最佳实践, 通过不断努力终于诞生了今天的框架 —— `StreamX`, 项目的初衷是 —— 让 `Flink` 开发更简单, 使用 `StreamX` 开发,可以极大降低学习成本和开发门槛, 让开发者只用关心最核心的业务, `StreamX` 规范了项目的配置,鼓励函数式编程,定义了最佳的编程方式,提供了一系列开箱即用的 `Connectors` ,标准化了配置、开发、测试、部署、监控、运维的整个过程, 提供 `Scala` 和 `Java` 两套api, 其最终目的是打造一个一站式大数据平台,流批一体,湖仓一体的解决方案 - + +[![Watch the video](http://assets.streamxhub.com/streamx_player.png)](http://assets.streamxhub.com/streamx.mp4) ![](http://assets.streamxhub.com/streamx-main.png?12345) ![](http://assets.streamxhub.com/streamx-sql.png?12345) -![](http://assets.streamxhub.com/streamx-flameGraph.png?123456) - ## 🎉 Features * 开发脚手架 * 一系列开箱即用的connectors @@ -103,6 +102,14 @@ Make Flink|Spark easier!!! 感谢以上优秀的开源项目和很多未提到的优秀开源项目,致以最崇高的敬意,特别感谢[Apache Zeppelin](http://zeppelin.apache.org),[IntelliJ IDEA](https://www.jetbrains.com/idea/), 感谢 [fire-spark](https://github.com/GuoNingNing/fire-spark) 项目,早期给予的灵感和帮助, 感谢我老婆在项目开发时给予的支持,悉心照顾我的生活和日常,给予我足够的时间开发这个项目 +### 🚀 快速上手 +``` +git clone https://github.com/streamxhub/streamx.git +cd Streamx +mvn clean install -DskipTests -Denv=prod +``` +更多请查看[官网文档](http://www.streamxhub.com/zh/doc/) + ## 👻 为什么不是...❓ ### Apache Zeppelin @@ -125,7 +132,6 @@ Make Flink|Spark easier!!! - [快速创建issue!](https://github.com/streamxhub/streamx/issues/new) - [加入社区](#-加入社区) -- [谁在使用streamx](https://github.com/streamxhub/streamx/issues/163) ## 💰 Donation diff --git a/streamx-console/streamx-console-service/src/main/java/com/streamxhub/streamx/console/core/entity/Application.java b/streamx-console/streamx-console-service/src/main/java/com/streamxhub/streamx/console/core/entity/Application.java index 78056fe47b..b824fe88e0 100644 --- a/streamx-console/streamx-console-service/src/main/java/com/streamxhub/streamx/console/core/entity/Application.java +++ b/streamx-console/streamx-console-service/src/main/java/com/streamxhub/streamx/console/core/entity/Application.java @@ -138,6 +138,23 @@ public class Application implements Serializable { private Long duration; + /** + * checkpoint最大的失败次数 + */ + private Integer cpMaxFailureInterval; + + /** + * checkpoint在时间范围内失败(分钟) + */ + private Integer cpFailureRateInterval; + + /** + * 在X分钟之后失败Y次,之后触发的操作: + * 1: 发送告警 + * 2: 重启 + */ + private Integer cpFailureAction; + /** * overview */ @@ -202,6 +219,11 @@ public void setState(Integer state) { } } + @JsonIgnore + public boolean cpFailedTrigger() { + return this.cpMaxFailureInterval != null && this.cpFailureRateInterval != null && this.cpFailureAction != null; + } + @JsonIgnore public boolean eqFlinkJob(Application other) { if (this.isFlinkSqlJob() && other.isFlinkSqlJob()) { @@ -356,7 +378,7 @@ public boolean eqJobParam(Application other) { //6) Dynamic Option 是否发生变化 //7) Program Args 是否发生变化 if (!this.getResolveOrder().equals(other.getResolveOrder()) || - !this.getExecutionMode().equals(other.getExecutionMode())) { + !this.getExecutionMode().equals(other.getExecutionMode())) { return false; } @@ -467,10 +489,10 @@ public static class Pom { @Override public String toString() { return "{" + - "groupId='" + groupId + '\'' + - ", artifactId='" + artifactId + '\'' + - ", version='" + version + '\'' + - '}'; + "groupId='" + groupId + '\'' + + ", artifactId='" + artifactId + '\'' + + ", version='" + version + '\'' + + '}'; } private String getGav() { diff --git a/streamx-console/streamx-console-service/src/main/java/com/streamxhub/streamx/console/core/enums/AppExistsState.java b/streamx-console/streamx-console-service/src/main/java/com/streamxhub/streamx/console/core/enums/AppExistsState.java index 2a31ab09d8..8763297990 100644 --- a/streamx-console/streamx-console-service/src/main/java/com/streamxhub/streamx/console/core/enums/AppExistsState.java +++ b/streamx-console/streamx-console-service/src/main/java/com/streamxhub/streamx/console/core/enums/AppExistsState.java @@ -20,10 +20,12 @@ */ package com.streamxhub.streamx.console.core.enums; +import java.io.Serializable; + /** * @author benjobs */ -public enum AppExistsState { +public enum AppExistsState implements Serializable { /** * 不存在 diff --git a/streamx-console/streamx-console-service/src/main/java/com/streamxhub/streamx/console/core/enums/ApplicationType.java b/streamx-console/streamx-console-service/src/main/java/com/streamxhub/streamx/console/core/enums/ApplicationType.java index 38cdb37edf..7bb4944e0c 100644 --- a/streamx-console/streamx-console-service/src/main/java/com/streamxhub/streamx/console/core/enums/ApplicationType.java +++ b/streamx-console/streamx-console-service/src/main/java/com/streamxhub/streamx/console/core/enums/ApplicationType.java @@ -20,10 +20,12 @@ */ package com.streamxhub.streamx.console.core.enums; +import java.io.Serializable; + /** * @author benjobs */ -public enum ApplicationType { +public enum ApplicationType implements Serializable { /** * StreamX Flink */ diff --git a/streamx-console/streamx-console-service/src/main/java/com/streamxhub/streamx/console/core/enums/CandidateType.java b/streamx-console/streamx-console-service/src/main/java/com/streamxhub/streamx/console/core/enums/CandidateType.java index 08bb7be12b..4c009b8a9b 100644 --- a/streamx-console/streamx-console-service/src/main/java/com/streamxhub/streamx/console/core/enums/CandidateType.java +++ b/streamx-console/streamx-console-service/src/main/java/com/streamxhub/streamx/console/core/enums/CandidateType.java @@ -20,9 +20,10 @@ */ package com.streamxhub.streamx.console.core.enums; +import java.io.Serializable; import java.util.Arrays; -public enum CandidateType { +public enum CandidateType implements Serializable { /** * 非候选版本 diff --git a/streamx-console/streamx-console-service/src/main/java/com/streamxhub/streamx/console/core/enums/ChangedType.java b/streamx-console/streamx-console-service/src/main/java/com/streamxhub/streamx/console/core/enums/ChangedType.java index 92610f7dd0..619a96e310 100644 --- a/streamx-console/streamx-console-service/src/main/java/com/streamxhub/streamx/console/core/enums/ChangedType.java +++ b/streamx-console/streamx-console-service/src/main/java/com/streamxhub/streamx/console/core/enums/ChangedType.java @@ -20,13 +20,14 @@ */ package com.streamxhub.streamx.console.core.enums; +import java.io.Serializable; import java.util.Arrays; /** * @author benjobs */ -public enum ChangedType { +public enum ChangedType implements Serializable { /** * 未发生变化 */ diff --git a/streamx-console/streamx-console-service/src/main/java/com/streamxhub/streamx/console/core/enums/CheckPointType.java b/streamx-console/streamx-console-service/src/main/java/com/streamxhub/streamx/console/core/enums/CheckPointType.java index b5eb39f7e3..cdac841dde 100644 --- a/streamx-console/streamx-console-service/src/main/java/com/streamxhub/streamx/console/core/enums/CheckPointType.java +++ b/streamx-console/streamx-console-service/src/main/java/com/streamxhub/streamx/console/core/enums/CheckPointType.java @@ -34,7 +34,10 @@ public enum CheckPointType implements Serializable { /** * SAVEPOINT */ - SAVEPOINT(2); + SAVEPOINT(2), + + SYNC_SAVEPOINT(3); + int value; public int get() { diff --git a/streamx-console/streamx-console-service/src/main/java/com/streamxhub/streamx/console/core/enums/DeployState.java b/streamx-console/streamx-console-service/src/main/java/com/streamxhub/streamx/console/core/enums/DeployState.java index 6511224a2f..e418a6c239 100644 --- a/streamx-console/streamx-console-service/src/main/java/com/streamxhub/streamx/console/core/enums/DeployState.java +++ b/streamx-console/streamx-console-service/src/main/java/com/streamxhub/streamx/console/core/enums/DeployState.java @@ -20,12 +20,13 @@ */ package com.streamxhub.streamx.console.core.enums; +import java.io.Serializable; import java.util.Arrays; /** * @author benjobs */ -public enum DeployState { +public enum DeployState implements Serializable { /** * 需用重新发布,但是下载maven依赖失败.(针对flinkSql任务) diff --git a/streamx-console/streamx-console-service/src/main/java/com/streamxhub/streamx/console/core/enums/FlinkAppState.java b/streamx-console/streamx-console-service/src/main/java/com/streamxhub/streamx/console/core/enums/FlinkAppState.java index 3f47188912..ab065395d7 100644 --- a/streamx-console/streamx-console-service/src/main/java/com/streamxhub/streamx/console/core/enums/FlinkAppState.java +++ b/streamx-console/streamx-console-service/src/main/java/com/streamxhub/streamx/console/core/enums/FlinkAppState.java @@ -22,11 +22,13 @@ import lombok.Getter; +import java.io.Serializable; + /** * @author benjobs */ @Getter -public enum FlinkAppState { +public enum FlinkAppState implements Serializable { /** * added new job to database diff --git a/streamx-console/streamx-console-service/src/main/java/com/streamxhub/streamx/console/core/enums/GitAuthorizedError.java b/streamx-console/streamx-console-service/src/main/java/com/streamxhub/streamx/console/core/enums/GitAuthorizedError.java index 7a2337c673..cfdc02c4b5 100644 --- a/streamx-console/streamx-console-service/src/main/java/com/streamxhub/streamx/console/core/enums/GitAuthorizedError.java +++ b/streamx-console/streamx-console-service/src/main/java/com/streamxhub/streamx/console/core/enums/GitAuthorizedError.java @@ -20,10 +20,12 @@ */ package com.streamxhub.streamx.console.core.enums; +import java.io.Serializable; + /** * @author benjobs */ -public enum GitAuthorizedError { +public enum GitAuthorizedError implements Serializable { /** * 没有错误 diff --git a/streamx-console/streamx-console-service/src/main/java/com/streamxhub/streamx/console/core/enums/OptionState.java b/streamx-console/streamx-console-service/src/main/java/com/streamxhub/streamx/console/core/enums/OptionState.java index a85d493e62..13bcd5ef02 100644 --- a/streamx-console/streamx-console-service/src/main/java/com/streamxhub/streamx/console/core/enums/OptionState.java +++ b/streamx-console/streamx-console-service/src/main/java/com/streamxhub/streamx/console/core/enums/OptionState.java @@ -22,13 +22,14 @@ import lombok.Getter; +import java.io.Serializable; import java.util.Arrays; /** * @author benjobs */ @Getter -public enum OptionState { +public enum OptionState implements Serializable { /** * Application which is currently action: none. diff --git a/streamx-console/streamx-console-service/src/main/java/com/streamxhub/streamx/console/core/metrics/flink/CheckPoints.java b/streamx-console/streamx-console-service/src/main/java/com/streamxhub/streamx/console/core/metrics/flink/CheckPoints.java index 4aa7534107..d945ef8eeb 100644 --- a/streamx-console/streamx-console-service/src/main/java/com/streamxhub/streamx/console/core/metrics/flink/CheckPoints.java +++ b/streamx-console/streamx-console-service/src/main/java/com/streamxhub/streamx/console/core/metrics/flink/CheckPoints.java @@ -21,6 +21,7 @@ package com.streamxhub.streamx.console.core.metrics.flink; import com.fasterxml.jackson.annotation.JsonProperty; +import com.streamxhub.streamx.console.core.enums.CheckPointStatus; import com.streamxhub.streamx.console.core.enums.CheckPointType; import lombok.Data; @@ -33,6 +34,8 @@ @Data public class CheckPoints implements Serializable { + private Counts counts; + private List history; private Latest latest; @@ -65,16 +68,17 @@ public static class CheckPoint implements Serializable { private Boolean discarded; - public boolean isCompleted() { - return "COMPLETED".equals(this.status); + public CheckPointStatus getCheckPointStatus() { + return CheckPointStatus.valueOf(this.status); } - public CheckPointType getCheckPointType() { if ("CHECKPOINT".equals(this.checkpointType)) { return CheckPointType.CHECKPOINT; + } else if ("SAVEPOINT".equals(this.checkpointType)) { + return CheckPointType.SAVEPOINT; } - return CheckPointType.SAVEPOINT; + return CheckPointType.SYNC_SAVEPOINT; } public String getPath() { @@ -86,4 +90,18 @@ public String getPath() { public static class Latest implements Serializable { private CheckPoint completed; } + + @Data + public static class Counts implements Serializable { + private Integer completed; + + private Integer failed; + + @JsonProperty("in_progress") + private Integer inProgress; + + private Integer restored; + + private Integer total; + } } diff --git a/streamx-console/streamx-console-service/src/main/java/com/streamxhub/streamx/console/core/service/AlertService.java b/streamx-console/streamx-console-service/src/main/java/com/streamxhub/streamx/console/core/service/AlertService.java index 931ce8569f..09f4ff8d13 100644 --- a/streamx-console/streamx-console-service/src/main/java/com/streamxhub/streamx/console/core/service/AlertService.java +++ b/streamx-console/streamx-console-service/src/main/java/com/streamxhub/streamx/console/core/service/AlertService.java @@ -21,7 +21,8 @@ package com.streamxhub.streamx.console.core.service; import com.streamxhub.streamx.console.core.entity.Application; -import com.streamxhub.streamx.console.core.enums.FlinkAppState; + +import java.io.Serializable; /** * @author benjobs @@ -33,6 +34,7 @@ public interface AlertService { * * @param application */ - void alert(Application application, FlinkAppState appState); + void alert(Application application, Serializable state); + } diff --git a/streamx-console/streamx-console-service/src/main/java/com/streamxhub/streamx/console/core/service/ApplicationService.java b/streamx-console/streamx-console-service/src/main/java/com/streamxhub/streamx/console/core/service/ApplicationService.java index ac3ce8607e..4b21c55c0e 100644 --- a/streamx-console/streamx-console-service/src/main/java/com/streamxhub/streamx/console/core/service/ApplicationService.java +++ b/streamx-console/streamx-console-service/src/main/java/com/streamxhub/streamx/console/core/service/ApplicationService.java @@ -80,4 +80,6 @@ public interface ApplicationService extends IService { void revoke(Application app) throws Exception; Boolean delete(Application app); + + void restart(Application application) throws Exception; } diff --git a/streamx-console/streamx-console-service/src/main/java/com/streamxhub/streamx/console/core/service/impl/AlertServiceImpl.java b/streamx-console/streamx-console-service/src/main/java/com/streamxhub/streamx/console/core/service/impl/AlertServiceImpl.java index e16377c3ca..6d0c7b941e 100644 --- a/streamx-console/streamx-console-service/src/main/java/com/streamxhub/streamx/console/core/service/impl/AlertServiceImpl.java +++ b/streamx-console/streamx-console-service/src/main/java/com/streamxhub/streamx/console/core/service/impl/AlertServiceImpl.java @@ -25,6 +25,7 @@ import com.streamxhub.streamx.common.util.Utils; import com.streamxhub.streamx.console.core.entity.Application; import com.streamxhub.streamx.console.core.entity.SenderEmail; +import com.streamxhub.streamx.console.core.enums.CheckPointStatus; import com.streamxhub.streamx.console.core.enums.FlinkAppState; import com.streamxhub.streamx.console.core.service.AlertService; import com.streamxhub.streamx.console.core.service.SettingService; @@ -37,6 +38,7 @@ import javax.annotation.PostConstruct; import java.io.File; +import java.io.Serializable; import java.io.StringWriter; import java.net.URL; import java.util.*; @@ -84,8 +86,14 @@ public void initConfig() throws Exception { } @Override - public void alert(Application application, FlinkAppState appState) { - log.info("Email Alert:{} is {}", application.getJobName(), appState.name()); + public void alert(Application application, Serializable state) { + String subject = "StreamX Alert: {}, %s"; + if (state instanceof FlinkAppState) { + subject = String.format(subject, application.getJobName(), "checkPoint is Failed"); + } else { + subject = String.format(subject, application.getJobName(), " is ".concat(((CheckPointStatus) state).name())); + } + log.info(subject); if (this.senderEmail == null) { this.senderEmail = settingService.getSenderEmail(); } @@ -102,8 +110,13 @@ public void alert(Application application, FlinkAppState appState) { htmlEmail.setSSLOnConnect(true); htmlEmail.setSslSmtpPort(this.senderEmail.getSmtpPort().toString()); } - htmlEmail.setSubject("StreamX Alert: [ " + application.getJobName() + " ] is " + appState.name()); - String html = getHtmlMessage(application, appState); + htmlEmail.setSubject(subject); + String html; + if (state instanceof FlinkAppState) { + html = getHtmlMessage(application, (FlinkAppState) state); + } else { + html = getHtmlMessage(application, (CheckPointStatus) state); + } htmlEmail.setHtmlMsg(html); htmlEmail.addTo(application.getAlertEmail().split(",")); htmlEmail.send(); @@ -146,4 +159,31 @@ private String getHtmlMessage(Application application, FlinkAppState appState) t return writer.toString(); } + private String getHtmlMessage(Application application, CheckPointStatus appState) throws Exception { + long duration; + if (application.getEndTime() == null) { + duration = System.currentTimeMillis() - application.getStartTime().getTime(); + } else { + duration = application.getEndTime().getTime() - application.getStartTime().getTime(); + } + duration = duration / 1000 / 60; + String content = "Job [" + application.getJobName() + "] checkPoint is " + appState.name() + "
" + + "Start Time: " + DateUtils.format(application.getStartTime(), DateUtils.fullFormat(), TimeZone.getDefault()) + "
" + + "End Time: " + DateUtils.format(application.getEndTime() == null ? new Date() : application.getEndTime(), DateUtils.fullFormat(), TimeZone.getDefault()) + "
" + + "Duration: " + DateUtils.toRichTimeDuration(duration) + "
"; + + content += "please check it,Thank you for using StreamX

Best Wishes!!"; + + Map root = new HashMap<>(3); + root.put("title", "Notify :" + application.getJobName().concat(" checkpoint is ").concat(appState.name())); + root.put("message", content); + String format = "%s/proxy/%s/"; + String url = String.format(format, HadoopUtils.getRMWebAppURL(false), application.getAppId()); + root.put("link", url); + + StringWriter writer = new StringWriter(); + template.process(root, writer); + return writer.toString(); + } + } diff --git a/streamx-console/streamx-console-service/src/main/java/com/streamxhub/streamx/console/core/service/impl/ApplicationServiceImpl.java b/streamx-console/streamx-console-service/src/main/java/com/streamxhub/streamx/console/core/service/impl/ApplicationServiceImpl.java index 443344e91d..d1cb89f59c 100644 --- a/streamx-console/streamx-console-service/src/main/java/com/streamxhub/streamx/console/core/service/impl/ApplicationServiceImpl.java +++ b/streamx-console/streamx-console-service/src/main/java/com/streamxhub/streamx/console/core/service/impl/ApplicationServiceImpl.java @@ -86,7 +86,7 @@ @Transactional(propagation = Propagation.SUPPORTS, readOnly = true, rollbackFor = Exception.class) @DependsOn({"flyway", "flywayInitializer"}) public class ApplicationServiceImpl extends ServiceImpl - implements ApplicationService { + implements ApplicationService { @Autowired private ProjectService projectService; @@ -134,13 +134,13 @@ public class ApplicationServiceImpl extends ServiceImpl(1024), - ThreadUtils.threadFactory("streamx-deploy-executor"), - new ThreadPoolExecutor.AbortPolicy() + Runtime.getRuntime().availableProcessors() * 2, + 200, + 60L, + TimeUnit.SECONDS, + new LinkedBlockingQueue<>(1024), + ThreadUtils.threadFactory("streamx-deploy-executor"), + new ThreadPoolExecutor.AbortPolicy() ); @PostConstruct @@ -333,6 +333,12 @@ public Boolean delete(Application app) { } } + @Override + public void restart(Application application) throws Exception { + this.cancel(application); + this.start(application, false); + } + private void removeApp(Long appId) { removeById(appId); HdfsUtils.delete(ConfigConst.APP_WORKSPACE().concat("/").concat(appId.toString())); @@ -374,8 +380,8 @@ public String getYarnName(Application appParam) { @Override public AppExistsState checkExists(Application appParam) { boolean inDB = this.baseMapper.selectCount( - new QueryWrapper().lambda() - .eq(Application::getJobName, appParam.getJobName())) > 0; + new QueryWrapper().lambda() + .eq(Application::getJobName, appParam.getJobName())) > 0; if (appParam.getId() != null) { Application app = getById(appParam.getId()); @@ -390,12 +396,12 @@ public AppExistsState checkExists(Application appParam) { FlinkAppState state = FlinkAppState.of(app.getState()); //当前任务已停止的状态 if (state.equals(FlinkAppState.ADDED) || - state.equals(FlinkAppState.DEPLOYED) || - state.equals(FlinkAppState.CREATED) || - state.equals(FlinkAppState.FAILED) || - state.equals(FlinkAppState.CANCELED) || - state.equals(FlinkAppState.LOST) || - state.equals(FlinkAppState.KILLED)) { + state.equals(FlinkAppState.DEPLOYED) || + state.equals(FlinkAppState.CREATED) || + state.equals(FlinkAppState.FAILED) || + state.equals(FlinkAppState.CANCELED) || + state.equals(FlinkAppState.LOST) || + state.equals(FlinkAppState.KILLED)) { if (YarnUtils.isContains(appParam.getJobName())) { return AppExistsState.IN_YARN; } @@ -471,6 +477,10 @@ public boolean update(Application appParam) { application.setDescription(appParam.getDescription()); application.setAlertEmail(appParam.getAlertEmail()); application.setRestartSize(appParam.getRestartSize()); + application.setCpFailureAction(appParam.getCpFailureAction()); + application.setCpFailureRateInterval(appParam.getCpFailureRateInterval()); + application.setCpMaxFailureInterval(appParam.getCpMaxFailureInterval()); + // Flink Sql job... if (application.isFlinkSqlJob()) { updateFlinkSqlJob(application, appParam); @@ -564,11 +574,11 @@ public void deploy(Application application) { } FlinkTrackingTask.refreshTracking(application.getId(), () -> { baseMapper.update( - application, - new UpdateWrapper() - .lambda() - .eq(Application::getId, application.getId()) - .set(Application::getDeploy, DeployState.DEPLOYING.get()) + application, + new UpdateWrapper() + .lambda() + .eq(Application::getId, application.getId()) + .set(Application::getDeploy, DeployState.DEPLOYING.get()) ); return null; @@ -674,9 +684,9 @@ private void downloadDependency(Application application) throws Exception { */ builder.setLength(0); builder.append("org.apache.flink:force-shading,") - .append("com.google.code.findbugs:jsr305,") - .append("org.slf4j:*,") - .append("org.apache.logging.log4j:*,"); + .append("com.google.code.findbugs:jsr305,") + .append("org.slf4j:*,") + .append("org.apache.logging.log4j:*,"); /* * 用户指定需要排除的依赖. */ @@ -693,23 +703,23 @@ private void downloadDependency(Application application) throws Exception { Collection dependencyJars; try { dependencyJars = JavaConversions.asJavaCollection(DependencyUtils.resolveMavenDependencies( - exclusions, - packages, - null, - null, - null, - out -> { - if (tailOutMap.containsKey(id)) { - if (tailBeginning.containsKey(id)) { - tailBeginning.remove(id); - Arrays.stream(logBuilder.toString().split("\n")) - .forEach(x -> simpMessageSendingOperations.convertAndSend("/resp/mvn", x)); - } else { - simpMessageSendingOperations.convertAndSend("/resp/mvn", out); + exclusions, + packages, + null, + null, + null, + out -> { + if (tailOutMap.containsKey(id)) { + if (tailBeginning.containsKey(id)) { + tailBeginning.remove(id); + Arrays.stream(logBuilder.toString().split("\n")) + .forEach(x -> simpMessageSendingOperations.convertAndSend("/resp/mvn", x)); + } else { + simpMessageSendingOperations.convertAndSend("/resp/mvn", out); + } } + logBuilder.append(out).append("\n"); } - logBuilder.append(out).append("\n"); - } )); } catch (Exception e) { simpMessageSendingOperations.convertAndSend("/resp/mvn", e.getMessage()); @@ -820,12 +830,12 @@ public void cancel(Application appParam) { executorService.submit(() -> { try { String savePointDir = FlinkSubmit.stop( - settingService.getEffectiveFlinkHome(), - ExecutionMode.of(application.getExecutionMode()), - application.getAppId(), - application.getJobId(), - appParam.getSavePointed(), - appParam.getDrain() + settingService.getEffectiveFlinkHome(), + ExecutionMode.of(application.getExecutionMode()), + application.getAppId(), + application.getJobId(), + appParam.getSavePointed(), + appParam.getDrain() ); if (savePointDir != null) { log.info("savePoint path:{}", savePointDir); @@ -994,8 +1004,8 @@ public boolean start(Application appParam, boolean auto) throws Exception { } String[] dynamicOption = CommonUtil.notEmpty(application.getDynamicOptions()) - ? application.getDynamicOptions().split("\\s+") - : new String[0]; + ? application.getDynamicOptions().split("\\s+") + : new String[0]; Map flameGraph = null; if (appParam.getFlameGraph()) { @@ -1020,22 +1030,22 @@ public boolean start(Application appParam, boolean auto) throws Exception { SubmitRequest submitInfo = new SubmitRequest( - settingService.getEffectiveFlinkHome(), - settingService.getFlinkVersion(), - settingService.getFlinkYaml(), - flinkUserJar, - DevelopmentMode.of(application.getJobType()), - ExecutionMode.of(application.getExecutionMode()), - resolveOrder, - application.getJobName(), - appConf, - application.getApplicationType().getName(), - savePointDir, - flameGraph, - option.toString(), - optionMap, - dynamicOption, - application.getArgs() + settingService.getEffectiveFlinkHome(), + settingService.getFlinkVersion(), + settingService.getFlinkYaml(), + flinkUserJar, + DevelopmentMode.of(application.getJobType()), + ExecutionMode.of(application.getExecutionMode()), + resolveOrder, + application.getJobName(), + appConf, + application.getApplicationType().getName(), + savePointDir, + flameGraph, + option.toString(), + optionMap, + dynamicOption, + application.getArgs() ); ApplicationLog log = new ApplicationLog(); diff --git a/streamx-console/streamx-console-service/src/main/java/com/streamxhub/streamx/console/core/task/FlinkTrackingTask.java b/streamx-console/streamx-console-service/src/main/java/com/streamxhub/streamx/console/core/task/FlinkTrackingTask.java index 55d33fe46c..4ec2cba9d1 100644 --- a/streamx-console/streamx-console-service/src/main/java/com/streamxhub/streamx/console/core/task/FlinkTrackingTask.java +++ b/streamx-console/streamx-console-service/src/main/java/com/streamxhub/streamx/console/core/task/FlinkTrackingTask.java @@ -26,10 +26,7 @@ import com.streamxhub.streamx.common.util.ThreadUtils; import com.streamxhub.streamx.console.core.entity.Application; import com.streamxhub.streamx.console.core.entity.SavePoint; -import com.streamxhub.streamx.console.core.enums.DeployState; -import com.streamxhub.streamx.console.core.enums.FlinkAppState; -import com.streamxhub.streamx.console.core.enums.OptionState; -import com.streamxhub.streamx.console.core.enums.StopFrom; +import com.streamxhub.streamx.console.core.enums.*; import com.streamxhub.streamx.console.core.metrics.flink.CheckPoints; import com.streamxhub.streamx.console.core.metrics.flink.JobsOverview; import com.streamxhub.streamx.console.core.metrics.flink.Overview; @@ -37,6 +34,7 @@ import com.streamxhub.streamx.console.core.service.AlertService; import com.streamxhub.streamx.console.core.service.ApplicationService; import com.streamxhub.streamx.console.core.service.SavePointService; +import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.DependsOn; @@ -120,19 +118,12 @@ public class FlinkTrackingTask { private static final Map CHECK_POINT_MAP = new ConcurrentHashMap<>(); + private static final Map CHECK_POINT_FAILED_MAP = new ConcurrentHashMap<>(); + private static final Map OPTIONING = new ConcurrentHashMap<>(); - /** - * 10秒之内 - */ - private final Long OPTION_INTERVAL = 1000L * 10; private final Long STARTING_INTERVAL = 1000L * 30; - /** - * 正常5秒钟获取一次信息 - */ - private final Long TRACK_INTERVAL = 1000L * 5; - private Long lastTrackTime = 0L; private Long lastOptionTime = 0L; @@ -172,13 +163,18 @@ public void ending() { */ @Scheduled(fixedDelay = 1000) public void execute() { + // 正常5秒钟获取一次信息 + long track_interval = 1000L * 5; + //10秒之内 + long option_interval = 1000L * 10; + //1) 项目刚启动第一次执行,或者前端正在操作...(启动,停止)需要立即返回状态信息. if (lastTrackTime == null || !OPTIONING.isEmpty()) { tracking(); - } else if (System.currentTimeMillis() - lastOptionTime <= OPTION_INTERVAL) { + } else if (System.currentTimeMillis() - lastOptionTime <= option_interval) { //2) 如果在管理端正在操作时间的10秒中之内(每秒执行一次) tracking(); - } else if (System.currentTimeMillis() - lastTrackTime >= TRACK_INTERVAL) { + } else if (System.currentTimeMillis() - lastTrackTime >= track_interval) { //3) 正常信息获取,判断本次时间和上次时间是否间隔5秒(正常监控信息获取,每5秒一次) tracking(); } @@ -326,24 +322,46 @@ private void handleJobOverview(Application application, JobsOverview.Job jobOver * @param application * @throws IOException */ - private void handleCheckPoints(Application application) throws IOException { + private void handleCheckPoints(Application application) throws Exception { CheckPoints checkPoints = application.httpCheckpoints(); if (checkPoints != null) { CheckPoints.Latest latest = checkPoints.getLatest(); - if (latest != null ) { + if (latest != null) { CheckPoints.CheckPoint checkPoint = latest.getCompleted(); - if (checkPoint != null && checkPoint.isCompleted()) { - Long latestId = CHECK_POINT_MAP.get(application.getJobId()); - if (latestId == null || latestId < checkPoint.getId()) { - SavePoint savePoint = new SavePoint(); - savePoint.setAppId(application.getId()); - savePoint.setLatest(true); - savePoint.setType(checkPoint.getCheckPointType().get()); - savePoint.setPath(checkPoint.getPath()); - savePoint.setTriggerTime(new Date(checkPoint.getTriggerTimestamp())); - savePoint.setCreateTime(new Date()); - savePointService.save(savePoint); - CHECK_POINT_MAP.put(application.getJobId(), checkPoint.getId()); + if (checkPoint != null) { + CheckPointStatus status = checkPoint.getCheckPointStatus(); + if (CheckPointStatus.COMPLETED.equals(status)) { + Long latestId = CHECK_POINT_MAP.get(application.getJobId()); + if (latestId == null || latestId < checkPoint.getId()) { + SavePoint savePoint = new SavePoint(); + savePoint.setAppId(application.getId()); + savePoint.setLatest(true); + savePoint.setType(checkPoint.getCheckPointType().get()); + savePoint.setPath(checkPoint.getPath()); + savePoint.setTriggerTime(new Date(checkPoint.getTriggerTimestamp())); + savePoint.setCreateTime(new Date()); + savePointService.save(savePoint); + CHECK_POINT_MAP.put(application.getJobId(), checkPoint.getId()); + } + } else if (CheckPointStatus.FAILED.equals(status) && application.cpFailedTrigger()) { + Counter counter = CHECK_POINT_FAILED_MAP.get(application.getJobId()); + if (counter == null) { + CHECK_POINT_FAILED_MAP.put(application.getJobId(), new Counter(checkPoint.getTriggerTimestamp())); + } else { + //x分钟之内超过Y次CheckPoint失败触发动作 + long minute = counter.getDuration(checkPoint.getTriggerTimestamp()); + if (minute <= application.getCpFailureRateInterval() + && counter.getCount() >= application.getCpMaxFailureInterval()) { + CHECK_POINT_FAILED_MAP.remove(application.getJobId()); + if (application.getCpFailureAction() == 1) { + alertService.alert(application, CheckPointStatus.FAILED); + } else { + applicationService.restart(application); + } + } else { + counter.add(); + } + } } } } @@ -627,4 +645,24 @@ public static Map getAllTrackingApp() { public static Application getTracking(Long appId) { return TRACKING_MAP.get(appId); } + + @Data + public static class Counter { + private Long timestamp; + private Integer count; + + public Counter(Long timestamp) { + this.timestamp = timestamp; + this.count = 1; + } + + public void add() { + this.count += 1; + } + + public long getDuration(Long currentTimestamp) { + return (currentTimestamp - this.getTimestamp()) / 1000 / 60; + } + } + } diff --git a/streamx-console/streamx-console-service/src/main/resources/mapper/core/ApplicationMapper.xml b/streamx-console/streamx-console-service/src/main/resources/mapper/core/ApplicationMapper.xml index fdeed50206..37213e8e85 100644 --- a/streamx-console/streamx-console-service/src/main/resources/mapper/core/ApplicationMapper.xml +++ b/streamx-console/streamx-console-service/src/main/resources/mapper/core/ApplicationMapper.xml @@ -22,6 +22,9 @@ + + + diff --git a/streamx-console/streamx-console-webapp/package.json b/streamx-console/streamx-console-webapp/package.json index de8c1d90bc..4776c2ccee 100644 --- a/streamx-console/streamx-console-webapp/package.json +++ b/streamx-console/streamx-console-webapp/package.json @@ -15,7 +15,7 @@ }, "dependencies": { "@antv/data-set": "^0.10.1", - "ant-design-vue": "^1.7.3", + "ant-design-vue": "^1.7.5", "axios": "^0.21.1", "core-js": "^3.8.3", "dedent": "^0.7.0", diff --git a/streamx-console/streamx-console-webapp/src/views/flink/app/Add.vue b/streamx-console/streamx-console-webapp/src/views/flink/app/Add.vue index dee34d7572..d7be508607 100644 --- a/streamx-console/streamx-console-webapp/src/views/flink/app/Add.vue +++ b/streamx-console/streamx-console-webapp/src/views/flink/app/Add.vue @@ -12,7 +12,7 @@ :wrapper-col="{lg: {span: 16}, sm: {span: 17} }"> @@ -166,7 +166,7 @@ option-filter-prop="children" :filter-option="filterOption" placeholder="Please select Project" - @change="handleProject" + @change="handleChangeProject" v-decorator="[ 'project', {rules: [{ required: true }]} ]"> @@ -224,7 +224,7 @@ :wrapper-col="{lg: {span: 16}, sm: {span: 17} }"> @@ -279,6 +280,7 @@ @@ -358,7 +360,7 @@ @@ -417,6 +420,7 @@ @@ -428,30 +432,57 @@ - - - {{ conf.opt }} ( {{ conf.name }} ) - - + + + + minute + + + + count + + + + {{ o.name }} + + + + +

+ + Note + Operation after checkpoint failure, e.g:
+ Within 5 minutes(checkpoint failure rate interval), if the number of checkpoint failures reaches 10 (max failures per interval),action will be triggered(alert or restart job) +
+

@@ -522,7 +554,7 @@
-

+

Note Explicitly configuring both total process memory and total Flink memory is not recommended. It may lead to deployment failures due to potential memory configuration conflicts. Configuring other memory components also requires caution as it can produce further configuration conflicts, @@ -566,7 +598,7 @@ mode="multiple" :max-tag-count="controller.tagCount.jm" placeholder="Please select the resource parameters to set" - @change="handleJmMemory" + @change="handleChangeJmMemory" v-decorator="['jmOptions']"> - - - {{ conf.opt }} ( {{ conf.name }} ) - - + + + + minute + + + + count + + + + {{ o.name }} + + + + +

+ + Note + Operation after checkpoint failure, e.g:
+ Within 5 minutes(checkpoint failure rate interval), if the number of checkpoint failures reaches 10 (max failures per interval),action will be triggered(alert or restart job) +
+

@@ -279,7 +305,7 @@ mode="multiple" :max-tag-count="jmMaxTagCount" placeholder="Please select the resource parameters to set" - @change="handleJmMemory" + @change="handleChangeJmMemory" v-decorator="['jmOptions']"> { this.form.setFieldsValue({ 'jobName': this.app.jobName, @@ -657,7 +718,10 @@ export default { 'resolveOrder': this.app.resolveOrder, 'executionMode': this.app.executionMode, 'restartSize': this.app.restartSize, - 'alertEmail': this.app.alertEmail + 'alertEmail': this.app.alertEmail, + 'cpMaxFailureInterval': this.app.cpMaxFailureInterval, + 'cpFailureRateInterval': this.app.cpFailureRateInterval, + 'cpFailureAction': this.app.cpFailureAction }) }) diff --git a/streamx-console/streamx-console-webapp/src/views/flink/app/EditStreamX.vue b/streamx-console/streamx-console-webapp/src/views/flink/app/EditStreamX.vue index 7f6b1be789..59b2b72538 100644 --- a/streamx-console/streamx-console-webapp/src/views/flink/app/EditStreamX.vue +++ b/streamx-console/streamx-console-webapp/src/views/flink/app/EditStreamX.vue @@ -228,7 +228,7 @@ + @change="handleChangeStrategy"> use existing @@ -442,6 +442,54 @@ + + + + minute + + + + count + + + + {{ o.name }} + + + + +

+ + Note + Operation after checkpoint failure, e.g:
+ Within 5 minutes(checkpoint failure rate interval), if the number of checkpoint failures reaches 10 (max failures per interval),action will be triggered(alert or restart job) +
+

+
+ + @@ -451,7 +499,7 @@ mode="multiple" :max-tag-count="selectTagCount.count1" placeholder="Please select parameter" - @change="handleConf" + @change="handleChangeConf" v-decorator="['configuration']"> - - - - {{ conf.opt }} ( {{ conf.name }} ) - - - - @@ -591,7 +618,7 @@ mode="multiple" :max-tag-count="jmMaxTagCount" placeholder="Please select the resource parameters to set" - @change="handleJmMemory" + @change="handleChangeJmMemory" v-decorator="['jmOptions']">