/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://w...content-available-to-author-only...e.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
//#include <conio.h>
#include <ctime>
#include <Poco/Timestamp.h>
#include <decaf/lang/Thread.h>
#include <decaf/lang/Runnable.h>
#include <decaf/util/concurrent/CountDownLatch.h>
#include <activemq/core/ActiveMQConnectionFactory.h>
#include <activemq/core/ActiveMQConnection.h>
#include <activemq/transport/DefaultTransportListener.h>
#include <activemq/library/ActiveMQCPP.h>
#include <decaf/lang/Integer.h>
#include <activemq/util/Config.h>
#include <decaf/util/Date.h>
#include <cms/Connection.h>
#include <cms/Session.h>
#include <cms/TextMessage.h>
#include <cms/BytesMessage.h>
#include <cms/MapMessage.h>
#include <cms/ExceptionListener.h>
#include <cms/MessageListener.h>
#include <stdlib.h>
#include <stdio.h>
#include <iostream>
using namespace activemq;
using namespace activemq::core;
using namespace activemq::transport;
using namespace decaf::lang;
using namespace decaf::util;
using namespace decaf::util::concurrent;
using namespace cms;
using namespace std;
////////////////////////////////////////////////////////////////////////////////
class SimpleAsyncConsumer : public ExceptionListener,
public MessageListener,
public DefaultTransportListener {
private:
Connection* connection;
Session* session;
Destination* destination;
MessageConsumer* consumer;
Poco::Timestamp now;
bool useTopic;
std::string brokerURI;
std::string destURI;
std::string selector;
bool clientAck;
int numMessages;
private:
SimpleAsyncConsumer( const SimpleAsyncConsumer& );
SimpleAsyncConsumer& operator= ( const SimpleAsyncConsumer& );
private:
long int perMsgMicro ;
long int maxMicro ;
long int minMicro ;
long int totalTime ;
long int msgSendTime ;
int count;
int maxMsgNumber;
long int firstToLastTime;
public:
SimpleAsyncConsumer( const std::string& brokerURI,
const std::string& destURI,
const std::string& selector,
const Poco::Timestamp& now,
bool useTopic = false,
bool clientAck = false,
int numMessages = 10 ) :
connection(NULL),
session(NULL),
destination(NULL),
consumer(NULL),
now(now),
useTopic(useTopic),
brokerURI(brokerURI),
destURI(destURI),
selector(selector),
clientAck(clientAck),
numMessages(numMessages)
{
perMsgMicro = 0;
maxMicro = 0;
minMicro = 0;
totalTime = 0;
count = 0;
msgSendTime = 0;
firstToLastTime = 0;
}
virtual ~SimpleAsyncConsumer() {
std::cout << "ActiveMQ " << (useTopic?"Topic:":"Queue:") <<"Number of Messages: "<< count <<" Max Msg No: "<< maxMsgNumber << " Rcvd Total Time,Avg,Min,Max,first2last (ms): "
<<(totalTime / 1000) << ","
<<(totalTime / (count * 1000)) << ","
<< minMicro / 1000 << ","
<< maxMicro / 1000 << ","
<< firstToLastTime / 1000
<<std::endl;
this->cleanup();
}
void close() {
this->cleanup();
}
void runConsumer() {
try {
// Create a ConnectionFactory
ActiveMQConnectionFactory* connectionFactory =
new ActiveMQConnectionFactory( brokerURI );
// Create a Connection
connection = connectionFactory->createConnection();
delete connectionFactory;
ActiveMQConnection* amqConnection = dynamic_cast<ActiveMQConnection*>( connection );
if( amqConnection != NULL ) {
amqConnection->addTransportListener( this );
}
connection->start();
connection->setExceptionListener(this);
// Create a Session
if( clientAck ) {
session = connection->createSession( Session::CLIENT_ACKNOWLEDGE );
} else {
session = connection->createSession( Session::AUTO_ACKNOWLEDGE );
}
// Create the destination (Topic or Queue)
if( useTopic ) {
destination = session->createTopic( destURI );
} else {
destination = session->createQueue( destURI );
}
if (selector.length())
{
std::cout<<"Selector is " << selector<<std::endl;
consumer = session->createConsumer( destination,std::string("Selector = " + selector ) );
}
else
// Create a MessageConsumer from the Session to the Topic or Queue
consumer = session->createConsumer( destination );
consumer->setMessageListener( this );
} catch (CMSException& e) {
e.printStackTrace();
}
}
// Called from the consumer since this class is a registered MessageListener.
virtual void onMessage( const Message* message ) {
//static int count = 0;
//long int msgSendTime=0;
try
{
count++;
if (count == 1)
{
now.update();
totalTime = now.epochMicroseconds();
}
const TextMessage* textMessage =
dynamic_cast< const TextMessage* >( message );
string text = "";
if( textMessage != NULL ) {
text = textMessage->getText();
msgSendTime = message->getLongProperty("SendTime");
} else {
text = "NOT A TEXTMESSAGE!";
}
if( clientAck ) {
message->acknowledge();
}
now.update();
printf( "Message #%10d Received: %s Per Msg Time:%10f ms\n", count, text.c_str(),((double)now.epochMicroseconds() - msgSendTime) / 1000 );
if (count == 1)
{
firstToLastTime = msgSendTime;
//printf("Max Msg No - First %ld, now micro %ld\n",firstToLastTime,now.epochMicroseconds());
minMicro = (now.epochMicroseconds() - msgSendTime);
maxMicro = (now.epochMicroseconds() - msgSendTime);
}
if (count >= numMessages)
{
firstToLastTime = now.epochMicroseconds() - firstToLastTime;
totalTime = now.epochMicroseconds() - totalTime;
//printf("Max Msg No - last %ld\n",now.epochMicroseconds());
}
if ((now.epochMicroseconds() - msgSendTime) >= maxMicro)
maxMsgNumber = count;
maxMicro = std::max(now.epochMicroseconds() - msgSendTime,maxMicro);
minMicro = std::min(now.epochMicroseconds() - msgSendTime,minMicro);
//totalTime += (now.epochMicroseconds() - msgSendTime);
//perMsgMicro = now.elapsed();
} catch (CMSException& e) {
e.printStackTrace();
}
}
// If something bad happens you see it here as this class is also been
// registered as an ExceptionListener with the connection.
virtual void onException( const CMSException& ex AMQCPP_UNUSED ) {
printf("CMS Exception occurred. Shutting down client.\n");
exit(1);
}
virtual void transportInterrupted() {
std::cout << "The Connection's Transport has been Interrupted." << std::endl;
}
virtual void transportResumed() {
std::cout << "The Connection's Transport has been Restored." << std::endl;
}
private:
void cleanup(){
try {
if( connection != NULL ) {
connection->close();
}
} catch ( CMSException& e ) {
e.printStackTrace();
}
delete destination;
delete consumer;
delete session;
delete connection;
}
};
////////////////////////////////////////////////////////////////////////////////
int main(int argc AMQCPP_UNUSED, char* argv[] AMQCPP_UNUSED) {
activemq::library::ActiveMQCPP::initializeLibrary();
std::cout << "=====================================================\n";
std::cout << "Starting the example:" << std::endl;
if (argv[1] != NULL) std::cout << "Destination - "<< argv[1] <<"\n";
if (argv[2] != NULL) std::cout << "Use Topic/Queue - "<< argv[2] <<"\n";
if (argv[3] != NULL) std::cout << "Number of Messages - "<< argv[3] <<"\n";
if (argv[4] != NULL) std::cout << "Selector Value - "<< argv[4] <<"\n";
std::cout << "-----------------------------------------------------\n";
// Set the URI to point to the IPAddress of your broker.
// add any optional params to the url to enable things like
// tightMarshalling or tcp logging etc. See the CMS web site for
// a full list of configuration options.
//
// http://a...content-available-to-author-only...e.org/cms/
//
std::string brokerURI =
"failover:(tcp://192.168.180.49:61616)";
//"failover://(tcp://192.168.180.49:61616?socketBufferSize=150000&ioBufferSize=150000&wireFormat.tightEncodingEnabled=true&jms.prefetchPolicy.all=100000)";
//"failover://(tcp://192.168.180.49:61616?socketBufferSize=150000&ioBufferSize=150000&wireFormat.tightEncodingEnabled=true&jms.prefetchPolicy.all=100000&inputBufferSize=15000&outputBufferSize=15000)";
//"failover:(tcp://192.168.180.49:61616?wireFormat.tightEncodingEnabled=true&wireFormat.tcpNoDelayEnabled=true&jms.useAsyncSend=true&jms.prefetchPolicy.all=100000&useAsyncSend=true)";
//"failover:(tcp://localhost:61616)";
//============================================================
// This is the Destination Name and URI options. Use this to
// customize where the consumer listens, to have the consumer
// use a topic or queue set the 'useTopics' flag.
//============================================================
std::string destURI = "TEST.FOO"; //?consumer.prefetchSize=1";
//============================================================
// set to true to use topics instead of queues
// Note in the code above that this causes createTopic or
// createQueue to be used in the consumer.
//============================================================
bool useTopics = false;
//============================================================
// set to true if you want the consumer to use client ack mode
// instead of the default auto ack mode.
//============================================================
bool clientAck = false;
int numMessages = 10;
std::string selector("");
if ((argv[1] != NULL))
destURI = argv[1];
if ((argv[2] != NULL) && (argv[2] == std::string("Topic")))
useTopics = true;
if (argv[3] != NULL)
numMessages = atoi(argv[3]);
if (argv[4] != NULL)
selector = argv[4];
std::cout << "=====================================================\n";
std::cout << "XStarting the example:" << std::endl;
std::cout << "useTopics - "<< useTopics <<std::endl;
std::cout << "Destination - "<< destURI <<std::endl;
std::cout << "Number of Messages - "<< numMessages <<"\n";
std::cout << "Selector - "<< selector <<"\n";
std::cout << "=====================================================\n";
Poco::Timestamp now;
// Create the consumer
SimpleAsyncConsumer consumer( brokerURI, destURI, selector, now, useTopics, clientAck, numMessages );
// Start it up and it will listen forever.
consumer.runConsumer();
/****
// Wait to exit.
//std::cout << "Press 'q' to quit" << std::endl;
while( std::cin.get() != 'q') {}
****/
while(!std::cin.get() ) {}
// All CMS resources should be closed before the library is shutdown.
consumer.close();
std::cout << "-----------------------------------------------------\n";
std::cout << "Finished with the example." << std::endl;
std::cout << "=====================================================\n";
activemq::library::ActiveMQCPP::shutdownLibrary();
}