View Javadoc
/**
 * Redistribution and use of this software and associated documentation
 * ("Software"), with or without modification, are permitted provided
 * that the following conditions are met:
 *
 * 1. Redistributions of source code must retain copyright
 *    statements and notices.  Redistributions must also contain a
 *    copy of this document.
 *
 * 2. Redistributions in binary form must reproduce the
 *    above copyright notice, this list of conditions and the
 *    following disclaimer in the documentation and/or other
 *    materials provided with the distribution.
 *
 * 3. The name "Exolab" must not be used to endorse or promote
 *    products derived from this Software without prior written
 *    permission of Exoffice Technologies.  For written permission,
 *    please contact tma@netspace.net.au.
 *
 * 4. Products derived from this Software may not be called "Exolab"
 *    nor may "Exolab" appear in their names without prior written
 *    permission of Exoffice Technologies. Exolab is a registered
 *    trademark of Exoffice Technologies.
 *
 * 5. Due credit should be given to the Exolab Project
 *    (http://www.exolab.org/).
 *
 * THIS SOFTWARE IS PROVIDED BY EXOFFICE TECHNOLOGIES AND CONTRIBUTORS
 * ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT
 * NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
 * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL
 * EXOFFICE TECHNOLOGIES OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
 * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
 * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
 * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
 * OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 * Copyright 2004 (C) Exoffice Technologies Inc. All Rights Reserved.
 *
 * $Id: BasicServerSessionPool.java,v 1.2 2004/02/02 03:49:55 tanderson Exp $
 */
package org.exolab.jmscts.jms.asf;

import java.util.LinkedList;

import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.QueueConnection;
import javax.jms.ServerSession;
import javax.jms.ServerSessionPool;
import javax.jms.Session;
import javax.jms.TopicConnection;
import javax.jms.XAQueueConnection;
import javax.jms.XAQueueSession;
import javax.jms.XATopicConnection;
import javax.jms.XATopicSession;

import org.apache.log4j.Category;


/**
 * Implementation of the <code>javax.jms.ServerSessionPool</code> interface.
 * <p>
 * A fixed number of server sessions is allocated up front. Callers of
 * {@link #getServerSession} block while the pool is empty, and are woken
 * when a session is released back to the pool or when the pool is closed.
 * All pool state is guarded by the monitor of the internal session list.
 *
 * @version     $Revision: 1.2 $ $Date: 2004/02/02 03:49:55 $
 * @author      <a href="mailto:tma@netspace.net.au">Tim Anderson</a>
 */
public class BasicServerSessionPool implements ServerSessionPool {

    /**
     * The maximum no. of sessions in the pool
     */
    private final int _size;

    /**
     * The connection to allocate sessions from
     */
    private final Connection _connection;

    /**
     * If <code>true</code>, allocate transacted sessions
     */
    private final boolean _transacted;

    /**
     * The message acknowledgement mode, for non-transacted sessions
     */
    private final int _ackMode;

    /**
     * The listener to delegate messages to. This is the caller supplied
     * listener wrapped in a {@link SingleThreadedListener}
     */
    private final MessageListener _listener;

    /**
     * The pool of idle server sessions. Also serves as the monitor guarding
     * the pool contents and the {@link #_closed} flag
     */
    private final LinkedList _pool = new LinkedList();

    /**
     * True if the pool is closed. Guarded by the monitor of {@link #_pool}
     */
    private boolean _closed = false;

    /**
     * The logger
     */
    private static final Category log =
        Category.getInstance(BasicServerSessionPool.class);


    /**
     * Construct a new <code>BasicServerSessionPool</code>, pre-allocating
     * <code>size</code> server sessions.
     * <p>
     * If allocation fails part way through, the sessions allocated so far
     * are closed before the failure is propagated.
     *
     * @param size the maximum no. of sessions to allocate
     * @param connection the connection to allocate sessions from
     * @param transacted if <code>true</code>, allocate transacted sessions
     * @param ackMode the message acknowledgement mode, for non-transacted
     * sessions
     * @param listener the listener to delegate messages to
     * @throws IllegalArgumentException if <code>size</code> is less than 1,
     * or <code>connection</code> or <code>listener</code> is null
     * @throws JMSException if sessions cannot be allocated
     */
    public BasicServerSessionPool(int size, Connection connection,
                                  boolean transacted, int ackMode,
                                  MessageListener listener)
        throws JMSException {

        if (size < 1) {
            throw new IllegalArgumentException("Argument 'size' must be > 0");
        }
        if (connection == null) {
            throw new IllegalArgumentException(
                "Argument 'connection' is null");
        }
        if (listener == null) {
            throw new IllegalArgumentException("Argument 'listener' is null");
        }
        _size = size;
        _connection = connection;
        _transacted = transacted;
        _ackMode = ackMode;
        _listener = new SingleThreadedListener(listener);

        // pre-allocate server sessions
        boolean allocated = false;
        try {
            for (int i = 0; i < _size; ++i) {
                _pool.add(create());
            }
            allocated = true;
        } finally {
            if (!allocated) {
                // don't leak the sessions created before the failure. The
                // allocation failure is the one that propagates, so a
                // secondary failure during clean up is only logged
                JMSException failure = destroy();
                if (failure != null) {
                    log.error("Failed to close server session", failure);
                }
            }
        }
    }

