Skip to content

Commit df1784e

Browse files
authored
Have a fully JMS 1.0 compatible instrumentation (#11413)
Have a fully JMS 1.0 compatible instrumentation move stubs to test fixture Co-authored-by: andrea.marziali <andrea.marziali@datadoghq.com>
1 parent afdf801 commit df1784e

12 files changed

Lines changed: 850 additions & 15 deletions

File tree

dd-java-agent/instrumentation/jms/javax-jms-1.1/build.gradle

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ muzzle {
1414
}
1515

1616
apply from: "$rootDir/gradle/java.gradle"
17+
apply plugin: 'java-test-fixtures'
1718

1819
repositories {
1920
maven {
@@ -33,6 +34,8 @@ tasks.named("latestDepTest", Test) {
3334
dependencies {
3435
compileOnly group: 'javax.jms', name: 'jms-api', version: '1.1-rev-1'
3536

37+
testFixturesCompileOnly group: 'javax.jms', name: 'jms-api', version: '1.1-rev-1'
38+
3639
testImplementation project(':dd-java-agent:instrumentation:datadog:tracing:trace-annotation')
3740
testImplementation group: 'org.apache.activemq.tooling', name: 'activemq-junit', version: '5.14.5'
3841
testImplementation group: 'org.apache.activemq', name: 'activemq-pool', version: '5.14.5'

dd-java-agent/instrumentation/jms/javax-jms-1.1/src/main/java/datadog/trace/instrumentation/jms/JMSDecorator.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,13 @@
1919
import javax.jms.Destination;
2020
import javax.jms.JMSException;
2121
import javax.jms.Message;
22+
import javax.jms.MessageProducer;
2223
import javax.jms.Queue;
24+
import javax.jms.QueueSender;
2325
import javax.jms.TemporaryQueue;
2426
import javax.jms.TemporaryTopic;
2527
import javax.jms.Topic;
28+
import javax.jms.TopicPublisher;
2629
import org.slf4j.Logger;
2730
import org.slf4j.LoggerFactory;
2831

@@ -265,6 +268,18 @@ public CharSequence toResourceName(String destinationName, boolean isQueue) {
265268
return joiner.apply(destinationName);
266269
}
267270

271+
public Destination getDestination(final MessageProducer messageProducer) throws JMSException {
272+
try {
273+
return messageProducer.getDestination(); // >= 1.1
274+
} catch (AbstractMethodError ignored) {
275+
// <=1.1 getDestination is not available so we need to pay an additional instanceOf
276+
if (messageProducer instanceof QueueSender) {
277+
return ((QueueSender) messageProducer).getQueue();
278+
}
279+
return ((TopicPublisher) messageProducer).getTopic();
280+
}
281+
}
282+
268283
public String getDestinationName(Destination destination) {
269284
String name = null;
270285
try {

dd-java-agent/instrumentation/jms/javax-jms-1.1/src/main/java/datadog/trace/instrumentation/jms/JMSMessageProducerInstrumentation.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -90,10 +90,10 @@ public static AgentScope beforeSend(
9090
// fall-back when producer wasn't created via standard Session.createProducer API
9191
if (null != producerState) {
9292
resourceName = producerState.getResourceName();
93-
Destination destination = producer.getDestination();
93+
Destination destination = PRODUCER_DECORATE.getDestination(producer);
9494
destinationName = PRODUCER_DECORATE.getDestinationName(destination);
9595
} else {
96-
Destination destination = producer.getDestination();
96+
Destination destination = PRODUCER_DECORATE.getDestination(producer);
9797
destinationName = PRODUCER_DECORATE.getDestinationName(destination);
9898
boolean isQueue = PRODUCER_DECORATE.isQueue(destination);
9999
resourceName = PRODUCER_DECORATE.toResourceName(destinationName, isQueue);

dd-java-agent/instrumentation/jms/javax-jms-1.1/src/main/java/datadog/trace/instrumentation/jms/SessionInstrumentation.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ public static void bindProducerState(
114114
int ackMode;
115115
try {
116116
ackMode = session.getAcknowledgeMode();
117-
} catch (Exception ignored) {
117+
} catch (Throwable ignored) {
118118
ackMode = Session.AUTO_ACKNOWLEDGE;
119119
}
120120
sessionState =
@@ -155,7 +155,7 @@ public static void bindConsumerState(
155155
int ackMode;
156156
try {
157157
ackMode = session.getAcknowledgeMode();
158-
} catch (Exception ignored) {
158+
} catch (Throwable ignored) {
159159
ackMode = Session.AUTO_ACKNOWLEDGE;
160160
}
161161
sessionState =

dd-java-agent/instrumentation/jms/javax-jms-1.1/src/test/groovy/JMS1Test.groovy

Lines changed: 22 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -7,32 +7,32 @@ import datadog.trace.agent.test.naming.VersionedNamingTestBase
77
import datadog.trace.api.Config
88
import datadog.trace.api.DDSpanTypes
99
import datadog.trace.api.Trace
10-
import datadog.trace.api.config.TracerConfig
1110
import datadog.trace.api.config.TraceInstrumentationConfig
11+
import datadog.trace.api.config.TracerConfig
1212
import datadog.trace.bootstrap.instrumentation.api.InstrumentationTags
1313
import datadog.trace.bootstrap.instrumentation.api.Tags
1414
import datadog.trace.core.DDSpan
15-
import org.apache.activemq.ActiveMQConnectionFactory
16-
import org.apache.activemq.command.ActiveMQTextMessage
17-
import org.apache.activemq.junit.EmbeddedActiveMQBroker
18-
import spock.lang.Shared
19-
15+
import java.util.concurrent.CountDownLatch
16+
import java.util.concurrent.atomic.AtomicReference
2017
import javax.jms.Connection
18+
import javax.jms.ConnectionFactory
2119
import javax.jms.Destination
2220
import javax.jms.Message
2321
import javax.jms.MessageListener
22+
import javax.jms.Queue
2423
import javax.jms.QueueConnection
2524
import javax.jms.QueueSession
2625
import javax.jms.Session
2726
import javax.jms.TemporaryQueue
2827
import javax.jms.TemporaryTopic
29-
import javax.jms.Queue
30-
import javax.jms.Topic
3128
import javax.jms.TextMessage
29+
import javax.jms.Topic
3230
import javax.jms.TopicConnection
3331
import javax.jms.TopicSession
34-
import java.util.concurrent.CountDownLatch
35-
import java.util.concurrent.atomic.AtomicReference
32+
import jms10mock.Jms10ConnectionFactory
33+
import org.apache.activemq.command.ActiveMQTextMessage
34+
import org.apache.activemq.junit.EmbeddedActiveMQBroker
35+
import spock.lang.Shared
3636

3737
abstract class JMS1Test extends VersionedNamingTestBase {
3838
@Shared
@@ -69,9 +69,13 @@ abstract class JMS1Test extends VersionedNamingTestBase {
6969
true
7070
}
7171

72+
def createConnectionFactory() {
73+
broker.createConnectionFactory()
74+
}
75+
7276
def setupSpec() {
7377
broker.start()
74-
final ActiveMQConnectionFactory connectionFactory = broker.createConnectionFactory()
78+
final ConnectionFactory connectionFactory = createConnectionFactory()
7579

7680
connection = connectionFactory.createConnection()
7781
connection.start()
@@ -1097,3 +1101,10 @@ class JMS1V1ForkedTest extends JMS1Test {
10971101
"jms.process"
10981102
}
10991103
}
1104+
1105+
class JMS10Test extends JMS1V0Test {
1106+
@Override
1107+
def createConnectionFactory() {
1108+
new Jms10ConnectionFactory(super.createConnectionFactory())
1109+
}
1110+
}
Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
package jms10mock;
2+
3+
import javax.jms.Connection;
4+
import javax.jms.ConnectionConsumer;
5+
import javax.jms.ConnectionMetaData;
6+
import javax.jms.Destination;
7+
import javax.jms.ExceptionListener;
8+
import javax.jms.JMSException;
9+
import javax.jms.Queue;
10+
import javax.jms.QueueConnection;
11+
import javax.jms.QueueSession;
12+
import javax.jms.ServerSessionPool;
13+
import javax.jms.Session;
14+
import javax.jms.Topic;
15+
import javax.jms.TopicConnection;
16+
import javax.jms.TopicSession;
17+
18+
/** Wraps a real {@link Connection} but simulates a JMS 1.0 provider. */
19+
public class Jms10Connection implements QueueConnection, TopicConnection {
20+
private final Connection delegate;
21+
22+
public Jms10Connection(Connection delegate) {
23+
this.delegate = delegate;
24+
}
25+
26+
// --- JMS 1.1-only unified Connection method ---
27+
28+
@Override
29+
public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException {
30+
throw new AbstractMethodError(
31+
"JMS 1.0 provider does not implement createSession(boolean, int) on Connection");
32+
}
33+
34+
// --- JMS 1.0 QueueConnection methods ---
35+
36+
@Override
37+
public QueueSession createQueueSession(boolean transacted, int acknowledgeMode)
38+
throws JMSException {
39+
return new Jms10Session(delegate.createSession(transacted, acknowledgeMode));
40+
}
41+
42+
// --- JMS 1.0 TopicConnection methods ---
43+
44+
@Override
45+
public TopicSession createTopicSession(boolean transacted, int acknowledgeMode)
46+
throws JMSException {
47+
return new Jms10Session(delegate.createSession(transacted, acknowledgeMode));
48+
}
49+
50+
// --- Common Connection methods ---
51+
52+
@Override
53+
public String getClientID() throws JMSException {
54+
return delegate.getClientID();
55+
}
56+
57+
@Override
58+
public void setClientID(String clientID) throws JMSException {
59+
delegate.setClientID(clientID);
60+
}
61+
62+
@Override
63+
public ConnectionMetaData getMetaData() throws JMSException {
64+
return delegate.getMetaData();
65+
}
66+
67+
@Override
68+
public ExceptionListener getExceptionListener() throws JMSException {
69+
return delegate.getExceptionListener();
70+
}
71+
72+
@Override
73+
public void setExceptionListener(ExceptionListener listener) throws JMSException {
74+
delegate.setExceptionListener(listener);
75+
}
76+
77+
@Override
78+
public void start() throws JMSException {
79+
delegate.start();
80+
}
81+
82+
@Override
83+
public void stop() throws JMSException {
84+
delegate.stop();
85+
}
86+
87+
@Override
88+
public void close() throws JMSException {
89+
delegate.close();
90+
}
91+
92+
// --- ConnectionConsumer methods — not commonly used, throw for JMS 1.1 unified form ---
93+
94+
@Override
95+
public ConnectionConsumer createConnectionConsumer(
96+
Destination destination,
97+
String messageSelector,
98+
ServerSessionPool sessionPool,
99+
int maxMessages)
100+
throws JMSException {
101+
throw new AbstractMethodError(
102+
"JMS 1.0 provider does not implement createConnectionConsumer(Destination, ...)");
103+
}
104+
105+
@Override
106+
public ConnectionConsumer createConnectionConsumer(
107+
Queue queue, String messageSelector, ServerSessionPool sessionPool, int maxMessages)
108+
throws JMSException {
109+
return delegate.createConnectionConsumer(queue, messageSelector, sessionPool, maxMessages);
110+
}
111+
112+
@Override
113+
public ConnectionConsumer createConnectionConsumer(
114+
Topic topic, String messageSelector, ServerSessionPool sessionPool, int maxMessages)
115+
throws JMSException {
116+
return delegate.createConnectionConsumer(topic, messageSelector, sessionPool, maxMessages);
117+
}
118+
119+
@Override
120+
public ConnectionConsumer createDurableConnectionConsumer(
121+
Topic topic,
122+
String subscriptionName,
123+
String messageSelector,
124+
ServerSessionPool sessionPool,
125+
int maxMessages)
126+
throws JMSException {
127+
return delegate.createDurableConnectionConsumer(
128+
topic, subscriptionName, messageSelector, sessionPool, maxMessages);
129+
}
130+
}
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
package jms10mock;
2+
3+
import javax.jms.Connection;
4+
import javax.jms.ConnectionFactory;
5+
import javax.jms.JMSException;
6+
import javax.jms.QueueConnection;
7+
import javax.jms.QueueConnectionFactory;
8+
import javax.jms.TopicConnection;
9+
import javax.jms.TopicConnectionFactory;
10+
11+
/**
12+
* Wraps a real {@link ConnectionFactory} but simulates a JMS 1.0 provider.
13+
*
14+
* <p>In JMS 1.0, clients used the domain-specific {@link QueueConnectionFactory} and {@link
15+
* TopicConnectionFactory} to obtain connections. The unified {@link ConnectionFactory} and its
16+
* {@code createConnection()} methods are JMS 1.1 additions that this wrapper does not support.
17+
*/
18+
public class Jms10ConnectionFactory implements QueueConnectionFactory, TopicConnectionFactory {
19+
private final ConnectionFactory delegate;
20+
21+
public Jms10ConnectionFactory(ConnectionFactory delegate) {
22+
this.delegate = delegate;
23+
}
24+
25+
// --- JMS 1.1-only unified ConnectionFactory methods ---
26+
27+
@Override
28+
public Connection createConnection() throws JMSException {
29+
return delegate.createConnection();
30+
}
31+
32+
@Override
33+
public Connection createConnection(String userName, String password) throws JMSException {
34+
return delegate.createConnection(userName, password);
35+
}
36+
37+
// --- JMS 1.0 QueueConnectionFactory methods ---
38+
@Override
39+
public QueueConnection createQueueConnection() throws JMSException {
40+
return new Jms10Connection(delegate.createConnection());
41+
}
42+
43+
@Override
44+
public QueueConnection createQueueConnection(String userName, String password)
45+
throws JMSException {
46+
return new Jms10Connection(delegate.createConnection(userName, password));
47+
}
48+
49+
// --- JMS 1.0 TopicConnectionFactory methods ---
50+
51+
@Override
52+
public TopicConnection createTopicConnection() throws JMSException {
53+
return new Jms10Connection(delegate.createConnection());
54+
}
55+
56+
@Override
57+
public TopicConnection createTopicConnection(String userName, String password)
58+
throws JMSException {
59+
return new Jms10Connection(delegate.createConnection(userName, password));
60+
}
61+
}

0 commit comments

Comments
 (0)