Skip to content

Commit

Permalink
feat(rocketmq) quick start
Browse files Browse the repository at this point in the history
  • Loading branch information
ni-ze committed Jul 2, 2022
1 parent 3c9103e commit 9f327df
Show file tree
Hide file tree
Showing 12 changed files with 181 additions and 65 deletions.
88 changes: 85 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,12 +1,94 @@
# Rocketmq Streams SQL
# rsqldb

Rocketmq Streams SQL 为 Rocketmq Streams 的开发提供了基于SQL的开发体验, 让基于消息队列的流式开发更加容易;
rsqldb 为 Rocketmq Streams 的开发提供了基于SQL的开发体验, 让基于消息队列的流式开发更加容易;

## Features

* 采用标准的流式SQL规范,可以与其他的流计算框架如Flink完美兼容;
* 兼容Flink自带的```udf``````udaf``````udtf```,除此之外,用户还可以通过实现相关接口来轻松扩展函数;


如果您希望更详细的了解Rsqldb的相关内容, 请点击[这里](docs/SUMMARY.md)
如果您希望更详细的了解rsqldb的相关内容, 请点击[这里](docs/SUMMARY.md)


## Quickstart
### 运行环境
- JDK 1.8及以上
- Maven 3.2及以上

### 下载rsqldb工程并本地构建
```xml
git clone https://github.com/alibaba/rsqldb.git

mvn clean package -DskipTest -U
```

### 拷贝安装压缩包并解压

进入rsqldb-disk模块下,将rsqldb-distribution.tar.gz安装包拷贝到任意目录,并执行命令解压并进入解压目录:
```xml
tar -zxvf rsqldb-distribution.tar.gz;cd rsqldb
```


### 启动rsqldb服务端
```shell
chmod +x bin/startAll.sh;sh bin/startAll.sh
```

### 配置sql文件
sendDataFromFile.sql中创建的任务,需要从本地文件指定位置读取数据,所以需要修改sendDataFromFile.sql中filePath变量的位置,修改为数据文件data.txt的绝对路径。


### 提交任务
执行路径依然在rsqldb解压目录下
```shell
chmod +x client/clientExector.sh;sh client/clientExector.sh submitTask sendDataFromFile.sql
```


### 启动任务
在rsqldb解压目录下执行,tail运行日志,为查看结果做准备。
```shell
tail -f log/rsqldb-runner.log
```

另开一个shell窗口,进入解压后的rsqldb目录,执行以下命令启动任务,1分钟后,查看日志输出,会将执行结果打印到日志中。
```shell
sh client/clientExector.sh startTask
```

### 查询任务
在rsqldb解压目录下执行
```shell
sh client/clientExector.sh queryTask
```
返回已经提交的任务列表。

### 停止任务
在rsqldb解压目录下执行
```shell
sh client/clientExector.sh stopTask
```

### 从RocketMQ中读取数据并处理
上述示例为从本地文件data.txt中读取数据,更为常用的用法是从RocketMQ中读取数据处理,下面给出具体步骤:

