/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <ctime>
#include <Poco/Timestamp.h>
#include <decaf/lang/Thread.h>
#include <decaf/lang/Runnable.h>
#include <decaf/util/concurrent/CountDownLatch.h>
#include <decaf/lang/Long.h>
#include <decaf/util/Date.h>
#include <activemq/core/ActiveMQConnectionFactory.h>
#include <activemq/util/Config.h>
#include <activemq/library/ActiveMQCPP.h>
#include <cms/Connection.h>
#include <cms/Session.h>
#include <cms/TextMessage.h>
#include <cms/BytesMessage.h>
#include <cms/MapMessage.h>
#include <cms/ExceptionListener.h>
#include <cms/MessageListener.h>
#include <stdlib.h>
#include <stdio.h>
#include <iostream>
#include <memory>
#include <string>
using namespace activemq;
using namespace activemq::core;
using namespace decaf;
using namespace decaf::lang;
using namespace decaf::util;
using namespace decaf::util::concurrent;
using namespace cms;
using namespace std;
//using namespace Poco::Timestamp;
////////////////////////////////////////////////////////////////////////////////
class SimpleProducer : public Runnable {
private:
Connection* connection;
Session* session;
Destination* destination;
MessageProducer* producer;
bool persistentMode;
bool useTopic;
bool clientAck;
unsigned int numMessages;
std::string brokerURI;
std::string destURI;
private:
SimpleProducer( const SimpleProducer& );
SimpleProducer& operator= ( const SimpleProducer& );
public:
SimpleProducer( const std::string& brokerURI, unsigned int numMessages,
const std::string& destURI, bool persistentMode = true, bool useTopic = false, bool clientAck = false ) :
connection(NULL),
session(NULL),
destination(NULL),
producer(NULL),
persistentMode(persistentMode),
useTopic(useTopic),
clientAck(clientAck),
numMessages(numMessages),
brokerURI(brokerURI),
destURI(destURI)
{ }
virtual ~SimpleProducer(){
//printf( "Total Time Taken %lld\n", now.elapsed() );
cleanup();
}
void close() {
this->cleanup();
}
virtual void run() {
try {
std::cout<<"Reached here1"<<std::endl;
// Create a ConnectionFactory
auto_ptr<ActiveMQConnectionFactory> connectionFactory(
new ActiveMQConnectionFactory( brokerURI ) );
std::cout<<"Reached here2"<<std::endl;
// Create a Connection
try{
connection = connectionFactory->createConnection();
connection->start();
std::cout<<"Reached here2.1"<<std::endl;
} catch( CMSException& e ) {
e.printStackTrace();
//throw e;
}
std::cout<<"Reached here3"<<std::endl;
// Create a Session
if( clientAck ) {
session = connection->createSession( Session::CLIENT_ACKNOWLEDGE );
} else {
session = connection->createSession( Session::AUTO_ACKNOWLEDGE );
}
// Create the destination (Topic or Queue)
if( useTopic ) {
destination = session->createTopic( destURI );
} else {
destination = session->createQueue( destURI );
}
// Create a MessageProducer from the Session to the Topic or Queue
producer = session->createProducer( destination );
if (!persistentMode)
{
std::cout<<"NON Persistent Mode "<<std::endl;
producer->setDeliveryMode( DeliveryMode::NON_PERSISTENT );
}
std::cout<<"Reached here"<<std::endl;
// Create the Thread Id String
//string threadIdStr = Long::toString( Thread::currentThread()->getId() );
// Create a messages
//string text = (string)" - Hello world! from thread " + threadIdStr;
string text(" - Hello World!");
Poco::Timestamp now;
int tempCount = 0;
//std::time_t t1 = now.epochTime();
now.update();
long epochStartTime = now.epochMicroseconds();
for( unsigned int ix=0; ix<numMessages; ++ix ){
auto_ptr<TextMessage> message(session->createTextMessage( text ) );
message->setIntProperty( "Integer", ix );
now.update();
message->setIntProperty( "Selector",++tempCount);
message->setLongProperty( "SendTime", now.epochMicroseconds() );
// Tell the producer to send the message
printf( "Message #%10d Sent - %s Send Time: %10ld \n", ix+1, text.c_str(),now.epochMicroseconds() );
producer->send( message.get() );
if (tempCount >= 5)
tempCount = 0;
}
//Poco::Timestamp::TimeDiff diff = now.elapsed();
printf( "ActiveMQ %s:Num Messages %d Total Send Time Taken %ld ms \n", std::string(useTopic?"Topic":"Queue").c_str(),numMessages, (now.epochMicroseconds() - epochStartTime) / 1000 );
}catch ( CMSException& e ) {
e.printStackTrace();
std::cout<<"Throw"<<std::endl;
}
}
private:
void cleanup(){
try {
if( connection != NULL ) {
connection->close();
}
} catch ( CMSException& e ) {
e.printStackTrace();
}
delete destination;
delete producer;
delete session;
delete connection;
}
};
////////////////////////////////////////////////////////////////////////////////
namespace {

    // Returns argv[index], or "" when that argument was not supplied, so the
    // rest of main can treat "missing" and "empty" the same way.
    const char* argOrEmpty( int argc, char* argv[], int index ) {
        return ( index < argc && argv[index] != NULL ) ? argv[index] : "";
    }

