Skip to content

Commit

Permalink
Heartbeats.
Browse files Browse the repository at this point in the history
💓 💗 💕 💞 💖
  • Loading branch information
ralphbean committed Sep 10, 2014
1 parent 9fd09ce commit 2cfcb96
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 3 deletions.
2 changes: 2 additions & 0 deletions moksha.hub/development.ini
Expand Up @@ -112,6 +112,8 @@ orbited_scheme = http
## By default we use the MorbidQ broker, run by Orbited, for development.
#stomp_broker = localhost
#stomp_port = 61613
## Optional client heartbeat interval, in milliseconds (0 or unset disables it)
#stomp_heartbeat = 1000
## If stomp_uri is present, stomp_broker and stomp_port are ignored
#stomp_uri = localhost:61613
## If there are multiple uris, then a failover() method is employed at runtime
Expand Down
16 changes: 14 additions & 2 deletions moksha.hub/moksha/hub/stomp/protocol.py
Expand Up @@ -57,7 +57,16 @@ def connected(self, msg):
stomper.Engine.connected(self, msg)
log.info("StompProtocol Connected: session %s." %
msg['headers']['session'])
self.client.connected()

# https://stomp.github.io/stomp-specification-1.1.html#Heart-beating
server_heartbeat = msg['headers'].get('heart-beat', 0)
if server_heartbeat:
log.info("(server wants heart-beat (%s))" % server_heartbeat)

This comment has been minimized.

Copy link
@lmacken

lmacken Sep 10, 2014

Member

Hm, this is going to be really noisy in the logs, right? Maybe log.debug?

sx, sy = server_heartbeat.split(',')
server_heartbeat = int(sy)

self.client.connected(server_heartbeat)

#f = stomper.Frame()
#f.unpack(stomper.subscribe(topic))
#print f
Expand All @@ -82,9 +91,12 @@ def subscribe(self, dest, **headers):

def connectionMade(self):
    """ Register with stomp server.

    Writes a single CONNECT frame to the transport.  For any stomper
    protocol version other than '1.0' the frame also carries the broker
    host and a heart-beat proposal of ``(client_heartbeat, 0)``; for
    '1.0' only the credentials are sent.
    """
    log.info("Connecting with stomp-%s" % stomper.STOMP_VERSION)
    if stomper.STOMP_VERSION != '1.0':
        # Only the host is needed for the CONNECT frame; the port is
        # unpacked solely to keep the (host, port) shape check.
        host, _port = self.client.addresses[self.client.address_index]
        # First value: how often we are able to send heart-beats (ms).
        # Second value is 0 -- NOTE(review): per the STOMP 1.1 heart-beat
        # header this means we do not ask the server to send us any.
        interval = (self.client.client_heartbeat, 0)
        log.info("(proposing heartbeat of (%i,%i))" % interval)
        # Build the CONNECT frame exactly once, including the heart-beat
        # proposal (previously a frame without it was built first and
        # then immediately thrown away).
        cmd = stomper.connect(self.username, self.password, host, interval)
    else:
        cmd = stomper.connect(self.username, self.password)
    self.transport.write(cmd)
Expand Down
30 changes: 29 additions & 1 deletion moksha.hub/moksha/hub/stomp/stomp.py
Expand Up @@ -68,6 +68,8 @@ def __init__(self, hub, config):
self.key = self.config.get('stomp_ssl_key', None)
self.crt = self.config.get('stomp_ssl_crt', None)

self.client_heartbeat = int(self.config.get('stomp_heartbeat', 0))

self.connect(self.addresses[self.address_index], self.key, self.crt)
super(StompHubExtension, self).__init__()

Expand Down Expand Up @@ -97,22 +99,33 @@ def buildProtocol(self, addr):
self.proto = StompProtocol(self, self.username, self.password)
return self.proto

