Skip to content

Commit

Permalink
init project
Browse files Browse the repository at this point in the history
  • Loading branch information
tang3lei committed Feb 13, 2019
1 parent 2f0bf88 commit d0e9253
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 28 deletions.
3 changes: 3 additions & 0 deletions gossip_const.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
server_port = 8899
client_port = 56112
heartbeat_secs = 15.0
67 changes: 41 additions & 26 deletions gossip_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@
import tornado.ioloop
import tornado.iostream
import periodicCallback
import message
from message import Message
import queue
import signal
import sys,os
import functools
import gossip_const


def deal_sigint(signum, frame):
Expand All @@ -16,6 +17,7 @@ def deal_sigint(signum, frame):
con.close()
if g.sock:
g.sock.close()

try:
sys.exit(0)
except SystemExit:
Expand All @@ -33,7 +35,6 @@ def __init__(self, host, port, heartbeat, id):
self.connection_map = {} # key: addr value: socket
self.msg_queue = queue.Queue()
self._time_stamp_map = {}
self._cliport = 56112
self._addr_fd_map = {}

def __del__(self):
Expand All @@ -50,55 +51,66 @@ def socket_init(self):
print('Except', e)

def _add_read_handle(self, connection, addr, fd, events):

r_stream = tornado.iostream.IOStream(connection)
data = r_stream.read_until(Message.end_delimiter(),max_bytes=1024)

msg = r_stream.read_until(message.Message.end_delimiter,max_bytes=1024)
if msg.exception() is not None:
if data.exception() is not None:
print('close')
self._del_connection(addr)
io_loop1.remove_handler(fd)
tornado.ioloop.IOLoop.current().remove_handler(fd)
else:
print('rec',connection)
print(msg.result())
msg = data.result().decode('utf-8')
str_list = msg.split(Message.str_delimiter())
if str_list[0] == '0x22':
print('heartbeat')
self._time_stamp_map[addr] = tornado.ioloop.IOLoop.current().time()



def _add_connection(self, connection, address):
print('add connection')
#if(address[-1]) != self._cliport:
# return
con = self.connection_map.get(address)

if con is None :
self._addr_fd_map[address] = connection.fileno()
self.connection_map[address] = connection
self._time_stamp_map[address] = tornado.ioloop.IOLoop.time(io_loop1)
self._time_stamp_map[address] = tornado.ioloop.IOLoop.current().time()

read_handle = functools.partial(self._add_read_handle,connection,address)
io_loop1.add_handler(connection.fileno(),read_handle,io_loop1.READ)
tmp_loop = tornado.ioloop.IOLoop.current()
tmp_loop.add_handler(connection.fileno(),read_handle,tmp_loop.READ)
else:
self._addr_fd_map[address] = connection.fileno()
self.connection_map[address] = connection
self._time_stamp_map[address] = tornado.ioloop.IOLoop.time(io_loop1)
self._time_stamp_map[address] = tornado.ioloop.IOLoop.current().time()

read_handle = functools.partial(self._add_read_handle,connection,address)
io_loop1.remove_handler(connection.fileno())
io_loop1.add_handler(connection.fileno(),read_handle,io_loop1.READ)
tmp_loop = tornado.ioloop.IOLoop.current()
tmp_loop.remove_handler(connection.fileno())
tmp_loop.add_handler(connection.fileno(),read_handle,tmp_loop.READ)

def _check_connection(self):
tmp_time = tornado.ioloop.IOLoop.time(io_loop1)
tmp_time = tornado.ioloop.IOLoop.current().time()
tmp_list = []

for addr, t in self._time_stamp_map.items():
if (tmp_time-t) > 5.0:
if (tmp_time-t) > gossip_const.heartbeat_secs:
tmp_list.append(addr)
elif self.connection_map[addr].fileno() == -1:
tmp_list.append(addr)

for addr in tmp_list:
io_loop1.remove_handler(self._addr_fd_map[addr])
tornado.ioloop.IOLoop.current().remove_handler(self._addr_fd_map[addr])

self._time_stamp_map.pop(addr)
self.connection_map.pop(addr)
self._addr_fd_map.pop(addr)

async def handle_connection(self, connection, address):
print(connection.fileno())

if connection.fileno() == -1:
self._del_connection(address)
else:
Expand All @@ -107,6 +119,7 @@ async def handle_connection(self, connection, address):

def _del_connection(self, address):
print('del connection')

self.connection_map.pop(address)
self._addr_fd_map.pop(address)
self._time_stamp_map.pop(address)
Expand All @@ -120,25 +133,26 @@ def connection_ready(self, fd, events):
raise
return
connection.setblocking(0)

io_loop = tornado.ioloop.IOLoop.current()
io_loop.spawn_callback(self.handle_connection, connection, address)

def broadcast_heartbeat(self):
self._check_connection()
for addr, con in self.connection_map.items():
#msg = message.Message(1)
print('server:',con)
msg = ''
i = 0
while i<99:
msg += 'sadasdasdaasda'
i+=1
print('heartbeat:',con)

msg = '0x22&|'

self.sendMsg(msg, con)

def sendMsg(self, message, connection):
def sendMsg(self, msg, connection):
s_stream = tornado.iostream.IOStream(connection)
s_stream.write(message.encode('utf-8'))
s_stream.write(msg.encode('utf-8'))

def join(self, network):
pass

def fun(str):
print('1234',str)
Expand All @@ -148,12 +162,13 @@ def fun(str):

signal.signal(signal.SIGINT, deal_sigint)

g = Gossip_Server('', 8899, 123, 'server 1')
g = Gossip_Server('', gossip_const.server_port, 123, 'server 1')
#host port beat str_id
g.socket_init()

io_loop1 = tornado.ioloop.IOLoop.current()
io_loop1.add_handler(g.sock,g.connection_ready,io_loop1.READ)
#p = periodicCallback.PeriodicCallback(g.broadcast_heartbeat,1).start()

p = periodicCallback.PeriodicCallback(g.broadcast_heartbeat,3).start() #每三秒心跳一次

io_loop1.start()
17 changes: 16 additions & 1 deletion message.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,22 @@

class Message:

end_delimiter = b'|'
_str_delimiter = '&'

def __init__(self, version):
self.ver = version

#消息间分隔符
@staticmethod
def end_delimiter():
return b'|'

#消息内分隔符
@staticmethod
def str_delimiter():
return Message._str_delimiter

#消息头 例0x22心跳包 0x17区块包 0x18数据包

#

2 changes: 1 addition & 1 deletion test.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ def produce(c):
print(data)
'''
sock.send(b'sdaegsg|')
sock.send(b'0x22&|')
#time.sleep(5)


0 comments on commit d0e9253

Please sign in to comment.