    /**
     * Return a server session from the pool, blocking until one becomes
     * available or the pool is closed.
     * <p>
     * The wait is not interruptible: if the calling thread is interrupted
     * while waiting, it keeps waiting, and its interrupt status is restored
     * before this method exits.
     *
     * @return a server session from the pool
     * @throws JMSException if the pool has been closed
     */
    public ServerSession getServerSession() throws JMSException {
        boolean interrupted = false;
        try {
            synchronized (_pool) {
                while (true) {
                    if (_closed) {
                        throw new JMSException("Pool has been closed");
                    }
                    if (!_pool.isEmpty()) {
                        return (ServerSession) _pool.removeFirst();
                    }
                    try {
                        _pool.wait();
                    } catch (InterruptedException exception) {
                        // remember the interrupt rather than discarding it.
                        // Re-asserting it here would make the next wait()
                        // throw immediately and spin
                        interrupted = true;
                    }
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Close the pool, destroying any allocated sessions.
     * <p>
     * Every thread blocked in {@link #getServerSession} is woken, and fails
     * with a <code>JMSException</code>. All pooled sessions are closed even
     * if closing one of them fails; the first failure is rethrown once the
     * pool has been emptied, and subsequent failures are logged.
     *
     * @throws JMSException if the underlying sessions can't be closed
     */
    public void close() throws JMSException {
        JMSException failure = destroy();
        if (failure != null) {
            throw failure;
        }
    }

    /**
     * Release a server session back to the pool.
     * <p>
     * If the pool has been closed in the meantime, the session is closed
     * instead of being pooled; a failure to close it is logged.
     *
     * @param session the session to release
     */
    protected void release(BasicServerSession session) {
        synchronized (_pool) {
            if (_closed) {
                try {
                    session.close();
                } catch (JMSException exception) {
                    log.error("Failed to close server session", exception);
                }
            } else {
                _pool.add(session);
                // a single session became available, so waking a single
                // waiter is sufficient
                _pool.notify();
            }
        }
    }

    /**
     * Create a new server session.
     * <p>
     * The kind of session is determined by the type of the connection. For
     * XA connections, the session is obtained from a new XA session, and
     * the <code>transacted</code> and <code>ackMode</code> settings of this
     * pool are not used. The XA connection types are tested first.
     *
     * @return the new server session
     * @throws JMSException if the session cannot be created, or the
     * connection is not a queue or topic connection
     */
    protected ServerSession create() throws JMSException {
        Session session = null;
        if (_connection instanceof XAQueueConnection) {
            XAQueueSession xaSession =
                ((XAQueueConnection) _connection).createXAQueueSession();
            session = xaSession.getQueueSession();
        } else if (_connection instanceof XATopicConnection) {
            XATopicSession xaSession =
                ((XATopicConnection) _connection).createXATopicSession();
            session = xaSession.getTopicSession();
        } else if (_connection instanceof QueueConnection) {
            session = ((QueueConnection) _connection).createQueueSession(
                _transacted, _ackMode);
        } else if (_connection instanceof TopicConnection) {
            session = ((TopicConnection) _connection).createTopicSession(
                _transacted, _ackMode);
        } else {
            throw new JMSException("Invalid connection: " + _connection);
        }
        session.setMessageListener(_listener);

        return new BasicServerSession(this, session);
    }

    /**
     * Mark the pool as closed, wake all waiting threads, and close every
     * pooled session.
     *
     * @return the first failure encountered closing a session, or
     * <code>null</code> if all sessions were closed successfully
     */
    private JMSException destroy() {
        JMSException failure = null;
        synchronized (_pool) {
            _closed = true;
            // wake every waiter, not just one, and do so before closing
            // the sessions so that no failure below can prevent it. The
            // waiters observe _closed once this monitor is released
            _pool.notifyAll();

            while (!_pool.isEmpty()) {
                BasicServerSession session =
                    (BasicServerSession) _pool.removeFirst();
                try {
                    session.close();
                } catch (JMSException exception) {
                    if (failure == null) {
                        failure = exception;
                    } else {
                        log.error("Failed to close server session",
                                  exception);
                    }
                }
            }
        }
        return failure;
    }

    /**
     * Helper class to ensure that a <code>MessageListener</code> is only
     * invoked by a single thread at a time
     */
    static class SingleThreadedListener implements MessageListener {

        /**
         * The listener to single thread. Its monitor is used to serialise
         * invocations
         */
        private final MessageListener _listener;

        /**
         * Construct a new <code>SingleThreadedListener</code>
         *
         * @param listener the listener to single thread
         */
        public SingleThreadedListener(MessageListener listener) {
            _listener = listener;
        }

        /**
         * Handle a message, delegating it to the listener while holding
         * the listener's monitor
         *
         * @param message the message
         */
        public void onMessage(Message message) {
            synchronized (_listener) {
                _listener.onMessage(message);
            }
        }
    }

}

This page was automatically generated by Maven