1 /***
2 * Redistribution and use of this software and associated documentation
3 * ("Software"), with or without modification, are permitted provided
4 * that the following conditions are met:
5 *
6 * 1. Redistributions of source code must retain copyright
7 * statements and notices. Redistributions must also contain a
8 * copy of this document.
9 *
10 * 2. Redistributions in binary form must reproduce the
11 * above copyright notice, this list of conditions and the
12 * following disclaimer in the documentation and/or other
13 * materials provided with the distribution.
14 *
15 * 3. The name "Exolab" must not be used to endorse or promote
16 * products derived from this Software without prior written
17 * permission of Exoffice Technologies. For written permission,
18 * please contact tma@netspace.net.au.
19 *
20 * 4. Products derived from this Software may not be called "Exolab"
21 * nor may "Exolab" appear in their names without prior written
22 * permission of Exoffice Technologies. Exolab is a registered
23 * trademark of Exoffice Technologies.
24 *
25 * 5. Due credit should be given to the Exolab Project
26 * (http://www.exolab.org/).
27 *
28 * THIS SOFTWARE IS PROVIDED BY EXOFFICE TECHNOLOGIES AND CONTRIBUTORS
29 * ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT
30 * NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
31 * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
32 * EXOFFICE TECHNOLOGIES OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
33 * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
34 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
35 * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
36 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
37 * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
38 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
39 * OF THE POSSIBILITY OF SUCH DAMAGE.
40 *
41 * Copyright 2004 (C) Exoffice Technologies Inc. All Rights Reserved.
42 *
43 * $Id: BasicServerSessionPool.java,v 1.2 2004/02/02 03:49:55 tanderson Exp $
44 */
45 package org.exolab.jmscts.jms.asf;
46
47 import java.util.LinkedList;
48
49 import javax.jms.Connection;
50 import javax.jms.JMSException;
51 import javax.jms.Message;
52 import javax.jms.MessageListener;
53 import javax.jms.QueueConnection;
54 import javax.jms.ServerSession;
55 import javax.jms.ServerSessionPool;
56 import javax.jms.Session;
57 import javax.jms.TopicConnection;
58 import javax.jms.XAQueueConnection;
59 import javax.jms.XAQueueSession;
60 import javax.jms.XATopicConnection;
61 import javax.jms.XATopicSession;
62
63 import org.apache.log4j.Category;
64
65
66 /***
67 * Implementation of the <code>javax.jms.ServerSessionPool</code> interface.
68 *
69 * @version $Revision: 1.2 $ $Date: 2004/02/02 03:49:55 $
70 * @author <a href="mailto:tma@netspace.net.au">Tim Anderson</a>
71 */
72 public class BasicServerSessionPool implements ServerSessionPool {
73
74 /***
75 * The maximum no. of sessions in the pool
76 */
77 private final int _size;
78
79 /***
80 * The connection to allocate sessions from
81 */
82 private Connection _connection;
83
84 /***
85 * If <code>true</code>, allocate transacted sessions
86 */
87 private boolean _transacted;
88
89 /***
90 * The message acknowledgement mode, for non-transacted sessions
91 */
92 private int _ackMode;
93
94 /***
95 * The listener to delegate messages to
96 */
97 private MessageListener _listener;
98
99 /***
100 * The pool of server sessions
101 */
102 private LinkedList _pool = new LinkedList();
103
104 /***
105 * True if the pool is closed
106 */
107 private boolean _closed = false;
108
109 /***
110 * The logger
111 */
112 private static final Category log =
113 Category.getInstance(BasicServerSessionPool.class);
114
115
116 /***
117 * Construct a new <code>BasicServerSessionPool</code>
118 *
119 * @param size the maximum no. of sessions to allocate
120 * @param connection the connection to allocate sessions from
121 * @param transacted if <code>true</code>, allocate transacted sessions
122 * @param ackMode the message acknowledgement mode, for non-transacted
123 * sessions
124 * @param listener the listener to delegate messages to
125 * @throws JMSException if sessions cannot be allocated
126 */
127 public BasicServerSessionPool(int size, Connection connection,
128 boolean transacted, int ackMode,
129 MessageListener listener)
130 throws JMSException {
131
132 if (size < 1) {
133 throw new IllegalArgumentException("Argument 'size' must be > 0");
134 }
135 if (connection == null) {
136 throw new IllegalArgumentException(
137 "Argument 'connection' is null");
138 }
139 if (listener == null) {
140 throw new IllegalArgumentException("Argument 'listener' is null");
141 }
142 _size = size;
143 _connection = connection;
144 _transacted = transacted;
145 _ackMode = ackMode;
146 _listener = new SingleThreadedListener(listener);
147
148 // pre-allocate server sessions
149 for (int i = 0; i < _size; ++i) {
150 _pool.add(create());
151 }
152 }
153
154 /***
155 * Return a server session from the pool
156 *
157 * @return a server session from the pool
158 * @throws JMSException if an application server fails to return a server
159 * session from its pool
160 */
161 public ServerSession getServerSession() throws JMSException {
162 ServerSession session = null;
163 synchronized (_pool) {
164 while (session == null) {
165 if (_closed) {
166 throw new JMSException("Pool has been closed");
167 }
168 if (!_pool.isEmpty()) {
169 session = (ServerSession) _pool.removeFirst();
170 } else {
171 try {
172 _pool.wait();
173 } catch (InterruptedException ignore) {
174 // no-op
175 }
176 }
177 }
178 }
179 return session;
180 }
181
182 /***
183 * Close the pool, destroying any allocated sessions
184 *
185 * @throws JMSException if the underlying session's can't be closed
186 */
187 public void close() throws JMSException {
188 synchronized (_pool) {
189 _closed = true;
190 while (!_pool.isEmpty()) {
191 BasicServerSession session =
192 (BasicServerSession) _pool.removeFirst();
193 session.close();
194 }
195
196 _pool.notify();
197 }
198 }
199
200 /***
201 * Release a server session back to the pool
202 *
203 * @param session the session to release
204 */
205 protected void release(BasicServerSession session) {
206 synchronized (_pool) {
207 if (_closed) {
208 try {
209 session.close();
210 } catch (JMSException exception) {
211 log.error(exception);
212 }
213 } else {
214 _pool.add(session);
215 _pool.notify();
216 }
217 }
218 }
219
220 /***
221 * Create a new server session
222 *
223 * @return the new server session
224 * @throws JMSException if the session cannot be created
225 */
226 protected ServerSession create() throws JMSException {
227 Session session = null;
228 if (_connection instanceof XAQueueConnection) {
229 XAQueueSession xaSession =
230 ((XAQueueConnection) _connection).createXAQueueSession();
231 session = xaSession.getQueueSession();
232 } else if (_connection instanceof XATopicConnection) {
233 XATopicSession xaSession =
234 ((XATopicConnection) _connection).createXATopicSession();
235 session = xaSession.getTopicSession();
236 } else if (_connection instanceof QueueConnection) {
237 session = ((QueueConnection) _connection).createQueueSession(
238 _transacted, _ackMode);
239 } else if (_connection instanceof TopicConnection) {
240 session = ((TopicConnection) _connection).createTopicSession(
241 _transacted, _ackMode);
242 } else {
243 throw new JMSException("Invalid connection: " + _connection);
244 }
245 session.setMessageListener(_listener);
246
247 return new BasicServerSession(this, session);
248 }
249
250 /***
251 * Helper class to ensure that a <code>MessageListener</code> is only
252 * invoked by a single thread at a time
253 */
254 static class SingleThreadedListener implements MessageListener {
255
256 /***
257 * The listener to single thread
258 */
259 private MessageListener _listener;
260
261 /***
262 * Construct a new <code>SingleThreadedListener</code>
263 *
264 * @param listener the listener to single thread
265 */
266 public SingleThreadedListener(MessageListener listener) {
267 _listener = listener;
268 }
269
270 /***
271 * Handle a message, delegating it to the listener
272 *
273 * @param message the message
274 */
275 public void onMessage(Message message) {
276 synchronized (_listener) {
277 _listener.onMessage(message);
278 }
279 }
280 }
281
282 }
This page was automatically generated by Maven