Skip to content

Commit

Permalink
又写了一点
Browse files Browse the repository at this point in the history
  • Loading branch information
Alovez committed Aug 9, 2017
1 parent 2be331d commit 5d016fa
Showing 1 changed file with 96 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -243,4 +243,99 @@ def serve():
args = parser.parse_args()
```

这里我们会开启分配器进程及另外两个进程
这里我们会开启分配器进程以及一个`runner_checker`函数进程,和一个`redistribute`函数进程。

```python
server = ThreadingTCPServer((args.host, int(args.port)), DispatcherHandler)
print `serving on %s:%s` % (args.host, int(args.port))

...

runner_heartbeat = threading.Thread(target=runner_checker, args=(server,))
redistributor = threading.Thread(target=redistribute, args=(server,))
try:
runner_heartbeat.start()
redistributor.start()
# Activate the server; this will keep running until you
# interrupt the program with Ctrl+C or Cmd+C
server.serve_forever()
except (KeyboardInterrupt, Exception):
# if any exception occurs, kill the thread
server.dead = True
runner_heartbeat.join()
redistributor.join()

```

`runner_checker`函数会定期的ping每一个注册在位的运行器,来确保他们都处于正常工作的状态。如果有运行器没有响应,该函数就会将其从注册的运行器池中删除,并且之前分配给他的提交ID会被重新分配给一个新的可用的运行器。函数会在`pending_commits`变量中记录运行受到运行器失去响应影响的提交ID

```python
def runner_checker(server):
def manage_commit_lists(runner):
for commit, assigned_runner in server.dispatched_commits.iteritems():
if assigned_runner == runner:
del server.dispatched_commits[commit]
server.pending_commits.append(commit)
break
server.runners.remove(runner)
while not server.dead:
time.sleep(1)
for runner in server.runners:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
response = helpers.communicate(runner["host"],
int(runner["port"]),
"ping")
if response != "pong":
print "removing runner %s" % runner
manage_commit_lists(runner)
except socket.error as e:
manage_commit_lists(runner)
```

`redistribute`用来将`pending_commits`中记录的提交ID进行重新分配。`redistribute`运行时会不断的检查`pending_commits`文件,一旦发现`pending_commits`中存在提交ID,函数会调用 `dispatch_tests`方法来分配这个提交ID。

```python
def redistribute(server):
while not server.dead:
for commit in server.pending_commits:
print "running redistribute"
print server.pending_commits
dispatch_tests(server, commit)
time.sleep(5)
```

`dispatch_tests`函数用来从已注册的运行器池中返回一个可用的运行器。如果得到了一个可用的运行器,函数会发送一个带有提交ID的运行测试指令。如果当前没有可用的运行器,函数会在2s的休眠之后重复上述过程。如果分配成功了,函数会在`dispatched_commits`变量中记录提交ID及该提交ID的测试正在由哪一个运行器运行。如果提交ID在`pending_commits`中,`dispatch_tests`函数会在重新分配后将提交ID从`pending_commits`中删除。

```python
def dispatch_tests(server, commit_id):
# NOTE: usually we don't run this forever
while True:
print "trying to dispatch to runners"
for runner in server.runners:
response = helpers.communicate(runner["host"],
int(runner["port"]),
"runtest:%s" % commit_id)
if response == "OK":
print "adding id %s" % commit_id
server.dispatched_commits[commit_id] = runner
if commit_id in server.pending_commits:
server.pending_commits.remove(commit_id)
return
time.sleep(2)
```

分配器服务用到了标准库中的一个叫`SocketServer`的非常简单的网络服务器模块。`SocketServer`模块中有四种基本的服务器类型:`TCP`, `UDP`, `UnixStreamServer``UnixDatagramServer`。为了保证我们的数据传输连续稳定,我们使用基于TCP协议的套接字(UPD并不能保证数据的稳定和连续)。

`SocketServer`中提供的默认的`TCPServer`最多只支持同时维持一个会话。所以当分配器与一个运行器建立会话后,就无法再与监听器建立连接了。此时来自监听器的会话只能等待第一个会话完成并断开连接才能建立与分配器的连接。这对于我们的项目而言并不是非常理想,在我们预想中,分配器应该直接而迅速的同时与所有运行器及监听器进行通信。

为了使我们的分配器可以同时维护多个连接,我们使用了一个自定义的类`ThreadingTCPServer`来为默认的`SocketServer`类增加多线程运行的功能。也就是说无论何时分配器接收到连接请求,他都会新建一个进程来处理这个会话。这就使分配器同时维护多个连接成为了可能。

```python
class ThreadingTCPServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer):
runners = [] # Keeps track of test runner pool
dead = False # Indicate to other threads that we are no longer running
dispatched_commits = {} # Keeps track of commits we dispatched
pending_commits = [] # Keeps track of commits we have yet to dispatch
```

0 comments on commit 5d016fa

Please sign in to comment.