diff --git a/README.md b/README.md index 2c8e66b..cfb2949 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ -# Rocketmq Streams SQL +# rsqldb -Rocketmq Streams SQL 为 Rocketmq Streams 的开发提供了基于SQL的开发体验, 让基于消息队列的流式开发更加容易; +rsqldb 为 Rocketmq Streams 的开发提供了基于SQL的开发体验, 让基于消息队列的流式开发更加容易; ## Features @@ -8,5 +8,87 @@ Rocketmq Streams SQL 为 Rocketmq Streams 的开发提供了基于SQL的开发 * 兼容Flink自带的```udf```、```udaf```和```udtf```,除此之外,用户还可以通过实现相关接口来轻松扩展函数; -如果您希望更详细的了解Rsqldb的相关内容, 请点击[这里](docs/SUMMARY.md) +如果您希望更详细的了解rsqldb的相关内容, 请点击[这里](docs/SUMMARY.md) + +## Quickstart +### 运行环境 +- JDK 1.8及以上 +- Maven 3.2及以上 + +### 下载rsqldb工程并本地构建 +```xml +git clone https://github.com/alibaba/rsqldb.git + +mvn clean package -DskipTest -U +``` + +### 拷贝安装压缩包并解压 + +进入rsqldb-disk模块下,将rsqldb-distribution.tar.gz安装包拷贝到任意目录,并执行命令解压并进入解压目录: +```xml +tar -zxvf rsqldb-distribution.tar.gz;cd rsqldb +``` + + +### 启动rsqldb服务端 +```shell +chmod +x bin/startAll.sh;sh bin/startAll.sh +``` + +### 配置sql文件 +sendDataFromFile.sql中创建的任务,需要从本地文件指定位置读取数据,所以需要修改sendDataFromFile.sql中filePath变量的位置,修改为数据文件data.txt的绝对路径。 + + +### 提交任务 +执行路径依然在rsqldb解压目录下 +```shell +chmod +x client/clientExector.sh;sh client/clientExector.sh submitTask sendDataFromFile.sql +``` + + +### 启动任务 +在rsqldb解压目录下执行,tail运行日志,为查看结果做准备。 +```shell +tail -f log/rsqldb-runner.log +``` + +另开一个shell窗口,进入解压后的rsqldb目录,执行以下命令启动任务,1分钟后,查看日志输出,会将执行结果打印到日志中。 +```shell +sh client/clientExector.sh startTask +``` + +### 查询任务 +在rsqldb解压目录下执行 +```shell +sh client/clientExector.sh queryTask +``` +返回已经提交的任务列表。 + +### 停止任务 +在rsqldb解压目录下执行 +```shell +sh client/clientExector.sh stopTask +``` + +### 从RocketMQ中读取数据并处理 +上述示例为从本地文件data.txt中读取数据,更为常用的用法是从RocketMQ中读取数据处理,下面给出具体步骤: + +- 本地安装并启动RocketMQ,[安装文档](https://rocketmq.apache.org/docs/quick-start/) +- 启动rsqldb服务端 +```shell + chmod +x bin/startAll.sh;sh bin/startAll.sh +``` +- 提交任务 +```shell + chmod +x client/clientExector.sh;sh client/clientExector.sh submitTask rocketmq.sql +``` +- 查看输出 +```shell +tail -f log/rsqldb-runner.log +``` +- 另开一个窗口,启动任务 +```shell +sh client/clientExector.sh startTask +``` +- 向RocketMQ中生产数据:topic为rsqldb-source,与rocketmq.sql任务中的topic名称保持一致,向该topic写入data.txt文件中的数据。观察rsqldb-runner.log日志输出。 \ No newline at end of file diff --git a/docs/SUMMARY.md b/docs/SUMMARY.md index 6e501bc..318ded1 100644 --- a/docs/SUMMARY.md +++ b/docs/SUMMARY.md @@ -1,7 +1,6 @@ # Summary * [Introduction](README.md) -* [Quick Start](quick_start/README.md) * [Blink SQL兼容](stream_sql/README.md) * [创建源表](stream_source/README.md) * [创建metaq源表](stream_source/metaq/README.md) diff --git a/docs/quick_start/README.md b/docs/quick_start/README.md deleted file mode 100644 index cf22ad1..0000000 --- a/docs/quick_start/README.md +++ /dev/null @@ -1,21 +0,0 @@ -# 快速搭建 - -## 构建 -```xml -mvn clean package -Dmaven.test.skip=true -``` -在rsqldb-disk下找到rsqldb-distribution.tar.gz - -## 将压缩包解压 -tar -xvf rsqldb-distribution.tar.gz 得到如下文件结构 -![img.png](pic/img.png) - - -## 参数配置 - -熟悉了上述的内容后, 你可以 -+ [单机运行](standalone/README.md) -+ [集群运行](cluster/README.md) - -## 编写任务代码 - diff --git a/docs/quick_start/pic/img.png b/docs/quick_start/pic/img.png deleted file mode 100644 index da6a0ac..0000000 Binary files a/docs/quick_start/pic/img.png and /dev/null differ diff --git a/docs/quick_start/standalone/README.md b/docs/quick_start/standalone/README.md deleted file mode 100644 index 28a11fb..0000000 --- a/docs/quick_start/standalone/README.md +++ /dev/null @@ -1,5 +0,0 @@ -# 快速启动单机模式 - -## 1. 编写任务代码 - -将任务代码保存为sql文件, standalone.sql 并放入jobs目录 diff --git a/pom.xml b/pom.xml index e881550..6f7b96d 100644 --- a/pom.xml +++ b/pom.xml @@ -27,7 +27,7 @@ blink-3.4.0 1.13.1 2.12.4 - 1.0.3-preview-SNAPSHOT + 1.0.2-preview-SNAPSHOT diff --git a/rsqldb-client/src/main/java/com/alibaba/rsqldb/client/QueryTask.java b/rsqldb-client/src/main/java/com/alibaba/rsqldb/client/QueryTask.java new file mode 100644 index 0000000..41dcbd4 --- /dev/null +++ b/rsqldb-client/src/main/java/com/alibaba/rsqldb/client/QueryTask.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.rsqldb.client; +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import com.alibaba.rsqldb.client.http.HttpHelper; + +import static com.alibaba.rsqldb.client.constant.Constants.queryTask; + +public class QueryTask { + public static void main(String[] args) throws Throwable { + String result = HttpHelper.sendRequest(queryTask, "test", null); + + System.out.println(result); + } +} diff --git a/rsqldb-client/src/main/java/com/alibaba/rsqldb/client/StartTask.java b/rsqldb-client/src/main/java/com/alibaba/rsqldb/client/StartTask.java index dc2aabd..e4e9f9d 100644 --- a/rsqldb-client/src/main/java/com/alibaba/rsqldb/client/StartTask.java +++ b/rsqldb-client/src/main/java/com/alibaba/rsqldb/client/StartTask.java @@ -39,6 +39,7 @@ public class StartTask { public static void main(String[] args) throws Throwable { - HttpHelper.startTask(startTask, "test", "test"); + String result = HttpHelper.sendRequest(startTask, "test", "test"); + System.out.println(result); } } diff --git a/rsqldb-client/src/main/java/com/alibaba/rsqldb/client/StopTask.java b/rsqldb-client/src/main/java/com/alibaba/rsqldb/client/StopTask.java new file mode 100644 index 0000000..45a695f --- /dev/null +++ b/rsqldb-client/src/main/java/com/alibaba/rsqldb/client/StopTask.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.rsqldb.client; +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import com.alibaba.rsqldb.client.http.HttpHelper; + +import static com.alibaba.rsqldb.client.constant.Constants.stopTask; + +public class StopTask { + public static void main(String[] args) throws Throwable{ + String result = HttpHelper.sendRequest(stopTask, "test", "test"); + System.out.println(result); + } +} diff --git a/rsqldb-client/src/main/java/com/alibaba/rsqldb/client/SubmitTask.java b/rsqldb-client/src/main/java/com/alibaba/rsqldb/client/SubmitTask.java index 6be19dc..74f1ef3 100644 --- a/rsqldb-client/src/main/java/com/alibaba/rsqldb/client/SubmitTask.java +++ b/rsqldb-client/src/main/java/com/alibaba/rsqldb/client/SubmitTask.java @@ -16,22 +16,6 @@ */ package com.alibaba.rsqldb.client; -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ import com.alibaba.rsqldb.client.http.HttpHelper; import com.alibaba.rsqldb.client.util.FileUtil; @@ -44,12 +28,14 @@ public class SubmitTask { public static void main(String[] args) throws Throwable { - if (args == null || args.length < 1) { - throw new IllegalArgumentException("home.dir is required."); + if (args == null || args.length < 2) { + throw new IllegalArgumentException("home.dir and sql file name are required."); } + String homeDir = args[0]; + String sqlFileName = args[1]; - String sqlPath = homeDir + "/client/standalone.sql"; + String sqlPath = homeDir + "/client/" + sqlFileName; File file = FileUtil.getFile(sqlPath); byte[] bytes = Files.readAllBytes(file.toPath()); diff --git a/rsqldb-client/src/main/java/com/alibaba/rsqldb/client/constant/Constants.java b/rsqldb-client/src/main/java/com/alibaba/rsqldb/client/constant/Constants.java index af2840a..d0d37dd 100644 --- a/rsqldb-client/src/main/java/com/alibaba/rsqldb/client/constant/Constants.java +++ b/rsqldb-client/src/main/java/com/alibaba/rsqldb/client/constant/Constants.java @@ -38,4 +38,7 @@ public class Constants { public static String startTask = "http://localhost:8080/command/task/start"; + public static String queryTask = "http://localhost:8080/command/task/list"; + + public static String stopTask = "http://localhost:8080/command/task/stop"; } diff --git a/rsqldb-client/src/main/java/com/alibaba/rsqldb/client/http/HttpHelper.java b/rsqldb-client/src/main/java/com/alibaba/rsqldb/client/http/HttpHelper.java index 3375ae0..d8c567b 100644 --- a/rsqldb-client/src/main/java/com/alibaba/rsqldb/client/http/HttpHelper.java +++ b/rsqldb-client/src/main/java/com/alibaba/rsqldb/client/http/HttpHelper.java @@ -48,7 +48,7 @@ import static org.apache.http.HttpHeaders.USER_AGENT; public class HttpHelper { - public static void submitTask(String url,String namespace, String taskName, String sql) throws Exception { + public static void submitTask(String url, String namespace, String taskName, String sql) throws Exception { HttpClient client = new DefaultHttpClient(); HttpPost post = new HttpPost(url); @@ -77,10 +77,9 @@ public static void submitTask(String url,String namespace, String taskName, Stri } System.out.println(result); - } - public static void startTask(String url, String namespace, String taskName) throws Throwable { + public static String sendRequest(String url, String namespace, String taskName) throws Throwable { HttpClient client = new DefaultHttpClient(); HttpPost post = new HttpPost(url); @@ -89,7 +88,9 @@ public static void startTask(String url, String namespace, String taskName) thro List urlParameters = new ArrayList(); urlParameters.add(new BasicNameValuePair("namespace", namespace)); - urlParameters.add(new BasicNameValuePair("taskName", taskName)); + if (taskName != null && !"".equals(taskName)) { + urlParameters.add(new BasicNameValuePair("taskName", taskName)); + } post.setEntity(new UrlEncodedFormEntity(urlParameters)); @@ -106,6 +107,6 @@ public static void startTask(String url, String namespace, String taskName) thro result.append(line); } - System.out.println(result); + return result.toString(); } } diff --git a/rsqldb-disk/client/clientExector.sh b/rsqldb-disk/client/clientExector.sh new file mode 100644 index 0000000..6e46543 --- /dev/null +++ b/rsqldb-disk/client/clientExector.sh @@ -0,0 +1,31 @@ +#!/bin/sh + +binDir=$(cd `dirname $0`;pwd) + +cd $binDir/.. +homeDir=$(pwd) + +cd $binDir + + +mainClass=() +if [ "x$1" == "xsubmitTask" ]; then + mainClass=com.alibaba.rsqldb.client.SubmitTask + java -cp rsqldb-client-1.0.0-SNAPSHOT.jar ${mainClass} ${homeDir} $2 +fi + +if [ "x$1" == "xstartTask" ]; then + mainClass=com.alibaba.rsqldb.client.StartTask + java -cp rsqldb-client-1.0.0-SNAPSHOT.jar ${mainClass} +fi + +if [ "x$1" == "xqueryTask" ]; then + mainClass=com.alibaba.rsqldb.client.QueryTask + java -cp rsqldb-client-1.0.0-SNAPSHOT.jar ${mainClass} +fi + +if [ "x$1" == "xstopTask" ]; then + mainClass=com.alibaba.rsqldb.client.StopTask + java -cp rsqldb-client-1.0.0-SNAPSHOT.jar ${mainClass} +fi + diff --git a/rsqldb-disk/client/data.txt b/rsqldb-disk/client/data.txt index 1ca8e47..b3f4a5d 100644 --- a/rsqldb-disk/client/data.txt +++ b/rsqldb-disk/client/data.txt @@ -1,2 +1,4 @@ -1,2,3,4,5,6,7,8,9 -2,2,3,4,5,6,7,8,9 \ No newline at end of file +1,2,3,4 +2,2,3,4 +3,2,3,4 +4,2,3,4 \ No newline at end of file diff --git a/rsqldb-disk/client/rocketmq.sql b/rsqldb-disk/client/rocketmq.sql new file mode 100644 index 0000000..e90d016 --- /dev/null +++ b/rsqldb-disk/client/rocketmq.sql @@ -0,0 +1,50 @@ +CREATE TABLE `rocketmq_source` +( + field_1 VARCHAR, + field_2 VARCHAR, + field_3 VARCHAR, + field_4 VARCHAR +) WITH ( + type = 'rocketmq', + topic = 'rsqldb-source', + groupName = 'rsqldb-group', + namesrvAddr = '127.0.0.1:9876', + isJsonData = 'false', + msgIsJsonArray = 'false' + ); + + +-- 数据标准化 + +create view rocketmq_view as +select field_1 + , field_2 + , field_3 + , field_4 +from ( + select field_1 + , field_2 + , field_3 + , field_4 + from rocketmq_source + ) +where ( + field_1='1' + ); + +CREATE TABLE `task_sink_2` +( + field_1 VARCHAR, + field_2 VARCHAR, + field_3 VARCHAR, + field_4 VARCHAR +) WITH ( + type = 'print' + ); + +insert into task_sink_2 +select field_1 + , field_2 + , field_3 + , field_4 +from rocketmq_view diff --git a/rsqldb-disk/client/standalone.sql b/rsqldb-disk/client/sendDataFromFile.sql similarity index 54% rename from rsqldb-disk/client/standalone.sql rename to rsqldb-disk/client/sendDataFromFile.sql index ca746d8..b393c97 100644 --- a/rsqldb-disk/client/standalone.sql +++ b/rsqldb-disk/client/sendDataFromFile.sql @@ -3,15 +3,11 @@ CREATE TABLE `test_source` field_1 VARCHAR, field_2 VARCHAR, field_3 VARCHAR, - field_4 VARCHAR, - field_5 VARCHAR, - field_6 VARCHAR, - field_7 VARCHAR, - field_8 VARCHAR, - field_9 VARCHAR + field_4 VARCHAR ) WITH ( type = 'file', - filePath = '/Users/nize/code/github/rsqldb/rsqldb-disk/client/data.txt', +-- 需要根据自身填写data.txt的绝对路径 + filePath = '', isJsonData = 'false', msgIsJsonArray = 'false' ); @@ -24,21 +20,11 @@ select field_1 , field_2 , field_3 , field_4 - , field_5 - , field_6 - , field_7 - , field_8 - , field_9 from ( select field_1 , field_2 , field_3 , field_4 - , field_5 - , field_6 - , field_7 - , field_8 - , field_9 from test_source ) where ( @@ -50,12 +36,7 @@ CREATE TABLE `test_sink` field_1 VARCHAR, field_2 VARCHAR, field_3 VARCHAR, - field_4 VARCHAR, - field_5 VARCHAR, - field_6 VARCHAR, - field_7 VARCHAR, - field_8 VARCHAR, - field_9 VARCHAR + field_4 VARCHAR ) WITH ( type = 'print' ); @@ -65,9 +46,4 @@ select field_1 , field_2 , field_3 , field_4 - , field_5 - , field_6 - , field_7 - , field_8 - , field_9 from view_test diff --git a/rsqldb-runner/pom.xml b/rsqldb-runner/pom.xml index ddb2f80..30302c7 100644 --- a/rsqldb-runner/pom.xml +++ b/rsqldb-runner/pom.xml @@ -22,7 +22,7 @@ org.apache.rocketmq rocketmq-streams-channel-rocketmq - 1.0.3-preview-SNAPSHOT + 1.0.2-preview-SNAPSHOT compile diff --git a/rsqldb-server/src/main/java/com/alibaba/rsqldb/server/controller/CommandLineController.java b/rsqldb-server/src/main/java/com/alibaba/rsqldb/server/controller/CommandLineController.java index fc7c0e9..e4eede0 100644 --- a/rsqldb-server/src/main/java/com/alibaba/rsqldb/server/controller/CommandLineController.java +++ b/rsqldb-server/src/main/java/com/alibaba/rsqldb/server/controller/CommandLineController.java @@ -20,52 +20,61 @@ public void setTaskService(ITaskService taskService) { } @PostMapping("/task/list") - public String listTasks(@RequestParam(value = "namespace", defaultValue = "default") String namespace) { + public String listTasks(@RequestParam(value = "namespace", defaultValue = "default") String namespace) throws Throwable { try { return taskService.list(namespace); } catch (Exception e) { e.printStackTrace(); + throw e; } - return String.format("Task for %s list success!", namespace); + } @PostMapping("/task/submit") - public String submitTask(@RequestParam(value = "namespace", defaultValue = "default") String namespace, @RequestParam(value = "taskName") String taskName, @RequestParam String sql) { + public String submitTask(@RequestParam(value = "namespace", defaultValue = "default") String namespace, + @RequestParam(value = "taskName") String taskName, @RequestParam String sql) throws Throwable { try { taskService.submit(namespace, taskName, sql); + return String.format("Task %s|%s submit sql success!", namespace, taskName); } catch (Exception e) { e.printStackTrace(); + throw e; } - return String.format("Task %s|%s submit success!", namespace, taskName); } @PostMapping("/task/submit/file") - public String submitTaskFile(@RequestParam(value = "namespace", defaultValue = "default") String namespace, @RequestParam(value = "taskName") String taskName, @RequestBody String sqlPath) { + public String submitTaskFile(@RequestParam(value = "namespace", defaultValue = "default") String namespace, + @RequestParam(value = "taskName") String taskName, @RequestBody String sqlPath) throws Throwable { try { taskService.submitFile(namespace, taskName, sqlPath); + return String.format("Task %s|%s submit file success!", namespace, taskName); } catch (Exception e) { e.printStackTrace(); + throw e; } - return String.format("Task %s|%s submit success!", namespace, taskName); } @PostMapping("/task/start") - public String startTask(@RequestParam(value = "namespace", defaultValue = "default") String namespace, @RequestParam(value = "taskName") String taskName) { + public String startTask(@RequestParam(value = "namespace", defaultValue = "default") String namespace, + @RequestParam(value = "taskName") String taskName) throws Throwable { try { taskService.start(namespace, taskName); + return String.format("Task %s|%s start success", namespace, taskName); } catch (Exception e) { e.printStackTrace(); + throw e; } - return String.format("Task %s|%s start success", namespace, taskName); } @PostMapping("/task/stop") - public String stopTask(@RequestParam(value = "namespace", defaultValue = "default") String namespace, @RequestParam(value = "taskName") String taskName) { + public String stopTask(@RequestParam(value = "namespace", defaultValue = "default") String namespace, + @RequestParam(value = "taskName") String taskName) throws Throwable { try { taskService.stop(namespace, taskName); + return String.format("Task %s|%s stop success", taskName, taskName); } catch (Exception e) { e.printStackTrace(); + throw e; } - return String.format("Task %s|%s start success", taskName, taskName); } }