Andrei Shakirin

Biography

Andrei Shakirin is a Solution Architect at Talend/SOPERA and has been working for the last 8 years in developing an open source ESB platform. He has long-term experience in the design and implementation of custom application integration projects using different technologies. He is an active user of the Eclipse SOA Tool Platform and contributor to Eclipse Swordfish and Apache CXF projects. He is member of OASIS S-RAMP Working Group.



Transports in Apache CXF

Published: June 26th, 2012 • Service Technology Magazine Issue LXIII
 

Abstract: Apache CXF is a Web services and REST framework designed to be highly extensible and flexible. One very important aspect of the CXF framework is its transport layer. Transports are responsible for the physical communication between clients and services. This article, broken into two parts, describes how transports are organized in CXF. The first part gives a general overview of the architecture and design of the CXF transport layer; it also describes how to create custom transports and when they can or should be used. The second part concentrates on the JMS transport and shows how to design scalable CXF applications using JMS.


Introduction

Presently the CXF distribution provides transport implementations for the following protocols: HTTP(S), JMS, JBI, and Local [REF-1]. The HTTP(S) and JMS transports support the corresponding protocols and interfaces. The JBI transport provides communication with JBI service engines and binding components. The Local transport is designed for optimized communication between participants in the same JVM. The Apache Camel project additionally provides a Camel transport for CXF [REF-2].

Normally, a new custom transport is required for a protocol that is not yet supported by CXF, such as UDP or FTP. In this case a custom transport can often be implemented using a Camel-based solution, but if Camel is not appropriate for some reason, a CXF custom transport is a valid alternative. New CXF transports can also be a solution for legacy ESB participants that have to be implemented using a standard JAX-WS interface but must communicate over the high-level, protocol-based transports of an old ESB (the JBI transport is an example of such a use case). To understand this better, the CXF transport layer is analyzed in more detail next.


CXF Transport Layer


Architecture and Design

The transport functionality is based on two fundamental definitions: conduit and destination. Conduits are responsible for sending a message to recipients and destinations for receiving a message from the sender. In order to send a response, a destination needs its own back-channel conduit (in case of request-response communication). Conduits and destinations are created by a TransportFactory. CXF selects the correct TransportFactory based on the transport URL. SOAP is also considered a high-level transport and has its own conduit and destination in CXF.

To send a message into a physical channel, the conduit must access the message content. Because CXF is streaming-oriented, the normal practice is to use a subclass of OutputStream that extends CachedOutputStream. The custom stream is fed the message and provides access to the content in streaming or buffered form, depending on the transport requirements. CachedOutputStream is configured to keep a message in memory only up to a predefined size. If this size is exceeded, the message is swapped to disk.
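The buffering behaviour described above can be illustrated with a minimal sketch that uses only the JDK. ThresholdOutputStream and ThresholdDemo are hypothetical names introduced for this illustration; this is not the CXF CachedOutputStream class, only a simplified model of the same idea (keep bytes in memory up to a threshold, then continue in a temporary file, and offer a doClose() hook for the transport).

```java
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;

/** Simplified model of a threshold-based cached stream (NOT the CXF class). */
class ThresholdOutputStream extends OutputStream {
    private final int threshold;                 // maximum number of bytes kept in memory
    private final ByteArrayOutputStream memory = new ByteArrayOutputStream();
    private OutputStream current = memory;       // where bytes are written right now
    private File spillFile;                      // set once the threshold is exceeded
    private long count;
    private boolean closed;

    ThresholdOutputStream(int threshold) {
        this.threshold = threshold;
    }

    @Override
    public void write(int b) throws IOException {
        if (spillFile == null && count + 1 > threshold) {
            spillToDisk();
        }
        current.write(b);
        count++;
    }

    /** Moves everything buffered so far into a temp file and keeps writing there. */
    private void spillToDisk() throws IOException {
        spillFile = File.createTempFile("message", ".tmp");
        spillFile.deleteOnExit();
        OutputStream file = Files.newOutputStream(spillFile.toPath());
        memory.writeTo(file);
        memory.reset();
        current = file;
    }

    @Override
    public void close() throws IOException {
        if (closed) {
            return;
        }
        closed = true;
        current.close();
        doClose();
    }

    /** Hook for a transport: called once the whole message has been written. */
    protected void doClose() throws IOException {
    }

    boolean isOnDisk() {
        return spillFile != null;
    }

    /** Returns the complete content, wherever it is stored. */
    byte[] toByteArray() throws IOException {
        return spillFile == null ? memory.toByteArray() : Files.readAllBytes(spillFile.toPath());
    }
}

class ThresholdDemo {
    /** Writes the text through the stream and reports where the content ended up. */
    static String store(String text, int threshold) {
        ThresholdOutputStream out = new ThresholdOutputStream(threshold);
        try {
            out.write(text.getBytes(StandardCharsets.UTF_8));
            out.close();
            String where = out.isOnDisk() ? "disk:" : "memory:";
            return where + new String(out.toByteArray(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

With a threshold of 8 bytes, ThresholdDemo.store("hello", 8) stays in memory, while a 16-byte text is moved to a temporary file. A real transport would override doClose() to send the collected message.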

A class diagram of TransportFactory, Conduit, Destination and OutputStream is shown below:


How it Works

Interaction between a JAX-WS client and a service using CXF transport is represented in the following figure:

What happens in the transport layer on the client side and on the service side when a message is sent and received? This is looked at in detail next.


Client Workflow

  • Step 1: JAX-WS client invokes a service, in this manner for example:

URL wsdlURL = this.getClass().getResource("/HelloWorld.wsdl");
HelloWorldService service = new HelloWorldService(wsdlURL, SERVICE_NAME);
HelloWorld hw = service.getHelloWorldPort();
String result = hw.sayHi(TEST_REQUEST);

  • Step 2: CXF runtime selects the correct TransportFactory based on some criteria (described below).
  • Step 3: CXF runtime calls the TransportFactory.getConduit() method to obtain the conduit.
  • Step 4: CXF runtime invokes Conduit.prepare() and passes outgoing message as an argument.
  • Step 5: Conduit sets up its own OutputStream (normally extended CachedOutputStream) as outgoing message content.
  • Step 6: CXF runtime processes the outgoing message, calls the interceptor chain and writes the outgoing message to the conduit's OutputStream. Messaging in CXF is stream-oriented; therefore the message is normally processed and sent not as one block, but as a stream. The last bytes of the message can still be in processing while the first ones have already been sent to the recipient. It is the responsibility of the Conduit to decide how to send the message: using streaming, or collecting the whole message and sending it at once.
  • Step 7: When CXF runtime has completely written the outgoing message, it invokes the Conduit.close(Message) method. This means that the message is completely written into the OutputStream. Correspondingly, the OutputStream.doClose() method will be called.
  • Step 8: In the doClose() method, the OutputStream class has access to the whole marshalled outgoing message and the exchange, and will send this message to the service using the corresponding transport protocol. In case of streaming, parts of the message may already have been sent to the network at this time, and the Conduit only has to send the last part and finish sending the request.
  • Step 9: If using one-way communication exchange, skip to Step 14.
  • Step 10: If using request-response communication, the conduit will wait for the service response in a synchronous or an asynchronous manner.
  • Step 11: When a response is received, the conduit creates a new message, sets its context and puts it into the exchange as the incoming message (in-message). The content of the new message is also available as a stream. Therefore the runtime and business logic can start processing the message even if it has not yet been completely received.
  • Step 12: When a fault is received, the conduit also creates a new message, sets its context and puts it into the exchange as the in-fault message.
  • Step 13: The conduit notifies the incomingObserver (that is, the ClientImpl object) about the response using the incomingObserver.onMessage() call.
  • Step 14: The Conduit.close(Message) method is invoked for the incoming message. Normally the conduit implementation decreases the reference count of the current network connection, potentially closing it if the count is zero.
  • Step 15: JAX-WS client code receives the response in a synchronous or an asynchronous style.
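The conduit-related steps of the client workflow (prepare, write, close, notify the observer) can be sketched with simplified stand-in types. SketchMessage, SketchConduit and SketchConduitDemo are hypothetical names that use only the JDK; they are not the CXF Message and Conduit interfaces, and the "wire" function is an assumption that stands in for the physical channel. The sketch buffers the whole request instead of streaming it.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.function.Consumer;
import java.util.function.Function;

/** Simplified stand-in for a message: it only carries a content stream. */
class SketchMessage {
    OutputStream content;
}

/** Stand-in conduit (NOT the CXF Conduit interface). */
class SketchConduit {
    private final Function<byte[], byte[]> wire;      // hypothetical physical channel
    private final Consumer<byte[]> incomingObserver;  // receives the response

    SketchConduit(Function<byte[], byte[]> wire, Consumer<byte[]> incomingObserver) {
        this.wire = wire;
        this.incomingObserver = incomingObserver;
    }

    /** Steps 4-5: install the conduit's own stream as the message content. */
    void prepare(SketchMessage message) {
        message.content = new ByteArrayOutputStream() {
            private boolean closed;

            @Override
            public void close() throws IOException {
                if (closed) {
                    return;
                }
                closed = true;
                // Step 8: the complete marshalled message is available, send it.
                byte[] response = wire.apply(toByteArray());
                // Steps 10-13: hand a response (if any) to the observer.
                if (response != null) {
                    incomingObserver.accept(response);
                }
            }
        };
    }

    /** Step 7: the runtime signals that the message is completely written. */
    void close(SketchMessage message) throws IOException {
        message.content.close();
    }
}

class SketchConduitDemo {
    /** Sends a request through an echoing channel and returns the observed response. */
    static String call(String request) {
        StringBuilder received = new StringBuilder();
        SketchConduit conduit = new SketchConduit(
            bytes -> ("echo:" + new String(bytes, StandardCharsets.UTF_8))
                .getBytes(StandardCharsets.UTF_8),
            bytes -> received.append(new String(bytes, StandardCharsets.UTF_8)));
        SketchMessage out = new SketchMessage();
        try {
            conduit.prepare(out);
            out.content.write(request.getBytes(StandardCharsets.UTF_8)); // step 6
            conduit.close(out);                                          // step 7
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return received.toString();
    }
}
```

In this model a one-way exchange corresponds to a channel function that returns null, in which case the observer is never called.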

Service Workflow

  • Step 1: JAX-WS service is published, in this manner for example:

HelloWorldImpl serverImpl = new HelloWorldImpl();
Endpoint.publish("udp://localhost:9000/hello", serverImpl);

  • Step 2: CXF runtime selects the correct TransportFactory based on some criteria (described below).
  • Step 3: CXF runtime calls the TransportFactory.getConduit() method to obtain the conduit.
  • Step 4: CXF runtime calls the TransportFactory.getDestination() method to obtain the destination.
  • Step 5: The implementation of Destination.activate() normally opens network connections and listens for incoming requests.
  • Step 6: When a request arrives, the destination creates a message, sets the content and notifies the message observer (that is, the ChainInitiationObserver object) about the request via incomingObserver.onMessage(). Message content is saved as a stream; therefore the runtime and business logic can start processing before the message is completely received. Normally the incoming connection is saved in a correlation map, from which it is later retrieved to send the appropriate response.
  • Step 7: The business service implementation will be called with the request message in stream form. In the case of one-way communication, the exchange is now finished. In the case of request-response, the business implementation either returns a response or throws a fault exception.
  • Step 8: The CXF runtime requests a back-channel conduit from the destination via Destination.getInbuiltBackChannel().
  • Step 9: The Back-channel conduit's prepare() method will be called with a response message as the argument.
  • Step 10: The back-channel conduit sets its own OutputStream as the message content.
  • Step 11: CXF runtime processes the response message, calls the interceptor chain and invokes Conduit.close(Message) for the response message.
  • Step 12: Finally the OutputStream.doClose() method for the response message is invoked.
  • Step 13: In doClose() method the OutputStream class has access to the marshalled response message and will send this message through the network as a response to the client. In the case of streaming, the first part of the message can already be sent to the network at this time, and Conduit will send the last part and close the sending later. Normally an incoming connection for a specified protocol is cached and created only if necessary.
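The service-side steps around the correlation map and the back channel can be sketched as follows. SketchDestination and its nested Connection interface are hypothetical, JDK-only stand-ins (not the CXF Destination interface), and for brevity the sketch passes strings instead of streams, so it does not model streaming.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import java.util.function.Function;

/** Stand-in destination (NOT the CXF Destination interface). */
class SketchDestination {
    /** Hypothetical handle for the connection on which a request arrived. */
    interface Connection {
        void reply(String response);
    }

    // Incoming connections are kept here until the response has been sent.
    private final Map<String, Connection> correlation = new ConcurrentHashMap<>();
    private final Function<String, String> service;  // business implementation

    SketchDestination(Function<String, String> service) {
        this.service = service;
    }

    /** Step 6: called by the network layer when a request arrives. */
    void onRequest(String correlationId, String request, Connection connection) {
        correlation.put(correlationId, connection);
        String response = service.apply(request);           // step 7
        if (response != null) {
            backChannel(correlationId).accept(response);     // steps 8-13
        } else {
            correlation.remove(correlationId);               // one-way: nothing to send
        }
    }

    /** Step 8: a back channel bound to the stored connection. */
    Consumer<String> backChannel(String correlationId) {
        return response -> {
            Connection connection = correlation.remove(correlationId);
            if (connection != null) {
                connection.reply(response);
            }
        };
    }

    int pendingConnections() {
        return correlation.size();
    }
}
```

A request handled by new SketchDestination(req -> "Hello " + req) is answered on the same connection it arrived on, and the correlation entry is removed once the response has been sent.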

Registration of Transport Factory

There are two ways to register a transport factory: programmatically or via Spring configuration.

To register a transport factory programmatically, execute the following code:

CustomTransportFactory transportFactory = new CustomTransportFactory();

Bus bus = BusFactory.getThreadDefaultBus();

DestinationFactoryManagerImpl dfm = bus.getExtension(DestinationFactoryManagerImpl.class);
dfm.registerDestinationFactory(TRANSPORT_IDENTIFIER, transportFactory);

ConduitInitiatorManager extension = bus.getExtension(ConduitInitiatorManager.class);
extension.registerConduitInitiator(TRANSPORT_IDENTIFIER, transportFactory);

TRANSPORT_IDENTIFIER is a unique transport id (normally in the form http://apache.org/transports/PROTOCOL_PREFIX [REF-8]).

For Spring configuration, the following could be used instead:

<bean class="org.company.cxf.transport.CustomTransportFactory"
lazy-init="false">
<property name="transportIds">
  <list>
    <value>TRANSPORT_IDENTIFIER</value>
  </list>
</property>
</bean>


How CXF Chooses the TransportFactory

The TransportFactory is now registered, and the CXF participant is about to send or receive a message. How does CXF find the correct TransportFactory for this?

This is done in two steps:

  1. Binding TransportFactory selection

    CXF interprets bindings like SOAP as high-level transports and also chooses an appropriate TransportFactory for them. A TransportFactory provides a list of transport IDs in the method TransportFactory.getTransportIds(). If this list contains the value of the binding transport attribute or the binding namespace defined in the WSDL document, CXF will select this TransportFactory:

    WSDL:
    <wsdl:definitions
    xmlns:soap="http://schemas.xmlsoap.org/wsdl/soap/" ...> ...
    ...
    <wsdl:binding name="GreeterPortBinding" type="tns:GreeterPortType">
    <soap:binding style="document" transport="http://schemas.xmlsoap.org/soap/http"/>

    ...
    <wsdl:service name="GreeterService">
    <wsdl:port binding="tns:GreeterPortBinding" name="GreeterPort">
    <transport:address location="LOCATION_URL">
    ...

    TransportFactory class:
    ...
    public static final List<String> DEFAULT_NAMESPACES = Arrays.asList(
    "http://schemas.xmlsoap.org/soap/",
    "http://schemas.xmlsoap.org/wsdl/soap/",
    "http://schemas.xmlsoap.org/wsdl/soap12/",
    "http://schemas.xmlsoap.org/soap/http/",
    "http://schemas.xmlsoap.org/wsdl/soap/http",
    "http://www.w3.org/2010/soapjms/",
    "http://www.w3.org/2003/05/soap/bindings/HTTP/",
    "http://schemas.xmlsoap.org/soap/http");

    public final List<String> getTransportIds() {
    return DEFAULT_NAMESPACES;
    }


  2. Protocol TransportFactory selection

    Once the binding TransportFactory is found, CXF looks for the protocol TransportFactory responsible for the physical network communication. In this case the method TransportFactory.getUriPrefixes() is important, because it returns the list of protocol prefixes supported by the TransportFactory.

    When a CXF client or service tries to communicate using a URL with a specified protocol prefix (http://, https://, jms://, local://), CXF looks into the registered transport factory maps and gets the right one for this prefix. If no TransportFactory for this protocol is found, CXF throws a corresponding exception.

    Client configuration:

    <jaxws:client id="FlightReservationClient"
    xmlns:serviceNamespace="http://www.apache.org/cxf/samples/FlightReservation"
    serviceClass="org.apache.cxf.samples.flightreservation.FlightReservation"
    serviceName="serviceNamespace:FlightReservationService"
    endpointName="serviceNamespace:FlightReservationSOAP"
    address="http://localhost:8040/services/FlightReservationService">
    </jaxws:client>
    ...

    TransportFactory class:
    ...
    private static final Set<String> URI_PREFIXES = new HashSet<String>();
    static {
    URI_PREFIXES.add("http://");
    URI_PREFIXES.add("https://");
    }
    public Set<String> getUriPrefixes() {
    return URI_PREFIXES;
    }
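The two lookup steps described above can be modelled with a small JDK-only sketch. SketchFactory and SketchFactoryRegistry are hypothetical names; the real lookup is performed by the CXF runtime through its own factory managers, and the transport id "http://example.org/transports/http" as well as the "udp" entry are assumptions made only for this illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Stand-in for a transport factory: only the two lookup keys are modelled. */
class SketchFactory {
    final String name;
    final List<String> transportIds;
    final List<String> uriPrefixes;

    SketchFactory(String name, List<String> transportIds, List<String> uriPrefixes) {
        this.name = name;
        this.transportIds = transportIds;
        this.uriPrefixes = uriPrefixes;
    }
}

/** Hypothetical registry that mirrors the two selection steps. */
class SketchFactoryRegistry {
    private final List<SketchFactory> factories = new ArrayList<>();

    void register(SketchFactory factory) {
        factories.add(factory);
    }

    /** Step 1: match the transport attribute or binding namespace from the WSDL. */
    SketchFactory byTransportId(String transportId) {
        for (SketchFactory factory : factories) {
            if (factory.transportIds.contains(transportId)) {
                return factory;
            }
        }
        throw new IllegalArgumentException("No factory for transport id " + transportId);
    }

    /** Step 2: match the protocol prefix of the endpoint address. */
    SketchFactory byAddress(String address) {
        for (SketchFactory factory : factories) {
            for (String prefix : factory.uriPrefixes) {
                if (address.startsWith(prefix)) {
                    return factory;
                }
            }
        }
        throw new IllegalArgumentException("No factory for address " + address);
    }

    /** Sample registry with illustrative (partly hypothetical) entries. */
    static SketchFactoryRegistry sample() {
        SketchFactoryRegistry registry = new SketchFactoryRegistry();
        registry.register(new SketchFactory("soap",
            Arrays.asList("http://schemas.xmlsoap.org/soap/http"),
            Collections.emptyList()));
        registry.register(new SketchFactory("http",
            Arrays.asList("http://example.org/transports/http"),
            Arrays.asList("http://", "https://")));
        registry.register(new SketchFactory("udp",
            Arrays.asList("http://example.org/transports/udp"),
            Arrays.asList("udp://")));
        return registry;
    }
}
```

With this sample registry, the transport attribute from the WSDL above resolves to the "soap" entry, the client address http://localhost:8040/services/FlightReservationService resolves to the "http" entry, and an address with an unregistered prefix such as ftp:// results in an exception.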


Conduit and Destination Lifecycle

Destinations are normally created by a service on start-up and released on shutdown. Conduits can be either recreated for each request or cached, based on endpoint information, for the whole client lifetime. Clients can make concurrent calls to endpoints using different protocols and bind them to different conduits.


Concurrency Aspects

Conduit and destination objects can be accessed concurrently by multiple threads. Implementations must therefore take care of the thread safety of these classes.
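One way to combine conduit caching with thread safety is a concurrent map keyed by endpoint address. The following is a minimal JDK-only sketch under that assumption; CachedConduit, ConduitCache and ConduitCacheDemo are hypothetical names and do not reflect how CXF itself caches conduits.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

/** Stand-in for a conduit bound to one endpoint address. */
class CachedConduit {
    final String address;

    CachedConduit(String address) {
        this.address = address;
    }
}

/** Hypothetical per-client cache: one conduit per address, safe for concurrent callers. */
class ConduitCache {
    private final ConcurrentMap<String, CachedConduit> conduits = new ConcurrentHashMap<>();
    private final AtomicInteger created = new AtomicInteger();

    CachedConduit get(String address) {
        // computeIfAbsent runs the factory at most once per key, even under contention.
        return conduits.computeIfAbsent(address, a -> {
            created.incrementAndGet();
            return new CachedConduit(a);
        });
    }

    int createdCount() {
        return created.get();
    }

    /** Releases all cached conduits, for example when the client shuts down. */
    void shutdown() {
        conduits.clear();
    }
}

class ConduitCacheDemo {
    /** Looks up the same address from several threads; returns how many conduits were created. */
    static int concurrentLookups(int threads) {
        ConduitCache cache = new ConduitCache();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> cache.get("jms://queue"));
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return cache.createdCount();
    }
}
```

However many threads ask for the same address, only one conduit is created. The state inside each conduit (connections, correlation data) still needs its own protection.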


Streaming

It is strongly recommended not to break streaming in Conduit and Destination implementations if the physical protocol supports it. CXF is completely streaming-oriented, which contributes to its high performance and scalability.


How to Start

What is the starting point for understanding the CXF transport layer and implementing your own transport? It is recommended to read the CXF documentation [REF-1] and analyze the source code of existing CXF transports (Local and JMS are the more straightforward ones). They are located in the packages org.apache.cxf.transport.local and org.apache.cxf.transport.jms respectively.


Scalable CXF Applications Using JMS Transport

Java Message Service (JMS) is a widespread and popular messaging API. Because JMS is standardized, the same application code can work with different JMS implementations: WebSphere MQ, ActiveMQ, Tibco, Joram, BEA WebLogic, and OpenJMS. CXF provides a transport that enables endpoints to use JMS queues and topics.


Default CXF Consumer and Producer Using JMS

Implementing a CXF client and service using the JMS transport is trivial. Basically, it is enough to configure two things in the WSDL:

  1. Specify the JMS transport URI in the binding element
  2. Define the JMS address in the port element

The WSDL binding and port should look like this:

<wsdl:definitions
xmlns:jms="http://cxf.apache.org/transports/jms"
...
<wsdl:binding name="Greeter_SOAPBinding" type="tns:Greeter">
<soap:binding style="document"
transport="http://cxf.apache.org/transports/jms"/>
...
</wsdl:binding>

<wsdl:service name="JMSGreeterService">
<wsdl:port binding="tns:Greeter_SOAPBinding" name="GreeterPort">
<jms:address
destinationStyle="queue"
jndiConnectionFactoryName="ConnectionFactory"
jndiDestinationName="dynamicQueues/test.cxf.jmstransport.queue">
<jms:JMSNamingProperty name="java.naming.factory.initial"
value="org.apache.activemq.jndi.ActiveMQInitialContextFactory"/>
<jms:JMSNamingProperty name="java.naming.provider.url" value="tcp://localhost:61616"/>
</jms:address>
</wsdl:port>
</wsdl:service>

CXF clients and servers, whether implemented in Java or configured using Spring, work with this WSDL out of the box (under the hood, CXF selects the correct JMS conduit and destination based on the address URL). Details are described in [REF-3].

CXF also delivers jms_queue and jms_pubsub examples illustrating the use of the JMS transport with default settings for ActiveMQ [REF-4].


Scalability Problems

Unfortunately there are two main scalability drawbacks when using the default JMS configuration:

  1. It doesn't provide session pooling or a cache of consumers and producers (*).
  2. The default JMS message consumer is single-threaded. This means that only one thread will get messages from the queue or topic and pass them on for further processing.

Both aspects are critical for enterprise applications, and implementing them is not an easy task. Is there an easy solution? Yes, Spring JMS functionality and CXF Features can solve this.

(*) Some JMS vendors provide integrated session pooling and consumer/producer caching in their ConnectionFactory. In this case using the Spring CachingConnectionFactory is not necessary. Please refer to the vendor documentation to clarify this.


Spring JMS Functionality

Spring provides a number of useful classes that help to implement scalable JMS applications. The important ones for our purposes are:

  • org.springframework.jms.connection.CachingConnectionFactory
  • org.springframework.jms.listener.DefaultMessageListenerContainer

  1. CachingConnectionFactory: CachingConnectionFactory makes it possible to configure session pooling as well as consumer and producer caching. Below is a sample configuration of CachingConnectionFactory:

    <bean id="cachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
    <property name="targetConnectionFactory">
    <bean class="org.apache.activemq.ActiveMQConnectionFactory">
    <property name="brokerURL" value="tcp://localhost:61616" />
    </bean>
    </property>
    <property name="sessionCacheSize" value="20"/>
    <property name="cacheProducers" value="true"/>
    <property name="cacheConsumers" value="true"/>
    </bean>

    As you can see, it is possible to set the size of the session pool and to switch on producer and consumer caching.


  2. DefaultMessageListenerContainer: DefaultMessageListenerContainer makes it possible to receive messages from the destination in parallel, using multiple threads. A configuration of DefaultMessageListenerContainer looks like this:

    <bean id="queueContainerListener"
    class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="connectionFactory" />
    <property name="destinationName" value="Q_WM_OUT" />
    <property name="messageListener" ref="simpleListener" />
    <property name="cacheLevel" value="3" />
    <property name="concurrentConsumers" value="10" />
    <property name="maxConcurrentConsumers" value="50" />
    </bean>

The following can be defined here:

  • The initial and maximal number of concurrent consumers. This tells Spring to always start up an initial number of consumers (concurrentConsumers). When a new message is received and maxConcurrentConsumers has not been reached, a new consumer is created to process the message.
  • The cache level (3 - cache connections, sessions and consumers; 2 - cache connections and sessions; 1 - cache connections only).
  • The message listener class (implementing the MessageListener interface) and the connection factory.

It is important to be aware of the following points related to concurrent consumers:

  • Normally it makes no sense to increase the number of concurrent consumers for a JMS topic. It leads to concurrent consumption of the same message, which is not desirable.
  • The concurrentConsumers property and the maxConcurrentConsumers property can be modified at runtime, for example, via JMX.
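The difference between queue and topic consumption that motivates the first point can be shown with a plain JDK model. ConsumerSemanticsDemo is a hypothetical class that uses neither the JMS nor the Spring API; it only counts deliveries under the two semantics.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

class ConsumerSemanticsDemo {
    /** Queue semantics: concurrent consumers compete, each message is delivered once. */
    static int queueDeliveries(int messages, int consumers) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        for (int i = 0; i < messages; i++) {
            queue.add("msg-" + i);
        }
        AtomicInteger deliveries = new AtomicInteger();
        Thread[] workers = new Thread[consumers];
        for (int i = 0; i < consumers; i++) {
            workers[i] = new Thread(() -> {
                // poll() removes the message, so no other consumer can receive it.
                while (queue.poll() != null) {
                    deliveries.incrementAndGet();
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return deliveries.get();
    }

    /** Topic semantics: every subscriber receives its own copy of every message. */
    static int topicDeliveries(int messages, int subscribers) {
        int deliveries = 0;
        for (int m = 0; m < messages; m++) {
            for (int s = 0; s < subscribers; s++) {
                deliveries++;
            }
        }
        return deliveries;
    }
}
```

For 100 messages and 4 consumers the queue model yields 100 deliveries, so the work is shared; the topic model yields 400, which means the same message is processed four times.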

Details about Spring-based configuration are described well in Bruce Snyder's blog [REF-5].

You can see that Spring provides solutions for both of the scalability aspects mentioned. But how can we use them in CXF?


CXF JMS Configuration Feature

As the CXF JMS implementation is based on the Spring JMS classes, the user can benefit from the Spring JMS functionality described above. CXF provides a Feature to configure details of the JMS transport: JMSConfigFeature. A CXF Feature is something that is able to customize a Server, Client, or Bus, typically adding capabilities. In our case we will add a feature to jaxws:endpoint and jaxws:client to tune the JMS transport. Let's see how to configure a CXF client and service using this feature.


Server Configuration

<bean id="cachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
<property name="targetConnectionFactory">
<bean class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://localhost:61616" />
</bean>
</property>
<property name="sessionCacheSize" value="20"/>
<property name="cacheConsumers" value="true"/>
</bean>

<bean id="jmsConfig" class="org.apache.cxf.transport.jms.JMSConfiguration"
p:connectionFactory-ref="cachingConnectionFactory"
p:cacheLevel="3"
p:concurrentConsumers="16"
p:maxConcurrentConsumers="16"
p:targetDestination="Q_HSC"
p:wrapInSingleConnectionFactory="false"
/>

<jaxws:endpoint id="JMSGreeterService" address="jms://"
implementor="#JMSGreeterServiceImpl">
<jaxws:features>
<bean class="org.apache.cxf.transport.jms.JMSConfigFeature"
p:jmsConfig-ref="jmsConfig"/>
</jaxws:features>
</jaxws:endpoint>

The jaxws:endpoint configuration contains the JMSConfigFeature. This feature has a property with JMSConfiguration type. The JMSConfiguration property supports all settings that we have seen in the Spring DefaultMessageListenerContainer: cached connection factory with session pool size, the number of concurrent consumers, and cache level. All settings of JMSConfiguration are described in [REF-6]. Using this configuration the server application can be tuned to achieve optimal performance in our target environment.


Client Configuration

<bean id="cachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
<property name="targetConnectionFactory">
<bean class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://localhost:61616"/>
</bean>
</property>
<property name="sessionCacheSize" value="20"/>
<property name="cacheProducers" value="true"/>
</bean>

<bean id="jmsConfig" class="org.apache.cxf.transport.jms.JMSConfiguration"
p:connectionFactory-ref="cachingConnectionFactory"
p:targetDestination="Q_HSC"
p:cacheLevel="3"
p:concurrentConsumers="16"
p:maxConcurrentConsumers="16"
p:wrapInSingleConnectionFactory="false"/>

<jaxws:client id="JMSGreeterService" address="jms://"
serviceClass="com.sopera.services.tpoc.eventgenerator.EventGenerator">
<jaxws:features>
<bean class="org.apache.cxf.transport.jms.JMSConfigFeature">
<property name="jmsConfig" ref="jmsConfig"/>
</bean>
</jaxws:features>
</jaxws:client>

The client configuration looks very similar to the server one, except for one thing: CachingConnectionFactory activates producer caching instead of consumer caching.


Conclusion

CXF provides a very flexible and pluggable transport layer. It is possible to configure the standard transport implementations delivered with CXF as well as to implement and integrate a new custom transport. Creation of custom transports in CXF is straightforward.

Some Apache projects can be easily integrated using CXF transports: Camel makes it possible to bind CXF participants directly to Camel routes, and the JBI transport simplifies communication with ServiceMix applications. CXF also gives the user the possibility to tune the standard transports. To achieve high scalability of a CXF client and service in JMS communication, the CXF JMS Configuration Feature can be used. It is not necessary to write code; simply configure and leverage what already exists.

Using this feature can have a significant influence on performance in some environments. It is possible to raise the throughput of a CXF service to 360% of the baseline (from 500 to 1800 msg/sec, a factor of 3.6) just by using a session pool and a multithreaded JMS consumer. Reference performance numbers for SOAP over JMS are given in [REF-7]; you can compare them with your own results and tune accordingly.


References

[REF-1] CXF transports overview http://cxf.apache.org/docs/transports.html

[REF-2] Camel CXF transport http://camel.apache.org/camel-transport-for-cxf.html

[REF-3] CXF JMS Transport http://cxf.apache.org/docs/jms-transport.html

[REF-4] cxf-distribution/examples: jms_queue; jms_pubsub

[REF-5] Bruce Snyder's blog http://bsnyderblog.blogspot.de/2010/05/tuning-jms-message-consumption-in.html

[REF-6] CXF JMS Configuration Feature http://cxf.apache.org/docs/using-the-jmsconfigfeature.html

[REF-7] Christian Schneider's blog: CXF performance http://www.liquid-reality.de/pages/viewpage.action?pageId=5865562

[REF-8] http://apache.org/transports/PROTOCOL_PREFIX