Saturday, November 2, 2013

Publish/Subscribe to Weblogic JMS Queue/Topic messages using Oracle Data Integrator



JMS queues and topics are predominantly used in OLTP systems for asynchronous message processing and persistence, and for their robust support for competing consumers (queues) and subscribers (topics) while still maintaining XA capabilities. Oracle Data Integrator (ODI) is a batch loading tool predominantly used in OLAP systems (Oracle Apps/Fusion Apps are exceptions), but sometimes there is a requirement to subscribe to topic data and store it in a database for further processing in OLAP systems. ODI supports JMS queues and topics with text and XML payloads. In this blog, I am going to write about the steps for publishing and subscribing to WebLogic JMS queue/topic messages using ODI. Let's get started.

First I am going to show the admin side of WebLogic, that is, how to create queues and topics, for those who are new to admin activities. Later I will show the steps for publishing and subscribing to messages in ODI.

Create Queue/Topic in Weblogic


Create a new JMS server in WebLogic, targeted to the Admin server here. I haven't specified a persistent store. You can configure a file-based or database-based persistent store for storing messages.
Create a JMS module targeted to the Admin server.
Create a subdeployment under the JMS module 'odimodule' created in the previous step. I have targeted it to the JMS server we created earlier.
Create queue, topic and connection factory resources under 'odimodule' and select the subdeployment we created earlier. They are automatically targeted to jmsserver1 through the selected subdeployment. The connection factory is needed so that a client outside WebLogic can connect to WebLogic and get resources such as the queue and topic.

Now we have completed the admin activities needed to start the work in ODI. Let's test the setup with a Java client first, before we configure ODI. Please add the library wlthint3client.jar to your Java project.
package JMSClientApp;


import java.util.Hashtable;

import javax.naming.*;

import javax.jms.*;

public class JMSTest {
    private static InitialContext ctx = null;
    private static QueueConnectionFactory qcf = null;
    private static QueueConnection qc = null;
    private static QueueSession qsess = null;
    private static Queue q = null;
    private static QueueSender qsndr = null;
    private static TextMessage message = null;
    // NOTE: The next two lines set the name of the Queue Connection Factory
    //       and the Queue that we want to use.
    private static final String QCF_NAME = "jms/odicf";
    private static final String QUEUE_NAME = "jms/odiqueue";

    public JMSTest() {
        super();
    }

    public static void sendMessage(String messageText) {
        // create InitialContext
        Hashtable<String, String> properties = new Hashtable<>();
        properties.put(Context.INITIAL_CONTEXT_FACTORY, "weblogic.jndi.WLInitialContextFactory");
        // NOTE: The port number of the server is provided in the next line,
        //       followed by the userid and password on the next two lines.
        properties.put(Context.PROVIDER_URL, "t3://localhost:7001");
        properties.put(Context.SECURITY_PRINCIPAL, "weblogic");
        properties.put(Context.SECURITY_CREDENTIALS, "welcome123");
        try {
            ctx = new InitialContext(properties);
        } catch (NamingException ne) {
            ne.printStackTrace(System.err);
            System.exit(1);
        }
        System.out.println("Got InitialContext " + ctx.toString());
        // create QueueConnectionFactory
        try {
            qcf = (QueueConnectionFactory) ctx.lookup(QCF_NAME);
        } catch (NamingException ne) {
            ne.printStackTrace(System.err);
            System.exit(1);
        }
        System.out.println("Got QueueConnectionFactory " + qcf.toString());
        // create QueueConnection
        try {
            qc = qcf.createQueueConnection();
        } catch (JMSException jmse) {
            jmse.printStackTrace(System.err);
            System.exit(1);
        }
        System.out.println("Got QueueConnection " + qc.toString());
        // create QueueSession
        try {
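            // Non-transacted session; name the acknowledge mode instead of passing a bare 0
            qsess = qc.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);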
        } catch (JMSException jmse) {
            jmse.printStackTrace(System.err);
            System.exit(1);
        }
        System.out.println("Got QueueSession " + qsess.toString());
        // lookup Queue
        try {
            q = (Queue) ctx.lookup(QUEUE_NAME);
        } catch (NamingException ne) {
            ne.printStackTrace(System.err);
            System.exit(1);
        }
        System.out.println("Got Queue " + q.toString());
        // create QueueSender
        try {
            qsndr = qsess.createSender(q);
        } catch (JMSException jmse) {
            jmse.printStackTrace(System.err);
            System.exit(1);
        }
        System.out.println("Got QueueSender " + qsndr.toString());
        // create TextMessage
        try {
            message = qsess.createTextMessage();
        } catch (JMSException jmse) {
            jmse.printStackTrace(System.err);
            System.exit(1);
        }
        System.out.println("Got TextMessage " + message.toString());
        // set message text in TextMessage
        try {
            message.setText(messageText);
        } catch (JMSException jmse) {
            jmse.printStackTrace(System.err);
            System.exit(1);
        }
        System.out.println("Set text in TextMessage " + message.toString());
        // send message
        try {
            qsndr.send(message);
        } catch (JMSException jmse) {
            jmse.printStackTrace(System.err);
            System.exit(1);
        }
        System.out.println("Sent message ");
        // clean up
        try {
            message = null;
            qsndr.close();
            qsndr = null;
            q = null;
            qsess.close();
            qsess = null;
            qc.close();
            qc = null;
            qcf = null;
            ctx = null;
        } catch (JMSException jmse) {
            jmse.printStackTrace(System.err);
        }
        System.out.println("Cleaned up and done.");
    }

    public static void main(String args[]) {
        sendMessage("8,test3,24");
        sendMessage("9,test4,24");
    }
}

The Java client test is successful and the messages were published to the JMS queue.
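The client above hard-codes the server URL, user and password. As a minimal sketch, the JNDI environment could instead be built from JVM system properties, so the same client can point at another server without a code change. The class name JndiEnvSketch and the property names jms.url, jms.user and jms.password are made up for this illustration; only the factory class name and the default URL and user come from the client above.

```java
import java.util.Hashtable;
import javax.naming.Context;

// Hypothetical helper, not part of WebLogic or ODI.
class JndiEnvSketch {

    // Builds the JNDI environment; the jms.* property names are assumptions of this sketch.
    static Hashtable<String, String> buildEnv() {
        Hashtable<String, String> env = new Hashtable<>();
        env.put(Context.INITIAL_CONTEXT_FACTORY, "weblogic.jndi.WLInitialContextFactory");
        env.put(Context.PROVIDER_URL, System.getProperty("jms.url", "t3://localhost:7001"));
        env.put(Context.SECURITY_PRINCIPAL, System.getProperty("jms.user", "weblogic"));
        // No default for the password: it is only set when supplied from outside.
        String password = System.getProperty("jms.password");
        if (password != null) {
            env.put(Context.SECURITY_CREDENTIALS, password);
        }
        return env;
    }

    public static void main(String[] args) {
        // Print the settings, leaving out the password.
        buildEnv().forEach((key, value) -> {
            if (!Context.SECURITY_CREDENTIALS.equals(key)) {
                System.out.println(key + " = " + value);
            }
        });
    }
}
```

The returned table can be passed to new InitialContext(...) in place of the properties object in sendMessage, for example after starting the JVM with -Djms.password=... on the command line.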

Configuring JMS Queue in ODI

ODI provides four technologies in Topology for JMS configuration: JMS Queue, JMS Queue XML, JMS Topic and JMS Topic XML. They are designed this way because JMS has to fit into the ODI architecture, which handles text and XML in different ways. Basically, JMS Queue and JMS Topic handle text payloads (CSV or fixed length) and put them into relational tables, while JMS Queue XML and JMS Topic XML handle XML payloads and put them into relational tables. Here I am going to show text payloads in CSV format. XML needs more configuration, which I will show in another blog or by extending this one.

Let me take JMS Queue for illustration. It involves setting up the JNDI configuration of the connection factory for the WebLogic connection under the JMS Queue technology in Topology. Next we need to create a data model, and in the data model a datastore to represent the queue. Here are the steps.

Create a physical server in Topology for the JMS Queue technology and provide the JNDI connection information for the connection factory. ODI connects through the connection factory to get the resources. Click Test Connection and it should succeed.

Create a logical server and assign the physical server to it for the context.
Create the model by selecting the JMS Queue technology and the logical server we created before.
Reverse engineering is not supported for the JMS technology, so we need to create the datastore for the JMS queue manually. Make sure that the JNDI name of the queue is specified correctly in the Resource Name field.
In the Files section, configure the JMS payload format. Here I am saying that the payload is comma-separated text.

Configure the payload columns in the Columns section of the datastore. Here I am configuring three columns, id, name and age, with id as the key. So the comma-separated text received in the payload is expected to have three fields.
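To make the expected payload shape concrete, here is a minimal Java sketch that splits one message body into the three fields the datastore describes. It is only an illustration of the format, not how ODI parses messages internally. The class JmsPayloadSketch is hypothetical, and all three fields are kept as strings because the column data types are not stated here.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration of the comma-separated payload described by the datastore.
class JmsPayloadSketch {

    // One parsed payload: id (key), name, age. Types are left as String on purpose.
    static final class Row {
        final String id;
        final String name;
        final String age;

        Row(String id, String name, String age) {
            this.id = id;
            this.name = name;
            this.age = age;
        }

        @Override
        public String toString() {
            return "Row[id=" + id + ", name=" + name + ", age=" + age + "]";
        }
    }

    // Splits a payload on commas and rejects anything that is not exactly three fields.
    static Row parse(String payload) {
        String[] fields = payload.split(",", -1);
        if (fields.length != 3) {
            throw new IllegalArgumentException(
                "Expected 3 fields but got " + fields.length + ": " + payload);
        }
        return new Row(fields[0].trim(), fields[1].trim(), fields[2].trim());
    }

    public static void main(String[] args) {
        // The two payloads sent by the Java test client earlier.
        List<String> payloads = Arrays.asList("8,test3,24", "9,test4,24");
        for (String payload : payloads) {
            System.out.println(parse(payload));
        }
    }
}
```

A payload with a missing or extra field, such as "8,test3", is rejected here with an IllegalArgumentException. How ODI itself reacts to such a message depends on the KM and datastore settings and is not covered by this sketch.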

With this we have completed the ODI configuration for the JMS queue. Next we need to create the interfaces for publishing messages to JMS and subscribing to messages from JMS.

Create Interfaces for Publishing/Subscribing JMS Messages
The following knowledge modules are needed for loading and integration. Please import them into the ODI project. IKM SQL to JMS Append and LKM JMS to SQL are the JMS-specific KMs for integration and loading.

INT-TABLE_TO_JMS - This interface uses IKM SQL to JMS Append for integration into JMS.
INT-JMS_TO_TABLE - This interface uses LKM JMS to SQL, IKM Oracle Incremental Update and CKM Oracle for loading, integration and constraint checking.

INT-TABLE_TO_JMS interface for publishing message

Create the INT-TABLE_TO_JMS interface with the "Staging Area Different From Target" option selected (select your source logical server as the staging area). In the mapping area, drop the source table (I have created a table in the HR schema with three columns and two rows of data) as the source and the JMS datastore as the target.

The mapping is done here.
ODI automatically sets the IKM to IKM SQL to JMS Append, since we have the KM in the project. Please also note the publish commit option here. It can be set to false, and the ODI package can then do further processing and issue the commit later using a separate command in the package. This is how an XA transaction between the database and the JMS resources is handled in ODI.
Let's run this interface and check whether the table rows are published to JMS. I have two records in the table.
INSERT INTO "HR"."JMS_INPUT" (ID, NAME, AGE) VALUES ('1', 'test1', '20');
INSERT INTO "HR"."JMS_INPUT" (ID, NAME, AGE) VALUES ('2', 'test2', '21');
The interface run is successful and I can see the messages in the JMS queue.
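As a rough picture of what should arrive on the queue, the sketch below turns the two JMS_INPUT rows into comma-separated text bodies. It assumes one message per source row and the comma delimiter configured on the datastore, with no header line. The class RowToPayloadSketch is hypothetical and only mimics the expected result; it is not the code the IKM runs.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration: source rows rendered as comma-separated message bodies.
class RowToPayloadSketch {

    // Joins the column values of one row in column order (id, name, age).
    static String toPayload(String... columns) {
        return String.join(",", columns);
    }

    public static void main(String[] args) {
        // The two rows inserted into HR.JMS_INPUT above.
        List<String[]> rows = Arrays.asList(
            new String[] {"1", "test1", "20"},
            new String[] {"2", "test2", "21"});
        for (String[] row : rows) {
            // Assumption of this sketch: each row becomes one text message.
            System.out.println(toPayload(row));
        }
    }
}
```

Under those assumptions the two message bodies are 1,test1,20 and 2,test2,21, which is the same shape the subscribing interface expects to read back.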

INT-JMS_TO_TABLE interface for subscribing messages

Create the interface with the JMS datastore as the source and the table as the target in the mapping. Here the staging area is on the target.

In the flow area, LKM JMS to SQL is used to load the data. Also note that the commit option can be set to false here. When this interface is part of a package, the commit can be issued separately in a procedure in that package if you need to combine data processing and JMS message consumption in a single transaction.
Run the interface and see whether it populates the output table.
The interface run is successful and the output table is populated from the messages.
There are no messages left in the queue and the table is populated.
In a similar way, topic messages can be consumed, and we can publish messages to a topic. The only difference is that a topic keeps a message until all of its subscribers have received it or until it expires.
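The difference between the two delivery models can be shown with a small in-memory analogy in plain Java. This is not the JMS API, and the class QueueVsTopicSketch is hypothetical. The round-robin hand-out in the queue case is a simplification chosen for the sketch; a real JMS provider does not promise any particular distribution among competing consumers.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

// Hypothetical in-memory analogy of queue versus topic delivery (no JMS involved).
class QueueVsTopicSketch {

    // Queue: each message goes to exactly one of the competing consumers.
    static List<List<String>> deliverQueue(List<String> messages, int consumers) {
        List<List<String>> received = new ArrayList<>();
        for (int i = 0; i < consumers; i++) {
            received.add(new ArrayList<>());
        }
        Deque<String> queue = new ArrayDeque<>(messages);
        int next = 0;
        while (!queue.isEmpty()) {
            // Once a consumer takes a message, it is gone from the queue.
            received.get(next).add(queue.poll());
            next = (next + 1) % consumers;
        }
        return received;
    }

    // Topic: every subscriber receives its own copy of every message.
    static List<List<String>> deliverTopic(List<String> messages, int subscribers) {
        List<List<String>> received = new ArrayList<>();
        for (int i = 0; i < subscribers; i++) {
            received.add(new ArrayList<>(messages));
        }
        return received;
    }

    public static void main(String[] args) {
        List<String> messages = Arrays.asList("8,test3,24", "9,test4,24");
        System.out.println("queue: " + deliverQueue(messages, 2));
        System.out.println("topic: " + deliverTopic(messages, 2));
    }
}
```

With two consumers, the queue case gives each consumer one of the two messages, while the topic case gives both subscribers both messages. This is why an ODI interface reading a topic sees every message even when other subscribers are also reading it.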

3 comments:

  1. Thanks for this great post! Where can I find a tutorial about JMS XML configuration in ODI?

  2. Sir I have a question.
    My source is Oracle and the target is a WebLogic JMS queue.
    There is a requirement to send 500 records in one message to the JMS queue. How can I implement this logic?
    Thanks
