Skip to content

Commit

Permalink
[New Feature] checkPoint Failure Options (sent email or restart)
Browse files Browse the repository at this point in the history
  • Loading branch information
wolfboys committed May 26, 2021
1 parent 46037f3 commit fa521d8
Show file tree
Hide file tree
Showing 24 changed files with 580 additions and 238 deletions.
14 changes: 3 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,12 @@ Make Flink|Spark easier

The original intention of `StreamX` is to make the development of `Flink` easier. `StreamX` focuses on the management of development phases and tasks. Our ultimate goal is to build a one-stop big data solution integrating stream processing, batch processing, data warehouse and data laker.

<video src="http://assets.streamxhub.com/streamx.mp4" controls="controls" autoplay="autoplay" width="100%" height="100%"></video>
[![Watch the video](http://assets.streamxhub.com/streamx_player.png)](http://assets.streamxhub.com/streamx.mp4)

![](http://assets.streamxhub.com/streamx-main.png?12345)

![](http://assets.streamxhub.com/streamx-sql.png?12345)

![](http://assets.streamxhub.com/streamx-flameGraph.png?123456)

## 🎉 Features
* Scaffolding
Expand Down Expand Up @@ -104,18 +103,12 @@ Thanks to the above excellent open source projects and many outstanding open sou
Thanks to the [fire-spark](https://github.com/GuoNingNing/fire-spark) project for the early inspiration and help.

### 🚀 Quick Start

Prerequisites for building:

* Maven 3.6+
* npm 7.11.2 (https://nodejs.org/en/)
* JDK 1.8+

```
git clone https://github.com/streamxhub/streamx.git
cd Streamx
mvn clean install -DskipTests
mvn clean install -DskipTests -Denv=prod
```
click [Document](http://www.streamxhub.com/zh/doc/) for more information

## 👻 Why not...❓

Expand All @@ -140,7 +133,6 @@ You can contact us or ask questions via:

- [New an issue](https://github.com/streamxhub/streamx/issues/new)
- [Join us](#-Join-us)
- [Who uses Streamx](https://github.com/streamxhub/streamx/issues/163)

## 💰 Donation

Expand Down
14 changes: 10 additions & 4 deletions README_CN.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,13 @@ Make Flink|Spark easier!!!
我们在使用 `Flink` 时发现从编程模型, 启动配置到运维管理都有很多可以抽象共用的地方, 我们将一些好的经验固化下来并结合业内的最佳实践, 通过不断努力终于诞生了今天的框架 —— `StreamX`, 项目的初衷是 —— 让 `Flink` 开发更简单,
使用 `StreamX` 开发,可以极大降低学习成本和开发门槛, 让开发者只用关心最核心的业务, `StreamX` 规范了项目的配置,鼓励函数式编程,定义了最佳的编程方式,提供了一系列开箱即用的 `Connectors` ,标准化了配置、开发、测试、部署、监控、运维的整个过程, 提供 `Scala``Java` 两套api,
其最终目的是打造一个一站式大数据平台,流批一体,湖仓一体的解决方案
<video src="http://assets.streamxhub.com/streamx.mp4" controls="controls" autoplay="autoplay" width="100%" height="100%"></video>

[![Watch the video](http://assets.streamxhub.com/streamx_player.png)](http://assets.streamxhub.com/streamx.mp4)

![](http://assets.streamxhub.com/streamx-main.png?12345)

![](http://assets.streamxhub.com/streamx-sql.png?12345)

![](http://assets.streamxhub.com/streamx-flameGraph.png?123456)

## 🎉 Features
* 开发脚手架
* 一系列开箱即用的connectors
Expand Down Expand Up @@ -103,6 +102,14 @@ Make Flink|Spark easier!!!
感谢以上优秀的开源项目和很多未提到的优秀开源项目,致以最崇高的敬意,特别感谢[Apache Zeppelin](http://zeppelin.apache.org)[IntelliJ IDEA](https://www.jetbrains.com/idea/)
感谢 [fire-spark](https://github.com/GuoNingNing/fire-spark) 项目,早期给予的灵感和帮助, 感谢我老婆在项目开发时给予的支持,悉心照顾我的生活和日常,给予我足够的时间开发这个项目

### 🚀 快速上手
```
git clone https://github.com/streamxhub/streamx.git
cd Streamx
mvn clean install -DskipTests -Denv=prod
```
更多请查看[官网文档](http://www.streamxhub.com/zh/doc/)

## 👻 为什么不是...❓

### Apache Zeppelin
Expand All @@ -125,7 +132,6 @@ Make Flink|Spark easier!!!

- [快速创建issue!](https://github.com/streamxhub/streamx/issues/new)
- [加入社区](#-加入社区)
- [谁在使用streamx](https://github.com/streamxhub/streamx/issues/163)

## 💰 Donation

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,23 @@ public class Application implements Serializable {

private Long duration;

/**
* checkpoint最大的失败次数
*/
private Integer cpMaxFailureInterval;

/**
* checkpoint在时间范围内失败(分钟)
*/
private Integer cpFailureRateInterval;

/**
* 在X分钟之后失败Y次,之后触发的操作:
* 1: 发送告警
* 2: 重启
*/
private Integer cpFailureAction;

/**
* overview
*/
Expand Down Expand Up @@ -202,6 +219,11 @@ public void setState(Integer state) {
}
}

@JsonIgnore
public boolean cpFailedTrigger() {
return this.cpMaxFailureInterval != null && this.cpFailureRateInterval != null && this.cpFailureAction != null;
}

@JsonIgnore
public boolean eqFlinkJob(Application other) {
if (this.isFlinkSqlJob() && other.isFlinkSqlJob()) {
Expand Down Expand Up @@ -356,7 +378,7 @@ public boolean eqJobParam(Application other) {
//6) Dynamic Option 是否发生变化
//7) Program Args 是否发生变化
if (!this.getResolveOrder().equals(other.getResolveOrder()) ||
!this.getExecutionMode().equals(other.getExecutionMode())) {
!this.getExecutionMode().equals(other.getExecutionMode())) {
return false;
}

Expand Down Expand Up @@ -467,10 +489,10 @@ public static class Pom {
@Override
public String toString() {
return "{" +
"groupId='" + groupId + '\'' +
", artifactId='" + artifactId + '\'' +
", version='" + version + '\'' +
'}';
"groupId='" + groupId + '\'' +
", artifactId='" + artifactId + '\'' +
", version='" + version + '\'' +
'}';
}

private String getGav() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@
*/
package com.streamxhub.streamx.console.core.enums;

import java.io.Serializable;

/**
* @author benjobs
*/
public enum AppExistsState {
public enum AppExistsState implements Serializable {

/**
* 不存在
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@
*/
package com.streamxhub.streamx.console.core.enums;

import java.io.Serializable;

/**
* @author benjobs
*/
public enum ApplicationType {
public enum ApplicationType implements Serializable {
/**
* StreamX Flink
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,10 @@
*/
package com.streamxhub.streamx.console.core.enums;

import java.io.Serializable;
import java.util.Arrays;

public enum CandidateType {
public enum CandidateType implements Serializable {

/**
* 非候选版本
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,14 @@
*/
package com.streamxhub.streamx.console.core.enums;

import java.io.Serializable;
import java.util.Arrays;

/**
* @author benjobs
*/

public enum ChangedType {
public enum ChangedType implements Serializable {
/**
* 未发生变化
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,10 @@ public enum CheckPointType implements Serializable {
/**
* SAVEPOINT
*/
SAVEPOINT(2);
SAVEPOINT(2),

SYNC_SAVEPOINT(3);

int value;

public int get() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,13 @@
*/
package com.streamxhub.streamx.console.core.enums;

import java.io.Serializable;
import java.util.Arrays;

/**
* @author benjobs
*/
public enum DeployState {
public enum DeployState implements Serializable {

/**
* 需用重新发布,但是下载maven依赖失败.(针对flinkSql任务)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,13 @@

import lombok.Getter;

import java.io.Serializable;

/**
* @author benjobs
*/
@Getter
public enum FlinkAppState {
public enum FlinkAppState implements Serializable {

/**
* added new job to database
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@
*/
package com.streamxhub.streamx.console.core.enums;

import java.io.Serializable;

/**
* @author benjobs
*/
public enum GitAuthorizedError {
public enum GitAuthorizedError implements Serializable {

/**
* 没有错误
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,14 @@

import lombok.Getter;

import java.io.Serializable;
import java.util.Arrays;

/**
* @author benjobs
*/
@Getter
public enum OptionState {
public enum OptionState implements Serializable {

/**
* Application which is currently action: none.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
package com.streamxhub.streamx.console.core.metrics.flink;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.streamxhub.streamx.console.core.enums.CheckPointStatus;
import com.streamxhub.streamx.console.core.enums.CheckPointType;
import lombok.Data;

Expand All @@ -33,6 +34,8 @@
@Data
public class CheckPoints implements Serializable {

private Counts counts;

private List<CheckPoint> history;

private Latest latest;
Expand Down Expand Up @@ -65,16 +68,17 @@ public static class CheckPoint implements Serializable {

private Boolean discarded;

public boolean isCompleted() {
return "COMPLETED".equals(this.status);
public CheckPointStatus getCheckPointStatus() {
return CheckPointStatus.valueOf(this.status);
}


public CheckPointType getCheckPointType() {
if ("CHECKPOINT".equals(this.checkpointType)) {
return CheckPointType.CHECKPOINT;
} else if ("SAVEPOINT".equals(this.checkpointType)) {
return CheckPointType.SAVEPOINT;
}
return CheckPointType.SAVEPOINT;
return CheckPointType.SYNC_SAVEPOINT;
}

public String getPath() {
Expand All @@ -86,4 +90,18 @@ public String getPath() {
public static class Latest implements Serializable {
private CheckPoint completed;
}

@Data
public static class Counts implements Serializable {
private Integer completed;

private Integer failed;

@JsonProperty("in_progress")
private Integer inProgress;

private Integer restored;

private Integer total;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@
package com.streamxhub.streamx.console.core.service;

import com.streamxhub.streamx.console.core.entity.Application;
import com.streamxhub.streamx.console.core.enums.FlinkAppState;

import java.io.Serializable;

/**
* @author benjobs
Expand All @@ -33,6 +34,7 @@ public interface AlertService {
*
* @param application
*/
void alert(Application application, FlinkAppState appState);
void alert(Application application, Serializable state);


}
Original file line number Diff line number Diff line change
Expand Up @@ -80,4 +80,6 @@ public interface ApplicationService extends IService<Application> {
void revoke(Application app) throws Exception;

Boolean delete(Application app);

void restart(Application application) throws Exception;
}
Loading

0 comments on commit fa521d8

Please sign in to comment.