    // Parses a non-empty string of decimal digits into count.
    // Returns false (leaving count untouched) for anything else: signs,
    // whitespace, trailing characters, or a value too large for unsigned int.
    bool parseMessageCount( const char* text, unsigned int& count ) {
        const unsigned int maxCount = static_cast<unsigned int>( -1 );
        if( *text == '\0' ) {
            return false;
        }
        unsigned int value = 0;
        for( const char* p = text; *p != '\0'; ++p ) {
            if( *p < '0' || *p > '9' ) {
                return false;
            }
            const unsigned int digit = static_cast<unsigned int>( *p - '0' );
            if( value > ( maxCount - digit ) / 10 ) {
                return false;   // value * 10 + digit would overflow
            }
            value = value * 10 + digit;
        }
        count = value;
        return true;
    }
}

/**
 * Usage: <program> <destination> <Topic|Queue> <Persistent|other> <numMessages>
 *
 *   destination  queue/topic name; empty or missing -> "TEST.FOO"
 *   Topic|Queue  exactly "Topic" selects a topic, anything else a queue
 *   Persistent   exactly "Persistent" selects persistent delivery, anything
 *                else non-persistent
 *   numMessages  decimal count; empty or missing -> 25000
 *
 * Returns EXIT_FAILURE if numMessages is present but not a valid count.
 */
int main( int argc, char* argv[] ) {

    activemq::library::ActiveMQCPP::initializeLibrary();

    // Never index argv past argc: absent arguments are read as empty strings.
    const char* destArg = argOrEmpty( argc, argv, 1 );
    const char* kindArg = argOrEmpty( argc, argv, 2 );
    const char* modeArg = argOrEmpty( argc, argv, 3 );
    const char* countArg = argOrEmpty( argc, argv, 4 );

    std::cout << "=====================================================\n";
    std::cout << "Starting the example:" << std::endl;
    std::cout << "Destination - "<< destArg <<"\n";
    std::cout << "Topic / Queue - "<< kindArg <<"\n";
    std::cout << "Perisistent Mode - "<< modeArg <<"\n";
    std::cout << "Number of Messages - "<< countArg <<"\n";
    std::cout << "-----------------------------------------------------\n";

    // Set the URI to point to the IPAddress of your broker.
    // add any optional params to the url to enable things like
    // tightMarshalling or tcp logging etc. See the CMS web site for
    // a full list of configuration options.
    //
    // http://activemq.apache.org/cms/
    //
    // Other URIs that have been used with this program:
    // tcp://127.0.0.1:61613?wireFormat=stomp
    // failover://(tcp://192.168.180.49:61616?socketBufferSize=150000&ioBufferSize=150000&wireFormat.tightEncodingEnabled=true&jms.prefetchPolicy.all=100000&inputBufferSize=15000&outputBufferSize=15000&jms.useAsyncSend=true)
    // failover:(tcp://192.168.180.49:61616?wireFormat.tightEncodingEnabled=true&wireFormat.tcpNoDelayEnabled=true&jms.useAsyncSend=true&jms.prefetchPolicy.all=100000&useAsyncSend=true)
    const std::string brokerURI = "failover://(tcp://192.168.180.49:61616)";

    //============================================================
    // Total number of messages for this producer to send.
    //============================================================
    unsigned int numMessages = 25000;
    if( countArg[0] != '\0' && !parseMessageCount( countArg, numMessages ) ) {
        // A negative or non-numeric count used to be converted silently into
        // a huge or zero message count; refuse it instead.
        std::cerr << "Invalid number of messages: " << countArg << std::endl;
        activemq::library::ActiveMQCPP::shutdownLibrary();
        return EXIT_FAILURE;
    }

    //============================================================
    // This is the Destination Name and URI options. Use this to
    // customize where the Producer produces, to have the producer
    // use a topic or queue set the 'useTopics' flag.
    //============================================================
    std::string destURI = "TEST.FOO";
    if( destArg[0] != '\0' ) {
        destURI = destArg;
    }

    //============================================================
    // set to true to use topics instead of queues
    // Note in the code above that this causes createTopic or
    // createQueue to be used in the producer.
    //============================================================
    const bool useTopics = ( kindArg == std::string("Topic") );
    const bool persistent = ( modeArg == std::string("Persistent") );

    std::cout << "=====================================================\n";
    std::cout << "XStarting the example:" << std::endl;
    std::cout << "Destination - "<< destArg <<std::endl;
    std::cout << "useTopics - "<< useTopics <<std::endl;
    std::cout << "Perisistent Mode - "<< persistent <<std::endl;
    std::cout << "numMessages - - "<< countArg <<std::endl;
    std::cout << "=====================================================\n";

    {
        // Create the producer and run it.
        SimpleProducer producer( brokerURI, numMessages, destURI, persistent, useTopics );

        // Publish the given number of Messages
        producer.run();

        // The producer's destructor releases all CMS resources when this scope
        // ends. The scope guarantees that happens exactly once and before the
        // library itself is shut down below.
    }

    std::cout << "-----------------------------------------------------\n";
    std::cout << "Finished with the example..." << std::endl;
    std::cout << "=====================================================\n";
    std::cout<<"HELLO1";
    activemq::library::ActiveMQCPP::shutdownLibrary();
    std::cout<<"HELLO";

    return 0;
}