fork download
  1. /*
  2.  * Licensed to the Apache Software Foundation (ASF) under one or more
  3.  * contributor license agreements. See the NOTICE file distributed with
  4.  * this work for additional information regarding copyright ownership.
  5.  * The ASF licenses this file to You under the Apache License, Version 2.0
  6.  * (the "License"); you may not use this file except in compliance with
  7.  * the License. You may obtain a copy of the License at
  8.  *
  9.  * http://www.apache.org/licenses/LICENSE-2.0
  10.  *
  11.  * Unless required by applicable law or agreed to in writing, software
  12.  * distributed under the License is distributed on an "AS IS" BASIS,
  13.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14.  * See the License for the specific language governing permissions and
  15.  * limitations under the License.
  16.  */
  17. //#include <conio.h>
  18. #include <ctime>
  19. #include <Poco/Timestamp.h>
  20. #include <decaf/lang/Thread.h>
  21. #include <decaf/lang/Runnable.h>
  22. #include <decaf/util/concurrent/CountDownLatch.h>
  23. #include <activemq/core/ActiveMQConnectionFactory.h>
  24. #include <activemq/core/ActiveMQConnection.h>
  25. #include <activemq/transport/DefaultTransportListener.h>
  26. #include <activemq/library/ActiveMQCPP.h>
  27. #include <decaf/lang/Integer.h>
  28. #include <activemq/util/Config.h>
  29. #include <decaf/util/Date.h>
  30. #include <cms/Connection.h>
  31. #include <cms/Session.h>
  32. #include <cms/TextMessage.h>
  33. #include <cms/BytesMessage.h>
  34. #include <cms/MapMessage.h>
  35. #include <cms/ExceptionListener.h>
  36. #include <cms/MessageListener.h>
  37. #include <stdlib.h>
  38. #include <stdio.h>
  39. #include <iostream>
  40.  
  41. using namespace activemq;
  42. using namespace activemq::core;
  43. using namespace activemq::transport;
  44. using namespace decaf::lang;
  45. using namespace decaf::util;
  46. using namespace decaf::util::concurrent;
  47. using namespace cms;
  48. using namespace std;
  49.  
  50. ////////////////////////////////////////////////////////////////////////////////
  51. class SimpleAsyncConsumer : public ExceptionListener,
  52. public MessageListener,
  53. public DefaultTransportListener {
  54. private:
  55.  
  56. Connection* connection;
  57. Session* session;
  58. Destination* destination;
  59. MessageConsumer* consumer;
  60. Poco::Timestamp now;
  61. bool useTopic;
  62. std::string brokerURI;
  63. std::string destURI;
  64. std::string selector;
  65. bool clientAck;
  66. int numMessages;
  67.  
  68. private:
  69.  
  70. SimpleAsyncConsumer( const SimpleAsyncConsumer& );
  71. SimpleAsyncConsumer& operator= ( const SimpleAsyncConsumer& );
  72.  
  73. private:
  74. long int perMsgMicro ;
  75. long int maxMicro ;
  76. long int minMicro ;
  77. long int totalTime ;
  78. long int msgSendTime ;
  79. int count;
  80. int maxMsgNumber;
  81. long int firstToLastTime;
  82.  
  83. public:
  84.  
  85. SimpleAsyncConsumer( const std::string& brokerURI,
  86. const std::string& destURI,
  87. const std::string& selector,
  88. const Poco::Timestamp& now,
  89. bool useTopic = false,
  90. bool clientAck = false,
  91. int numMessages = 10 ) :
  92. connection(NULL),
  93. session(NULL),
  94. destination(NULL),
  95. consumer(NULL),
  96. now(now),
  97. useTopic(useTopic),
  98. brokerURI(brokerURI),
  99. destURI(destURI),
  100. selector(selector),
  101. clientAck(clientAck),
  102. numMessages(numMessages)
  103. {
  104. perMsgMicro = 0;
  105. maxMicro = 0;
  106. minMicro = 0;
  107. totalTime = 0;
  108. count = 0;
  109. msgSendTime = 0;
  110. firstToLastTime = 0;
  111. }
  112.  
  113. virtual ~SimpleAsyncConsumer() {
  114.  
  115. std::cout << "ActiveMQ " << (useTopic?"Topic:":"Queue:") <<"Number of Messages: "<< count <<" Max Msg No: "<< maxMsgNumber << " Rcvd Total Time,Avg,Min,Max,first2last (ms): "
  116. <<(totalTime / 1000) << ","
  117. <<(totalTime / (count * 1000)) << ","
  118. << minMicro / 1000 << ","
  119. << maxMicro / 1000 << ","
  120. << firstToLastTime / 1000
  121. <<std::endl;
  122. this->cleanup();
  123. }
  124.  
  125. void close() {
  126.  
  127. this->cleanup();
  128. }
  129.  
  130. void runConsumer() {
  131.  
  132. try {
  133.  
  134. // Create a ConnectionFactory
  135. ActiveMQConnectionFactory* connectionFactory =
  136. new ActiveMQConnectionFactory( brokerURI );
  137.  
  138. // Create a Connection
  139. connection = connectionFactory->createConnection();
  140. delete connectionFactory;
  141.  
  142. ActiveMQConnection* amqConnection = dynamic_cast<ActiveMQConnection*>( connection );
  143. if( amqConnection != NULL ) {
  144. amqConnection->addTransportListener( this );
  145. }
  146.  
  147. connection->start();
  148.  
  149. connection->setExceptionListener(this);
  150.  
  151. // Create a Session
  152. if( clientAck ) {
  153. session = connection->createSession( Session::CLIENT_ACKNOWLEDGE );
  154. } else {
  155. session = connection->createSession( Session::AUTO_ACKNOWLEDGE );
  156. }
  157.  
  158. // Create the destination (Topic or Queue)
  159. if( useTopic ) {
  160. destination = session->createTopic( destURI );
  161. } else {
  162. destination = session->createQueue( destURI );
  163. }
  164.  
  165.  
  166. if (selector.length())
  167. {
  168. std::cout<<"Selector is " << selector<<std::endl;
  169. consumer = session->createConsumer( destination,std::string("Selector = " + selector ) );
  170. }
  171. else
  172. // Create a MessageConsumer from the Session to the Topic or Queue
  173. consumer = session->createConsumer( destination );
  174.  
  175. consumer->setMessageListener( this );
  176.  
  177. } catch (CMSException& e) {
  178.  
  179. e.printStackTrace();
  180. }
  181. }
  182.  
  183. // Called from the consumer since this class is a registered MessageListener.
  184. virtual void onMessage( const Message* message ) {
  185.  
  186. //static int count = 0;
  187. //long int msgSendTime=0;
  188. try
  189. {
  190. count++;
  191.  
  192. if (count == 1)
  193. {
  194. now.update();
  195. totalTime = now.epochMicroseconds();
  196. }
  197.  
  198. const TextMessage* textMessage =
  199. dynamic_cast< const TextMessage* >( message );
  200. string text = "";
  201.  
  202. if( textMessage != NULL ) {
  203. text = textMessage->getText();
  204. msgSendTime = message->getLongProperty("SendTime");
  205. } else {
  206. text = "NOT A TEXTMESSAGE!";
  207. }
  208.  
  209. if( clientAck ) {
  210. message->acknowledge();
  211. }
  212. now.update();
  213. printf( "Message #%10d Received: %s Per Msg Time:%10f ms\n", count, text.c_str(),((double)now.epochMicroseconds() - msgSendTime) / 1000 );
  214. if (count == 1)
  215. {
  216. firstToLastTime = msgSendTime;
  217. //printf("Max Msg No - First %ld, now micro %ld\n",firstToLastTime,now.epochMicroseconds());
  218. minMicro = (now.epochMicroseconds() - msgSendTime);
  219. maxMicro = (now.epochMicroseconds() - msgSendTime);
  220. }
  221. if (count >= numMessages)
  222. {
  223. firstToLastTime = now.epochMicroseconds() - firstToLastTime;
  224. totalTime = now.epochMicroseconds() - totalTime;
  225. //printf("Max Msg No - last %ld\n",now.epochMicroseconds());
  226. }
  227.  
  228. if ((now.epochMicroseconds() - msgSendTime) >= maxMicro)
  229. maxMsgNumber = count;
  230.  
  231. maxMicro = std::max(now.epochMicroseconds() - msgSendTime,maxMicro);
  232. minMicro = std::min(now.epochMicroseconds() - msgSendTime,minMicro);
  233. //totalTime += (now.epochMicroseconds() - msgSendTime);
  234. //perMsgMicro = now.elapsed();
  235.  
  236. } catch (CMSException& e) {
  237. e.printStackTrace();
  238. }
  239. }
  240.  
  241. // If something bad happens you see it here as this class is also been
  242. // registered as an ExceptionListener with the connection.
  243. virtual void onException( const CMSException& ex AMQCPP_UNUSED ) {
  244. printf("CMS Exception occurred. Shutting down client.\n");
  245. exit(1);
  246. }
  247.  
  248. virtual void transportInterrupted() {
  249. std::cout << "The Connection's Transport has been Interrupted." << std::endl;
  250. }
  251.  
  252. virtual void transportResumed() {
  253. std::cout << "The Connection's Transport has been Restored." << std::endl;
  254. }
  255.  
  256. private:
  257.  
  258. void cleanup(){
  259.  
  260. try {
  261. if( connection != NULL ) {
  262. connection->close();
  263. }
  264. } catch ( CMSException& e ) {
  265. e.printStackTrace();
  266. }
  267.  
  268. delete destination;
  269. delete consumer;
  270. delete session;
  271. delete connection;
  272. }
  273. };
  274.  
  275.  
  276. ////////////////////////////////////////////////////////////////////////////////
  277. int main(int argc AMQCPP_UNUSED, char* argv[] AMQCPP_UNUSED) {
  278.  
  279. activemq::library::ActiveMQCPP::initializeLibrary();
  280.  
  281. std::cout << "=====================================================\n";
  282. std::cout << "Starting the example:" << std::endl;
  283. if (argv[1] != NULL) std::cout << "Destination - "<< argv[1] <<"\n";
  284. if (argv[2] != NULL) std::cout << "Use Topic/Queue - "<< argv[2] <<"\n";
  285. if (argv[3] != NULL) std::cout << "Number of Messages - "<< argv[3] <<"\n";
  286. if (argv[4] != NULL) std::cout << "Selector Value - "<< argv[4] <<"\n";
  287. std::cout << "-----------------------------------------------------\n";
  288.  
  289.  
  290.  
  291. // Set the URI to point to the IPAddress of your broker.
  292. // add any optional params to the url to enable things like
  293. // tightMarshalling or tcp logging etc. See the CMS web site for
  294. // a full list of configuration options.
  295. //
  296. // http://a...content-available-to-author-only...e.org/cms/
  297. //
  298. std::string brokerURI =
  299. "failover:(tcp://192.168.180.49:61616)";
  300. //"failover://(tcp://192.168.180.49:61616?socketBufferSize=150000&ioBufferSize=150000&wireFormat.tightEncodingEnabled=true&jms.prefetchPolicy.all=100000)";
  301. //"failover://(tcp://192.168.180.49:61616?socketBufferSize=150000&ioBufferSize=150000&wireFormat.tightEncodingEnabled=true&jms.prefetchPolicy.all=100000&inputBufferSize=15000&outputBufferSize=15000)";
  302. //"failover:(tcp://192.168.180.49:61616?wireFormat.tightEncodingEnabled=true&wireFormat.tcpNoDelayEnabled=true&jms.useAsyncSend=true&jms.prefetchPolicy.all=100000&useAsyncSend=true)";
  303. //"failover:(tcp://localhost:61616)";
  304.  
  305. //============================================================
  306. // This is the Destination Name and URI options. Use this to
  307. // customize where the consumer listens, to have the consumer
  308. // use a topic or queue set the 'useTopics' flag.
  309. //============================================================
  310. std::string destURI = "TEST.FOO"; //?consumer.prefetchSize=1";
  311.  
  312. //============================================================
  313. // set to true to use topics instead of queues
  314. // Note in the code above that this causes createTopic or
  315. // createQueue to be used in the consumer.
  316. //============================================================
  317. bool useTopics = false;
  318.  
  319. //============================================================
  320. // set to true if you want the consumer to use client ack mode
  321. // instead of the default auto ack mode.
  322. //============================================================
  323. bool clientAck = false;
  324.  
  325. int numMessages = 10;
  326. std::string selector("");
  327.  
  328. if ((argv[1] != NULL))
  329. destURI = argv[1];
  330.  
  331. if ((argv[2] != NULL) && (argv[2] == std::string("Topic")))
  332. useTopics = true;
  333.  
  334. if (argv[3] != NULL)
  335. numMessages = atoi(argv[3]);
  336.  
  337. if (argv[4] != NULL)
  338. selector = argv[4];
  339.  
  340. std::cout << "=====================================================\n";
  341. std::cout << "XStarting the example:" << std::endl;
  342. std::cout << "useTopics - "<< useTopics <<std::endl;
  343. std::cout << "Destination - "<< destURI <<std::endl;
  344. std::cout << "Number of Messages - "<< numMessages <<"\n";
  345. std::cout << "Selector - "<< selector <<"\n";
  346. std::cout << "=====================================================\n";
  347.  
  348. Poco::Timestamp now;
  349. // Create the consumer
  350. SimpleAsyncConsumer consumer( brokerURI, destURI, selector, now, useTopics, clientAck, numMessages );
  351.  
  352. // Start it up and it will listen forever.
  353. consumer.runConsumer();
  354. /****
  355.   // Wait to exit.
  356.   //std::cout << "Press 'q' to quit" << std::endl;
  357.   while( std::cin.get() != 'q') {}
  358. ****/
  359. while(!std::cin.get() ) {}
  360.  
  361. // All CMS resources should be closed before the library is shutdown.
  362. consumer.close();
  363.  
  364. std::cout << "-----------------------------------------------------\n";
  365. std::cout << "Finished with the example." << std::endl;
  366. std::cout << "=====================================================\n";
  367.  
  368. activemq::library::ActiveMQCPP::shutdownLibrary();
  369. }
Compilation error #stdin compilation error #stdout 0s 0KB
stdin
Standard input is empty
compilation info
prog.cpp:19:28: fatal error: Poco/Timestamp.h: No such file or directory
compilation terminated.
stdout
Standard output is empty