-
Notifications
You must be signed in to change notification settings - Fork 0
/
msg_host.py
86 lines (67 loc) · 2.7 KB
/
msg_host.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
#
# Twisted is an asynchronous connection manager.
# Every time a new connection is made, the SubPubFactory
# creates a new protocol instance, which handles the
# communication with that connection. This is where
# messages are organized and stored while awaiting
# acknowledgement.
#
from twisted.internet.protocol import Protocol, Factory
from twisted.internet.endpoints import TCP4ServerEndpoint
from twisted.internet import reactor
import json, redis, time, parthenon, signal
class SubPubFactory(Factory):
    """Twisted factory that fans Redis-queued messages out to subscribers.

    New messages are popped from the Redis list ``message_queue`` and
    delivered to every protocol currently listed in ``subscribers``.  Each
    delivery is tracked in ``ack_queue`` until every recipient has
    acknowledged it; entries that stay unacknowledged are resent a limited
    number of times and then dropped.
    """

    protocol = parthenon.ParthenonMQ
    # NOTE: the containers below are class-level attributes, so they are
    # shared by every SubPubFactory instance in the process.
    # Nothing in this class adds to ``subscribers``; presumably the protocol
    # registers itself here -- verify against parthenon.ParthenonMQ.
    subscribers = []
    message_id = 0  # not used anywhere in this class
    r_server = redis.Redis('localhost')
    # Maps send timestamp -> [pending subscribers, resend count, message].
    ack_queue = {}

    # Seconds an entry may wait for acknowledgements before it is resent.
    RESEND_AFTER = 2
    # Resend rounds after which a still-unacknowledged entry is dropped.
    MAX_RESENDS = 5

    def __init__(self):
        """Install a repeating 10 ms SIGALRM timer that drives distribute().

        NOTE(review): distribute() talks to Redis from inside a signal
        handler; if one run takes longer than the timer interval the handler
        can be re-entered.  Consider a reactor-scheduled call instead.
        """
        # Make sure ``now`` exists before anything reads it.
        self.now = time.time()
        signal.signal(signal.SIGALRM, self.timer)
        signal.setitimer(signal.ITIMER_REAL, .01, .01)
        self.protocol.factory = self

    def timer(self, signal, _):
        """
        SIGALRM handler: automatically trigger the distribute function so
        that messages don't pile up.  Both arguments (signal number and
        stack frame) are ignored.
        """
        self.distribute()

    def send_message(self, sub):
        """
        Deliver ``self.message`` stamped with ``self.now`` to ``sub``.

        ``self.message`` is never assigned inside this class -- presumably
        the caller sets it on the factory first; verify before relying on
        this method.
        """
        sub.deliver_message(self.message, self.now)

    def distribute(self):
        """
        Check the ack_queue for messages that are stale and need to be
        resent.  If a message has been attempted MAX_RESENDS times it is
        deleted.  Then check the redis database for new messages and send
        them along.
        """
        # Refresh the clock on every run.  Previously ``self.now`` only
        # advanced when a new message was popped, so nothing was ever
        # considered stale while the Redis queue was idle.
        self.now = time.time()
        self._resend_stale(self.now)
        self._send_new()

    def _resend_stale(self, now):
        """Resend (or drop) entries unacknowledged for over RESEND_AFTER s."""
        # Iterate over a snapshot of the keys: entries are removed inside the
        # loop, which is an error on a live dict view in Python 3.
        for stamp in list(self.ack_queue):
            entry = self.ack_queue.get(stamp)
            if entry is None or now - stamp <= self.RESEND_AFTER:
                continue
            pending, attempts, message = entry
            if attempts >= self.MAX_RESENDS:
                # Give up on this message.
                self.ack_queue.pop(stamp, None)
                continue
            # Resend with the ORIGINAL timestamp: it is the ack_queue key the
            # subscriber has to echo back through ack().  Iterate a copy in
            # case a delivery acknowledges synchronously and edits ``pending``.
            for subscriber in list(pending):
                subscriber.deliver_message(message, stamp)
            entry[1] = attempts + 1

    def _send_new(self):
        """Pop every message currently queued in Redis and broadcast it."""
        # LLEN returns the length directly instead of transferring the whole
        # list just to count it.
        for _ in range(self.r_server.llen('message_queue')):
            message = self.r_server.lpop('message_queue')
            if message is None:
                # Queue was drained by someone else since the length check.
                break
            stamp = self.now = time.time()
            recipients = self.subscribers[:]
            self.ack_queue[stamp] = [recipients, 0, message]
            for subscriber in list(recipients):
                subscriber.deliver_message(message, stamp)

    def ack(self, time_stamp, subscriber):
        """
        Keep the acknowledgement queue clean.

        Removes ``subscriber`` from the pending list of the message sent at
        ``time_stamp`` and deletes the entry once nobody is left.  Unknown
        timestamps and duplicate acknowledgements are ignored.
        """
        entry = self.ack_queue.get(time_stamp)
        if entry is None:
            # Already fully acknowledged or dropped after too many resends.
            return
        pending = entry[0]
        try:
            pending.remove(subscriber)
        except ValueError:
            # This subscriber was not (or is no longer) pending.
            return
        if not pending:
            self.ack_queue.pop(time_stamp, None)
# Start the server: one shared SubPubFactory serves every connection accepted
# on TCP port 8080.  reactor.run() blocks until the reactor is stopped.
factory = SubPubFactory()
endpoint = TCP4ServerEndpoint(reactor, port=8080)
endpoint.listen(factory)
reactor.run()