fork(1) download
  1. /*
  2.  * Licensed to the Apache Software Foundation (ASF) under one or more
  3.  * contributor license agreements. See the NOTICE file distributed with
  4.  * this work for additional information regarding copyright ownership.
  5.  * The ASF licenses this file to You under the Apache License, Version 2.0
  6.  * (the "License"); you may not use this file except in compliance with
  7.  * the License. You may obtain a copy of the License at
  8.  *
  9.  * http://w...content-available-to-author-only...e.org/licenses/LICENSE-2.0
  10.  *
  11.  * Unless required by applicable law or agreed to in writing, software
  12.  * distributed under the License is distributed on an "AS IS" BASIS,
  13.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14.  * See the License for the specific language governing permissions and
  15.  * limitations under the License.
  16.  */
  17. #include <ctime>
  18. #include <Poco/Timestamp.h>
  19. #include <decaf/lang/Thread.h>
  20. #include <decaf/lang/Runnable.h>
  21. #include <decaf/util/concurrent/CountDownLatch.h>
  22. #include <decaf/lang/Long.h>
  23. #include <decaf/util/Date.h>
  24. #include <activemq/core/ActiveMQConnectionFactory.h>
  25. #include <activemq/util/Config.h>
  26. #include <activemq/library/ActiveMQCPP.h>
  27. #include <cms/Connection.h>
  28. #include <cms/Session.h>
  29. #include <cms/TextMessage.h>
  30. #include <cms/BytesMessage.h>
  31. #include <cms/MapMessage.h>
  32. #include <cms/ExceptionListener.h>
  33. #include <cms/MessageListener.h>
  34. #include <stdlib.h>
  35. #include <stdio.h>
  36. #include <iostream>
  37. #include <memory>
  38. #include <string>
  39. using namespace activemq;
  40. using namespace activemq::core;
  41. using namespace decaf;
  42. using namespace decaf::lang;
  43. using namespace decaf::util;
  44. using namespace decaf::util::concurrent;
  45. using namespace cms;
  46. using namespace std;
  47. //using namespace Poco::Timestamp;
  48.  
  49. ////////////////////////////////////////////////////////////////////////////////
  50. class SimpleProducer : public Runnable {
  51. private:
  52.  
  53. Connection* connection;
  54. Session* session;
  55. Destination* destination;
  56. MessageProducer* producer;
  57. bool persistentMode;
  58. bool useTopic;
  59. bool clientAck;
  60. unsigned int numMessages;
  61. std::string brokerURI;
  62. std::string destURI;
  63.  
  64. private:
  65.  
  66. SimpleProducer( const SimpleProducer& );
  67. SimpleProducer& operator= ( const SimpleProducer& );
  68.  
  69. public:
  70.  
  71. SimpleProducer( const std::string& brokerURI, unsigned int numMessages,
  72. const std::string& destURI, bool persistentMode = true, bool useTopic = false, bool clientAck = false ) :
  73. connection(NULL),
  74. session(NULL),
  75. destination(NULL),
  76. producer(NULL),
  77. persistentMode(persistentMode),
  78. useTopic(useTopic),
  79. clientAck(clientAck),
  80. numMessages(numMessages),
  81. brokerURI(brokerURI),
  82. destURI(destURI)
  83. { }
  84.  
  85. virtual ~SimpleProducer(){
  86. //printf( "Total Time Taken %lld\n", now.elapsed() );
  87. cleanup();
  88. }
  89.  
  90. void close() {
  91. this->cleanup();
  92. }
  93.  
  94. virtual void run() {
  95. try {
  96. std::cout<<"Reached here1"<<std::endl;
  97. // Create a ConnectionFactory
  98. auto_ptr<ActiveMQConnectionFactory> connectionFactory(
  99. new ActiveMQConnectionFactory( brokerURI ) );
  100. std::cout<<"Reached here2"<<std::endl;
  101. // Create a Connection
  102. try{
  103. connection = connectionFactory->createConnection();
  104. connection->start();
  105. std::cout<<"Reached here2.1"<<std::endl;
  106. } catch( CMSException& e ) {
  107. e.printStackTrace();
  108. //throw e;
  109. }
  110. std::cout<<"Reached here3"<<std::endl;
  111.  
  112.  
  113. // Create a Session
  114. if( clientAck ) {
  115. session = connection->createSession( Session::CLIENT_ACKNOWLEDGE );
  116. } else {
  117. session = connection->createSession( Session::AUTO_ACKNOWLEDGE );
  118. }
  119.  
  120. // Create the destination (Topic or Queue)
  121. if( useTopic ) {
  122. destination = session->createTopic( destURI );
  123. } else {
  124. destination = session->createQueue( destURI );
  125. }
  126.  
  127. // Create a MessageProducer from the Session to the Topic or Queue
  128. producer = session->createProducer( destination );
  129.  
  130. if (!persistentMode)
  131. {
  132. std::cout<<"NON Persistent Mode "<<std::endl;
  133. producer->setDeliveryMode( DeliveryMode::NON_PERSISTENT );
  134. }
  135. std::cout<<"Reached here"<<std::endl;
  136. // Create the Thread Id String
  137. //string threadIdStr = Long::toString( Thread::currentThread()->getId() );
  138.  
  139. // Create a messages
  140. //string text = (string)" - Hello world! from thread " + threadIdStr;
  141.  
  142. string text(" - Hello World!");
  143.  
  144. Poco::Timestamp now;
  145. int tempCount = 0;
  146. //std::time_t t1 = now.epochTime();
  147. now.update();
  148. long epochStartTime = now.epochMicroseconds();
  149. for( unsigned int ix=0; ix<numMessages; ++ix ){
  150.  
  151. auto_ptr<TextMessage> message(session->createTextMessage( text ) );
  152.  
  153. message->setIntProperty( "Integer", ix );
  154. now.update();
  155. message->setIntProperty( "Selector",++tempCount);
  156. message->setLongProperty( "SendTime", now.epochMicroseconds() );
  157.  
  158. // Tell the producer to send the message
  159. printf( "Message #%10d Sent - %s Send Time: %10ld \n", ix+1, text.c_str(),now.epochMicroseconds() );
  160. producer->send( message.get() );
  161.  
  162. if (tempCount >= 5)
  163. tempCount = 0;
  164. }
  165. //Poco::Timestamp::TimeDiff diff = now.elapsed();
  166.  
  167. printf( "ActiveMQ %s:Num Messages %d Total Send Time Taken %ld ms \n", std::string(useTopic?"Topic":"Queue").c_str(),numMessages, (now.epochMicroseconds() - epochStartTime) / 1000 );
  168.  
  169. }catch ( CMSException& e ) {
  170. e.printStackTrace();
  171. std::cout<<"Throw"<<std::endl;
  172. }
  173. }
  174.  
  175. private:
  176.  
  177. void cleanup(){
  178.  
  179. try {
  180. if( connection != NULL ) {
  181. connection->close();
  182. }
  183. } catch ( CMSException& e ) {
  184. e.printStackTrace();
  185. }
  186.  
  187. delete destination;
  188. delete producer;
  189. delete session;
  190. delete connection;
  191. }
  192. };
  193.  
  194. ////////////////////////////////////////////////////////////////////////////////
  195. int main(int argc AMQCPP_UNUSED, char* argv[] AMQCPP_UNUSED) {
  196.  
  197. activemq::library::ActiveMQCPP::initializeLibrary();
  198.  
  199. std::cout << "=====================================================\n";
  200. std::cout << "Starting the example:" << std::endl;
  201. std::cout << "Destination - "<< argv[1] <<"\n";
  202. std::cout << "Topic / Queue - "<< argv[2] <<"\n";
  203. std::cout << "Perisistent Mode - "<< argv[3] <<"\n";
  204. std::cout << "Number of Messages - "<< argv[4] <<"\n";
  205. std::cout << "-----------------------------------------------------\n";
  206.  
  207. // Set the URI to point to the IPAddress of your broker.
  208. // add any optional params to the url to enable things like
  209. // tightMarshalling or tcp logging etc. See the CMS web site for
  210. // a full list of configuration options.
  211. //
  212. // http://a...content-available-to-author-only...e.org/cms/
  213. // tcp://127.0.0.1:61613?wireFormat=stomp
  214. std::string brokerURI = "failover://(tcp://192.168.180.49:61616)";
  215. //"failover://(tcp://192.168.180.49:61616?socketBufferSize=150000&ioBufferSize=150000&wireFormat.tightEncodingEnabled=true&jms.prefetchPolicy.all=100000&inputBufferSize=15000&outputBufferSize=15000&jms.useAsyncSend=true)";
  216. //"failover://(tcp://192.168.180.49:61616)";
  217. //"failover:(tcp://192.168.180.49:61616?wireFormat.tightEncodingEnabled=true&wireFormat.tcpNoDelayEnabled=true&jms.useAsyncSend=true&jms.prefetchPolicy.all=100000&useAsyncSend=true)";
  218.  
  219. //============================================================
  220. // Total number of messages for this producer to send.
  221. //============================================================
  222. unsigned int numMessages = 25000;
  223.  
  224. //============================================================
  225. // This is the Destination Name and URI options. Use this to
  226. // customize where the Producer produces, to have the producer
  227. // use a topic or queue set the 'useTopics' flag.
  228. //============================================================
  229. std::string destURI = "TEST.FOO";
  230.  
  231. //============================================================
  232. // set to true to use topics instead of queues
  233. // Note in the code above that this causes createTopic or
  234. // createQueue to be used in the producer.
  235. //============================================================
  236. bool useTopics = false;
  237.  
  238. if (strlen(argv[4]) != 0)
  239. numMessages = atoi(argv[4]);
  240.  
  241. if (argv[2] == std::string("Topic"))
  242. useTopics = true;
  243.  
  244. if (strlen(argv[1]) != 0)
  245. destURI = argv[1];
  246.  
  247. std::cout << "=====================================================\n";
  248. std::cout << "XStarting the example:" << std::endl;
  249. std::cout << "Destination - "<< argv[1] <<std::endl;
  250. std::cout << "useTopics - "<< useTopics <<std::endl;
  251. std::cout << "Perisistent Mode - "<< (argv[3] == std::string("Persistent")) <<std::endl;
  252. std::cout << "numMessages - - "<< argv[4] <<std::endl;
  253. std::cout << "=====================================================\n";
  254.  
  255. // Create the producer and run it.
  256. SimpleProducer producer( brokerURI, numMessages, destURI, (argv[3] == std::string("Persistent")), useTopics );
  257.  
  258. // Publish the given number of Messages
  259. producer.run();
  260.  
  261. // Before exiting we ensure that all CMS resources are closed.
  262. producer.close();
  263.  
  264. std::cout << "-----------------------------------------------------\n";
  265. std::cout << "Finished with the example..." << std::endl;
  266. std::cout << "=====================================================\n";
  267. std::cout<<"HELLO1";
  268. //activemq::library::ActiveMQCPP::shutdownLibrary();
  269.  
  270. std::cout<<"HELLO";
  271. }
Compilation error #stdin compilation error #stdout 0s 0KB
stdin
Standard input is empty
compilation info
prog.cpp:18:28: fatal error: Poco/Timestamp.h: No such file or directory
compilation terminated.
stdout
Standard output is empty