- 本地安装并启动RocketMQ,[安装文档](https://rocketmq.apache.org/docs/quick-start/)
- 启动rsqldb服务端
```shell
chmod +x bin/startAll.sh;sh bin/startAll.sh
```
- 提交任务
```shell
chmod +x client/clientExector.sh;sh client/clientExector.sh submitTask rocketmq.sql
```
- 查看输出
```shell
tail -f log/rsqldb-runner.log
```
- 另开一个窗口,启动任务
```shell
sh client/clientExector.sh startTask
```
- 向RocketMQ中生产数据:topic为rsqldb-source,与rocketmq.sql任务中的topic名称保持一致,向该topic写入data.txt文件中的数据。观察rsqldb-runner.log日志输出。
1 change: 0 additions & 1 deletion docs/SUMMARY.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
# Summary

* [Introduction](README.md)
* [Quick Start](quick_start/README.md)
* [Blink SQL兼容](stream_sql/README.md)
* [创建源表](stream_source/README.md)
* [创建metaq源表](stream_source/metaq/README.md)
Expand Down
21 changes: 0 additions & 21 deletions docs/quick_start/README.md

This file was deleted.

Binary file removed docs/quick_start/pic/img.png
Binary file not shown.
5 changes: 0 additions & 5 deletions docs/quick_start/standalone/README.md

This file was deleted.

2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
<blink.version>blink-3.4.0</blink.version>
<flink.version>1.13.1</flink.version>
<scala-library.version>2.12.4</scala-library.version>
<rocketmq-streams.version>1.0.3-preview-SNAPSHOT</rocketmq-streams.version>
<rocketmq-streams.version>1.0.2-preview-SNAPSHOT</rocketmq-streams.version>
</properties>
<build>
<plugins>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,14 @@
public class SubmitTask {

public static void main(String[] args) throws Throwable {
if (args == null || args.length < 1) {
throw new IllegalArgumentException("home.dir is required.");
if (args == null || args.length < 2) {
throw new IllegalArgumentException("home.dir and sql file name are required.");
}

String homeDir = args[0];
String sqlFileName = args[1];

String sqlPath = homeDir + "/client/standalone.sql";
String sqlPath = homeDir + "/client/" + sqlFileName;

File file = FileUtil.getFile(sqlPath);
byte[] bytes = Files.readAllBytes(file.toPath());
Expand Down
31 changes: 31 additions & 0 deletions rsqldb-disk/client/clientExector.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
#!/bin/sh

binDir=$(cd `dirname $0`;pwd)

cd $binDir/..
homeDir=$(pwd)

cd $binDir


mainClass=()
if [ "x$1" == "xsubmitTask" ]; then
mainClass=com.alibaba.rsqldb.client.SubmitTask
java -cp rsqldb-client-1.0.0-SNAPSHOT.jar ${mainClass} ${homeDir} $2
fi

if [ "x$1" == "xstartTask" ]; then
mainClass=com.alibaba.rsqldb.client.StartTask
java -cp rsqldb-client-1.0.0-SNAPSHOT.jar ${mainClass}
fi

if [ "x$1" == "xqueryTask" ]; then
mainClass=com.alibaba.rsqldb.client.QueryTask
java -cp rsqldb-client-1.0.0-SNAPSHOT.jar ${mainClass}
fi

if [ "x$1" == "xstopTask" ]; then
mainClass=com.alibaba.rsqldb.client.StopTask
java -cp rsqldb-client-1.0.0-SNAPSHOT.jar ${mainClass}
fi

6 changes: 4 additions & 2 deletions rsqldb-disk/client/data.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,4 @@
1,2,3,4,5,6,7,8,9
2,2,3,4,5,6,7,8,9
1,2,3,4
2,2,3,4
3,2,3,4
4,2,3,4
50 changes: 50 additions & 0 deletions rsqldb-disk/client/rocketmq.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
CREATE TABLE `rocketmq_source`
(
field_1 VARCHAR,
field_2 VARCHAR,
field_3 VARCHAR,
field_4 VARCHAR
) WITH (
type = 'rocketmq',
topic = 'rsqldb-source',
groupName = 'rsqldb-group',
namesrvAddr = '127.0.0.1:9876',
isJsonData = 'false',
msgIsJsonArray = 'false'
);


-- 数据标准化

create view rocketmq_view as
select field_1
, field_2
, field_3
, field_4
from (
select field_1
, field_2
, field_3
, field_4
from rocketmq_source
)
where (
field_1='1'
);

CREATE TABLE `task_sink_2`
(
field_1 VARCHAR,
field_2 VARCHAR,
field_3 VARCHAR,
field_4 VARCHAR
) WITH (
type = 'print'
);

insert into task_sink_2
select field_1
, field_2
, field_3
, field_4
from rocketmq_view
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,11 @@ CREATE TABLE `test_source`
field_1 VARCHAR,
field_2 VARCHAR,
field_3 VARCHAR,
field_4 VARCHAR,
field_5 VARCHAR,
field_6 VARCHAR,
field_7 VARCHAR,
field_8 VARCHAR,
field_9 VARCHAR
field_4 VARCHAR
) WITH (
type = 'file',
filePath = '/Users/nize/code/github/rsqldb/rsqldb-disk/client/data.txt',
-- 需要根据自身填写data.txt的绝对路径
filePath = '',
isJsonData = 'false',
msgIsJsonArray = 'false'
);
Expand All @@ -24,21 +20,11 @@ select field_1
, field_2
, field_3
, field_4
, field_5
, field_6
, field_7
, field_8
, field_9
from (
select field_1
, field_2
, field_3
, field_4
, field_5
, field_6
, field_7
, field_8
, field_9
from test_source
)
where (
Expand All @@ -50,12 +36,7 @@ CREATE TABLE `test_sink`
field_1 VARCHAR,
field_2 VARCHAR,
field_3 VARCHAR,
field_4 VARCHAR,
field_5 VARCHAR,
field_6 VARCHAR,
field_7 VARCHAR,
field_8 VARCHAR,
field_9 VARCHAR
field_4 VARCHAR
) WITH (
type = 'print'
);
Expand All @@ -65,9 +46,4 @@ select field_1
, field_2
, field_3
, field_4
, field_5
, field_6
, field_7
, field_8
, field_9
from view_test
2 changes: 1 addition & 1 deletion rsqldb-runner/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-streams-channel-rocketmq</artifactId>
<version>1.0.3-preview-SNAPSHOT</version>
<version>1.0.2-preview-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
<dependency>
Expand Down

0 comments on commit 9f327df

Please sign in to comment.