def connected(self):
def connected(self, server_heartbeat):
    """Finish setup once the protocol reports a successful connection.

    ``server_heartbeat`` is the heart-beat value (in milliseconds) passed
    in by the protocol; a falsy value means none was offered.  When both
    it and ``self.client_heartbeat`` are non-zero, heart-beating starts
    at the larger of the two intervals.  Afterwards every topic in
    ``self._topics`` is subscribed to and every frame in ``self._frames``
    is written out, and both lists are reset.
    """
    heartbeat_agreed = bool(server_heartbeat and self.client_heartbeat)
    if not heartbeat_agreed:
        log.info("Skipping heartbeat initialization")
    else:
        # Use the slower of the two rates so neither side is overrun.
        interval = max(self.client_heartbeat, server_heartbeat)
        log.info("Heartbeat of %ims negotiated from (%i,%i); starting." % (
            interval, self.client_heartbeat, server_heartbeat))
        self.start_heartbeat(interval)

    # Subscribe to the topics that were requested before we were connected.
    for topic in self._topics:
        log.info('Subscribing to %s topic' % topic)
        self.subscribe(topic, callback=lambda msg: None)
    self._topics = []

    # Send out the frames that were queued up in the meantime.
    for queued in self._frames:
        log.info('Flushing queued frame')
        transport = self.proto.transport
        transport.write(queued.pack())
    self._frames = []

def clientConnectionLost(self, connector, reason):
    """Handle the loss of a connection.

    Logs ``reason`` at info level, stops the heartbeat and then hands
    off to ``failover()``.  ``connector`` is not used.
    """
    message = 'Lost connection. Reason: %s' % reason
    log.info(message)
    # Stop heart-beating before failing over.
    self.stop_heartbeat()
    self.failover()

def clientConnectionFailed(self, connector, reason):
    """Handle a failed connection attempt.

    Logs ``reason`` at error level, stops the heartbeat and then hands
    off to ``failover()``.  ``connector`` is not used.
    """
    message = 'Connection failed. Reason: %s' % reason
    log.error(message)
    # Stop heart-beating before failing over.
    self.stop_heartbeat()
    self.failover()

def failover(self):
Expand All @@ -122,6 +135,21 @@ def failover(self):
log.info('(failover) reconnecting in %f seconds.' % self._delay)
reactor.callLater(self._delay, self.connect, *args)

def start_heartbeat(self, interval):
    """Begin sending heart-beats every ``interval`` milliseconds.

    A heart-beat call still pending from an earlier connection is
    cancelled first, so at most one heart-beat chain is ever scheduled
    (a stop followed by a quick reconnect used to leave the old chain
    running next to the new one).
    """
    self._cancel_heartbeat_call()
    self._heartbeat_enabled = True
    # callLater takes seconds; ``interval`` is in milliseconds.
    self._heartbeat_call = reactor.callLater(
        interval / 1000.0, self.heartbeat, interval)

def heartbeat(self, interval):
    """Write one heart-beat to the transport and schedule the next.

    Does nothing except log when heart-beating has been stopped (or was
    never started).
    """
    # When invoked by the reactor the pending call has already fired and
    # this only drops the stale handle; when invoked directly it prevents
    # a second chain from being scheduled alongside the pending one.
    self._cancel_heartbeat_call()
    if not getattr(self, '_heartbeat_enabled', False):
        log.info("(heartbeat stopped)")
        return
    # A heart-beat is a single newline byte (0x0A).  The bytes literal is
    # the same value chr(0x0A) gives on Python 2 and stays bytes on 3.
    self.proto.transport.write(b"\n")  # Lub-dub
    self._heartbeat_call = reactor.callLater(
        interval / 1000.0, self.heartbeat, interval)

def stop_heartbeat(self):
    """Stop heart-beating and cancel any heart-beat still scheduled.

    Safe to call when heart-beating was never started.
    """
    log.info("stopping heartbeat")
    self._heartbeat_enabled = False
    self._cancel_heartbeat_call()

def _cancel_heartbeat_call(self):
    """Cancel the scheduled heart-beat call, if one is still pending."""
    # getattr: the handle is first assigned in start_heartbeat/heartbeat,
    # so it may not exist yet when this runs.
    pending = getattr(self, '_heartbeat_call', None)
    self._heartbeat_call = None
    # Only a call that has neither fired nor been cancelled may be
    # cancelled; active() is checked to avoid cancelling twice.
    if pending is not None and pending.active():
        pending.cancel()

def send_message(self, topic, message, **headers):
f = stomper.Frame()
f.unpack(stomper.send(topic, message))
Expand Down

0 comments on commit 2cfcb96

Please sign in to comment.