Typical Stomp client class. Uses a listener thread to receive frames from the server, any thread can send.
Receives all happen in one thread, so consider not doing much processing in that thread if you have much message volume.
The Stomp host specified by the client.
The login ID used by the client.
Parameters Hash, possibly nil for a non-hashed connect.
The login credentials used by the client.
The Stomp host's listening port.
Is this connection reliable?
A new Client object can be initialized using three forms:
Hash (this is the recommended Client initialization method):
hash = { :hosts => [ {:login => "login1", :passcode => "passcode1", :host => "localhost", :port => 61616, :ssl => false}, {:login => "login2", :passcode => "passcode2", :host => "remotehost", :port => 61617, :ssl => false} ], :reliable => true, :initial_reconnect_delay => 0.01, :max_reconnect_delay => 30.0, :use_exponential_back_off => true, :back_off_multiplier => 2, :max_reconnect_attempts => 0, :randomize => false, :connect_timeout => 0, :connect_headers => {}, :parse_timeout => 5, :logger => nil, :dmh => false, :closed_check => true, :hbser => false, :stompconn => false, :usecrlf => false, :max_hbread_fails => 0, :max_hbrlck_fails => 0, } e.g. c = Stomp::Client.new(hash)
Positional parameters:
login (String, default : '') passcode (String, default : '') host (String, default : 'localhost') port (Integer, default : 61613) reliable (Boolean, default : false) e.g. c = Stomp::Client.new('login', 'passcode', 'localhost', 61613, true)
Stomp URL :
A Stomp URL must begin with 'stomp://' and can be in one of the following forms: stomp://host:port stomp://host.domain.tld:port stomp://login:passcode@host:port stomp://login:passcode@host.domain.tld:port e.g. c = Stomp::Client.new(urlstring)
# File lib/stomp/client.rb, line 82 def initialize(login = '', passcode = '', host = 'localhost', port = 61613, reliable = false, autoflush = false) parse_hash_params(login) || parse_stomp_url(login) || parse_failover_url(login) || parse_positional_params(login, passcode, host, port, reliable) check_arguments!() @id_mutex = Mutex.new() @ids = 1 create_connection(autoflush) start_listeners() end
open is syntactic sugar for 'Client.new', see 'initialize' for usage.
# File lib/stomp/client.rb, line 110 def self.open(login = '', passcode = '', host = 'localhost', port = 61613, reliable = false) Client.new(login, passcode, host, port, reliable) end
Abort aborts work in a transaction by name.
# File lib/stomp/client.rb, line 126 def abort(name, headers = {}) @connection.abort(name, headers) # replay any ack'd messages in this transaction replay_list = @replay_messages_by_txn[name] if replay_list replay_list.each do |message| if listener = find_listener(message) listener.call(message) end end end end
Acknowledge a message, used when a subscription has specified client acknowledgement ( connection.subscribe(“/queue/a”,{:ack => 'client'}). Accepts a transaction header ( :transaction => 'some_transaction_id' ).
# File lib/stomp/client.rb, line 173 def acknowledge(message, headers = {}) txn_id = headers[:transaction] if txn_id # lets keep around messages ack'd in this transaction in case we rollback replay_list = @replay_messages_by_txn[txn_id] if replay_list.nil? replay_list = [] @replay_messages_by_txn[txn_id] = replay_list end replay_list << message end if block_given? headers['receipt'] = register_receipt_listener lambda {|r| yield r} end if protocol() == Stomp::SPL_12 @connection.ack(message.headers['ack'], headers) else @connection.ack(message.headers['message-id'], headers) end end
autoflush returns the current connection's autoflush setting.
# File lib/stomp/client.rb, line 304 def autoflush() @connection.autoflush() end
autoflush= sets the current connection's autoflush setting.
# File lib/stomp/client.rb, line 299 def autoflush=(af) @connection.autoflush = af end
Begin starts work in a a transaction by name.
# File lib/stomp/client.rb, line 121 def begin(name, headers = {}) @connection.begin(name, headers) end
close frees resources in use by this client. The listener thread is terminated, and disconnect on the connection is called.
# File lib/stomp/client.rb, line 237 def close(headers={}) @listener_thread.exit @connection.disconnect(headers) end
close? tests if this client connection is closed.
# File lib/stomp/client.rb, line 231 def closed?() @connection.closed?() end
Commit commits work in a transaction by name.
# File lib/stomp/client.rb, line 141 def commit(name, headers = {}) txn_id = headers[:transaction] @replay_messages_by_txn.delete(txn_id) @connection.commit(name, headers) end
Return the broker's CONNECTED frame to the client. Misnamed.
# File lib/stomp/client.rb, line 216 def connection_frame() @connection.connection_frame end
Return any RECEIPT frame received by DISCONNECT.
# File lib/stomp/client.rb, line 221 def disconnect_receipt() @connection.disconnect_receipt end
#hbrecv_count returns the current connection's heartbeat receive count.
# File lib/stomp/client.rb, line 288 def hbrecv_count() @connection.hbrecv_count() end
#hbrecv_interval returns the connection's heartbeat receive interval.
# File lib/stomp/client.rb, line 278 def hbrecv_interval() @connection.hbrecv_interval() end
#hbsend_count returns the current connection's heartbeat send count.
# File lib/stomp/client.rb, line 283 def hbsend_count() @connection.hbsend_count() end
#hbsend_interval returns the connection's heartbeat send interval.
# File lib/stomp/client.rb, line 273 def hbsend_interval() @connection.hbsend_interval() end
join the listener thread for this client, generally used to wait for a quit signal.
# File lib/stomp/client.rb, line 116 def join(limit = nil) @listener_thread.join(limit) end
Stomp 1.1+ NACK.
# File lib/stomp/client.rb, line 195 def nack(message_id, headers = {}) @connection.nack(message_id, headers) end
open? tests if this client connection is open.
# File lib/stomp/client.rb, line 226 def open? @connection.open?() end
Poll for asynchronous messages issued by broker. Return nil of no message available, else the message
# File lib/stomp/client.rb, line 294 def poll() @connection.poll() end
protocol returns the current client's protocol level.
# File lib/stomp/client.rb, line 253 def protocol() @connection.protocol() end
Publishes message to destination. If a block is given a receipt will be requested and passed to the block on receipt. Accepts a transaction header ( :transaction => 'some_transaction_id' ).
# File lib/stomp/client.rb, line 208 def publish(destination, message, headers = {}) if block_given? headers['receipt'] = register_receipt_listener lambda {|r| yield r} end @connection.publish(destination, message, headers) end
running checks if the thread was created and is not dead.
# File lib/stomp/client.rb, line 243 def running() @listener_thread && !!@listener_thread.status end
#set_logger identifies a new callback logger.
# File lib/stomp/client.rb, line 248 def set_logger(logger) @connection.set_logger(logger) end
sha1 returns a SHA1 sum of a given string.
# File lib/stomp/client.rb, line 263 def sha1(data) @connection.sha1(data) end
Subscribe to a destination, must be passed a block which will be used as a callback listener. Accepts a transaction header ( :transaction => 'some_transaction_id' ).
# File lib/stomp/client.rb, line 150 def subscribe(destination, headers = {}) raise "No listener given" unless block_given? # use subscription id to correlate messages to subscription. As described in # the SUBSCRIPTION section of the protocol: http://stomp.github.com/. # If no subscription id is provided, generate one. set_subscription_id_if_missing(destination, headers) if @listeners[headers[:id]] raise "attempting to subscribe to a queue with a previous subscription" end @listeners[headers[:id]] = lambda {|msg| yield msg} @connection.subscribe(destination, headers) end
Unreceive a message, sending it back to its queue or to the DLQ.
# File lib/stomp/client.rb, line 200 def unreceive(message, options = {}) @connection.unreceive(message, options) end
Unsubscribe from a subscription by name.
# File lib/stomp/client.rb, line 164 def unsubscribe(name, headers = {}) set_subscription_id_if_missing(name, headers) @connection.unsubscribe(name, headers) @listeners[headers[:id]] = nil end
uuid returns a type 4 UUID.
# File lib/stomp/client.rb, line 268 def uuid() @connection.uuid() end
valid_utf8? validates any given string for UTF8 compliance.
# File lib/stomp/client.rb, line 258 def valid_utf8?(s) @connection.valid_utf8?(s) end
A very basic check of required arguments.
# File lib/client/utils.rb, line 118 def check_arguments!() raise ArgumentError if @host.nil? || @host.empty? raise ArgumentError if @port.nil? || @port == '' || @port < 1 || @port > 65535 raise ArgumentError unless @reliable.is_a?(TrueClass) || @reliable.is_a?(FalseClass) end
# File lib/stomp/client.rb, line 99 def create_connection(autoflush) if @parameters @connection = Connection.new(@parameters) else @connection = Connection.new(@login, @passcode, @host, @port, @reliable) @connection.autoflush = autoflush end end
#filter_options returns a new Hash of filtered options.
# File lib/client/utils.rb, line 125 def filter_options(options) new_options = {} new_options[:initial_reconnect_delay] = (options["initialReconnectDelay"] || 10).to_f / 1000 # In ms new_options[:max_reconnect_delay] = (options["maxReconnectDelay"] || 30000 ).to_f / 1000 # In ms new_options[:use_exponential_back_off] = !(options["useExponentialBackOff"] == "false") # Default: true new_options[:back_off_multiplier] = (options["backOffMultiplier"] || 2 ).to_i new_options[:max_reconnect_attempts] = (options["maxReconnectAttempts"] || 0 ).to_i new_options[:randomize] = options["randomize"] == "true" # Default: false new_options[:connect_timeout] = 0 new_options end
#find_listener returns the listener for a given subscription in a given message.
# File lib/client/utils.rb, line 139 def find_listener(message) subscription_id = message.headers['subscription'] if subscription_id == nil # For backward compatibility, some messages may already exist with no # subscription id, in which case we can attempt to synthesize one. set_subscription_id_if_missing(message.headers['destination'], message.headers) subscription_id = message.headers[:id] end @listeners[subscription_id] end
e.g. failover://(stomp://login1:passcode1@localhost:61616,stomp://login2:passcode2@remotehost:61617)?option1=param
# File lib/client/utils.rb, line 39 def parse_failover_url(login) regexp = /^failover:(\/\/)?\(stomp(\+ssl)?:\/\/#{url_regex}(,stomp(\+ssl)?:\/\/#{url_regex}\))+(\?(.*))?$/ return false unless login =~ regexp first_host = {} first_host[:ssl] = !$2.nil? @login = first_host[:login] = $4 || "" @passcode = first_host[:passcode] = $5 || "" @host = first_host[:host] = $6 @port = first_host[:port] = $7.to_i || Connection::default_port(first_host[:ssl]) options = $16 || "" parts = options.split(/&|=/) options = Hash[*parts] hosts = [first_host] + parse_hosts(login) @parameters = {} @parameters[:hosts] = hosts @parameters.merge! filter_options(options) @reliable = true true end
# File lib/client/utils.rb, line 11 def parse_hash_params(params) return false unless params.is_a?(Hash) @parameters = params first_host = @parameters[:hosts][0] @login = first_host[:login] @passcode = first_host[:passcode] @host = first_host[:host] @port = first_host[:port] || Connection::default_port(first_host[:ssl]) @reliable = true true end
Parse a stomp URL.
# File lib/client/utils.rb, line 99 def parse_hosts(url) hosts = [] host_match = /stomp(\+ssl)?:\/\/(([\w\.]*):(\w*)@)?([\w\.]+):(\d+)\)/ url.scan(host_match).each do |match| host = {} host[:ssl] = !match[0].nil? host[:login] = match[2] || "" host[:passcode] = match[3] || "" host[:host] = match[4] host[:port] = match[5].to_i hosts << host end hosts end
# File lib/client/utils.rb, line 61 def parse_positional_params(login, passcode, host, port, reliable) @login = login @passcode = passcode @host = host @port = port.to_i @reliable = reliable true end
# File lib/client/utils.rb, line 25 def parse_stomp_url(login) regexp = /^stomp:\/\/#{url_regex}/ # e.g. stomp://login:passcode@host:port or stomp://host:port return false unless login =~ regexp @login = $2 || "" @passcode = $3 || "" @host = $4 @port = $5.to_i @reliable = false true end
Register a receipt listener.
# File lib/client/utils.rb, line 83 def register_receipt_listener(listener) id = -1 @id_mutex.synchronize do id = @ids.to_s @ids = @ids.succ end @receipt_listeners[id] = listener id end
Set a subscription id in the headers hash if one does not already exist. For simplicities sake, all subscriptions have a subscription ID. setting an id in the SUBSCRIPTION header is described in the stomp protocol docs: stomp.github.com/
# File lib/client/utils.rb, line 75 def set_subscription_id_if_missing(destination, headers) headers[:id] = headers[:id] ? headers[:id] : headers['id'] if headers[:id] == nil headers[:id] = Digest::SHA1.hexdigest(destination) end end
Start a single listener thread. Misnamed I think.
# File lib/client/utils.rb, line 151 def start_listeners() @listeners = {} @receipt_listeners = {} @replay_messages_by_txn = {} @listener_thread = Thread.start do while true message = @connection.receive if message # AMQ specific?, nil message on multiple reconnects if message.command == Stomp::CMD_MESSAGE if listener = find_listener(message) listener.call(message) end elsif message.command == Stomp::CMD_RECEIPT if listener = @receipt_listeners[message.headers['receipt-id']] listener.call(message) end end end end # while true end end
#url_regex defines a regex for e.g. login:passcode@host:port or host:port
# File lib/client/utils.rb, line 94 def url_regex '(([\w\.\-]*):(\w*)@)?([\w\.\-]+):(\d+)' end