Notes on JXTA Development

I am collecting here some implementation notes on JXTA as I learn it.
Some documents [see below] have been extremely helpful in this process [especially Brendon Wilson's drafts for his JXTA book].


Platform Configuration

The platform is configured either programmatically, using the Configurator class, or, by default, through the graphical configuration interface. The configuration is saved in a file, named PlatformConfig by default, in a folder called ".jxta" under the directory from which the application is started. Here is an example PlatformConfig file. The module class IDs (MCIDs) that appear in it are described in the file PeerGroup.java (platform/binding/java/api/src/net/jxta/peergroup/PeerGroup.java):

MCID ending in 0905 refers to TCP Protocol
MCID ending in 0605 refers to Rendezvous
MCID ending in 0E05 refers to Proxy
MCID ending in 0805 refers to EndPoint
MCID ending in 0105 refers to PeerGroup
MCID ending in 0A05 refers to HTTP Protocol
MCID ending in 0F05 refers to Relay
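
When reading a PlatformConfig file by hand, the table above can be turned into a small lookup. The following is only a sketch: McidSuffixTable and describe are hypothetical names, not part of the JXTA API, and the sketch assumes that the last four characters of the MCID string are enough to identify the service, as in the list above.

```java
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;

// Hypothetical helper for reading a PlatformConfig file; not part of the JXTA API.
class McidSuffixTable {
    // Suffix-to-service table copied from the list above.
    private static final Map<String, String> SERVICES = new LinkedHashMap<>();
    static {
        SERVICES.put("0905", "TCP Protocol");
        SERVICES.put("0605", "Rendezvous");
        SERVICES.put("0E05", "Proxy");
        SERVICES.put("0805", "EndPoint");
        SERVICES.put("0105", "PeerGroup");
        SERVICES.put("0A05", "HTTP Protocol");
        SERVICES.put("0F05", "Relay");
    }

    /** Returns the service named by the last four characters of an MCID, or "unknown". */
    static String describe(String mcid) {
        if (mcid == null || mcid.length() < 4) {
            return "unknown";
        }
        String suffix = mcid.substring(mcid.length() - 4).toUpperCase(Locale.ROOT);
        return SERVICES.getOrDefault(suffix, "unknown");
    }

    public static void main(String[] args) {
        // "placeholder-...0a05" is a made-up string, not a real module class ID.
        System.out.println(describe("placeholder-...0a05"));
    }
}
```

The lookup is case-insensitive on the suffix so that IDs written in lower-case hex still match.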

Modules

Create a Module Class, Spec or Impl advertisement
The implementation refers to a class (any class), set through the setCode method of the ModuleImplAdvertisement.
It can be initialized using the init method of the JxtaLoader.
The modules are interrelated using IDs:
[ModuleClassID[ModuleSpecID]ModuleImplID]
A module spec ID is set in relation to a module class ID.
A module impl ID is set in relation to a module spec ID.
When an impl advertisement is discovered, its module spec ID can be extracted and compared with the specID of a Module Spec Advertisement, which defines the communication mechanisms of the service and may contain an advertisement for a communication channel.
Similarly, the module spec ID links to a module class ID, which defines the general behavior of the service.
Using this mechanism, a behavior advertised remotely can be made available and used locally!
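
The impl -> spec -> class chain described above can be pictured with a few plain Java classes. This is a sketch of the linking idea only: ClassAdv, SpecAdv and ImplAdv are simplified stand-ins invented for this example (not the JXTA advertisement classes), IDs are plain strings, and the "discovered" maps play the role of the local advertisement cache.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java stand-ins for the three advertisement kinds; not the JXTA classes.
class ModuleLinkSketch {
    static final class ClassAdv {
        final String classId;
        final String behavior;
        ClassAdv(String classId, String behavior) {
            this.classId = classId;
            this.behavior = behavior;
        }
    }

    static final class SpecAdv {
        final String specId;
        final String classId; // the spec ID is defined in relation to a class ID
        SpecAdv(String specId, String classId) {
            this.specId = specId;
            this.classId = classId;
        }
    }

    static final class ImplAdv {
        final String specId; // the impl is defined in relation to a spec ID
        final String code;   // name of the implementing class
        ImplAdv(String specId, String code) {
            this.specId = specId;
            this.code = code;
        }
    }

    private final Map<String, ClassAdv> classAdvs = new HashMap<>();
    private final Map<String, SpecAdv> specAdvs = new HashMap<>();

    void discovered(ClassAdv adv) { classAdvs.put(adv.classId, adv); }
    void discovered(SpecAdv adv) { specAdvs.put(adv.specId, adv); }

    /** Follows impl -> spec -> class and reports what the discovered impl provides. */
    String resolve(ImplAdv impl) {
        SpecAdv spec = specAdvs.get(impl.specId);
        if (spec == null) {
            return impl.code + ": spec not discovered";
        }
        ClassAdv cls = classAdvs.get(spec.classId);
        String behavior = (cls == null) ? "unknown behavior" : cls.behavior;
        return impl.code + " -> " + spec.specId + " -> " + behavior;
    }
}
```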

Pipes

Pipes are communication channels. Every pipe has a unique identifier. Input and Output channels can be created from a pipe to send or receive messages.
When a message is received, a pipe event handler is fired that can read the message from the pipe.
Pipe advertisements can be embedded in a Module Spec Advertisement or published independently. They are independent of the peer name, meaning that a discovered pipe does not link to the peer advertising it.

Bidirectional Pipes

Bidirectional Pipes are a convenient utility for 2-way peer communication.
Given a pipe advertisement, bidirectional communication can be established between a client and a server as follows:
  • The server creates a BidirectionalPipeService.AcceptPipe from a pipeAdvertisement
  • The server advertises its pipeAdvertisement
  • The client discovers the pipeAdvertisement
  • The client creates a BidirectionalPipeService and calls bidirpipe.connect(pipeAdv,time) to obtain a BidirectionalPipeService.Pipe
  • The server accepts the connection through acceptPipe.accept(timeout) and thereby obtains a BidirectionalPipeService.Pipe
  • From this common BidirectionalPipeService.Pipe, both client and server can get the input and output pipes used to transfer messages.

Adding a new advertisement in the Jxta-java framework

In order to have an advertisement recognized upon discovery, it needs to be registered through the AdvertisementFactory registerAssoc method, which reads the net/jxta/impl/config.properties file to get all the AdvertisementInstanceTypes.
1-a For static linking, add the new advertisement name to the config.properties file.
1-b For dynamic linking, use AdvertisementFactory.registerAdvertisementInstance(newAdvertisement.getAdvertisementType(),new newAdvertisement.Instantiator()) to register the new advertisement at runtime.
2- Write the two advertisement classes net.jxta.protocol.newAdvertisement and net.jxta.impl.protocol.newAdv:
  • net.jxta.protocol.newAdvertisement extends Advertisement and is used as a placeholder for the actual advertisement.
  • net.jxta.impl.protocol.newAdv extends newAdvertisement and initializes the actual advertisement Document.
These two classes can be kept with the application files or made part of the core platform classes.
3- Copy those files into every application that needs to use the new advertisement.
READY to advertise!
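
The reason registration is needed can be illustrated with a plain-Java registry that maps an advertisement type name to something able to instantiate it. This is a sketch of the general factory idea, not the JXTA implementation: AdvRegistrySketch, Adv and NewAdv are hypothetical names, and "jxta:NewAdv" is a made-up type string.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Plain-Java model of a type-name -> instantiator registry; not the JXTA AdvertisementFactory.
class AdvRegistrySketch {
    interface Adv {
        String getAdvertisementType();
    }

    // Hypothetical new advertisement; "jxta:NewAdv" is a made-up type name.
    static final class NewAdv implements Adv {
        public String getAdvertisementType() {
            return "jxta:NewAdv";
        }
    }

    private final Map<String, Supplier<Adv>> instantiators = new HashMap<>();

    /** Registers an instantiator; returns false if the type was already registered. */
    boolean register(String type, Supplier<Adv> instantiator) {
        return instantiators.putIfAbsent(type, instantiator) == null;
    }

    /** Builds an advertisement of the given type, as would happen upon discovery. */
    Adv newAdvertisement(String type) {
        Supplier<Adv> instantiator = instantiators.get(type);
        if (instantiator == null) {
            throw new IllegalArgumentException("unregistered advertisement type: " + type);
        }
        return instantiator.get();
    }
}
```

An unregistered type cannot be instantiated in this model, which mirrors why a new advertisement is not recognized upon discovery until it has been registered.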

Creating a new Service

JXTA defines several core services that allow the platform to be truly operational and provide functionality after bootstrapping.
These services are:
  • Endpoint: transfers messages directly on top of the network layer
  • RendezVous
  • Resolver
  • Pipe
  • Peer Information
  • Discovery
  • Membership
Some steps:
  • Interface someService extends Service: adds and removes listeners and declares the methods offered by the service
  • Interface someServiceListener: declares the methods handled by the listener
  • someServiceEvent extends java.util.EventObject: wraps the response object in an event object that is passed to the listener for processing
  • someServiceImpl implements someService and possibly QueryHandler: the actual code for the methods declared in someService and for QueryHandler (processQuery, processResponse)
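
The four pieces listed above fit together as sketched here in plain Java. Only java.util.EventObject is real; the other types are simplified stand-ins (SomeService does not extend the JXTA Service interface here, handleResponse is a hypothetical listener method, and processResponse takes a plain String instead of a resolver message).

```java
import java.util.EventObject;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Plain-Java sketch of the service / listener / event trio; no JXTA types are used.
class ServiceListenerSketch {
    // Wraps the response so it can be handed to listeners.
    static class SomeServiceEvent extends EventObject {
        private final String response;

        SomeServiceEvent(Object source, String response) {
            super(source);
            this.response = response;
        }

        String getResponse() {
            return response;
        }
    }

    interface SomeServiceListener {
        void handleResponse(SomeServiceEvent event); // hypothetical method name
    }

    interface SomeService {
        void addListener(SomeServiceListener listener);
        boolean removeListener(SomeServiceListener listener);
    }

    static class SomeServiceImpl implements SomeService {
        private final List<SomeServiceListener> listeners = new CopyOnWriteArrayList<>();

        public void addListener(SomeServiceListener listener) {
            listeners.add(listener);
        }

        public boolean removeListener(SomeServiceListener listener) {
            return listeners.remove(listener);
        }

        // Stand-in for QueryHandler.processResponse: notify every waiting listener.
        void processResponse(String response) {
            SomeServiceEvent event = new SomeServiceEvent(this, response);
            for (SomeServiceListener listener : listeners) {
                listener.handleResponse(event);
            }
        }
    }
}
```

A CopyOnWriteArrayList is used so that a listener can remove itself while being notified; any list would do for a single-threaded test.
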
To be used, the new service has to be added to the list of services offered by a peergroup. This is done by retrieving the peergroup's list of services and adding the new one to it.
New IDs have to be generated for every new element in the list.
A typical way of making use of the new service in an application is to:
  • add the service to the peergroup list of services
    • // Start from the group's all-purpose implementation advertisement
      ModuleImplAdvertisement implAdv =
          netPGroup.getAllPurposePeerGroupImplAdvertisement();
      // Get the services params
      StdPeerGroupParamAdv params =
          new StdPeerGroupParamAdv(implAdv.getParam());
      // Get the mapping from service IDs to their implementations
      Hashtable services = params.getServices();
      // Add the new service to the group's list of services.
      // ModuleClassID() and ModImplAdv() are placeholders for the new
      // service's module class ID and module impl advertisement.
      services.put(ModuleClassID(), ModImplAdv());
      params.setServices(services);
      implAdv.setParam((StructuredDocument) params.getDocument(
          new MimeMediaType("text", "xml")));
      // Set a new ModuleSpecID for this new set of services
      implAdv.setModuleSpecID((ModuleSpecID) IDFactory.fromURL(
          new URL(refPeerGroupSpec)));
      PeerGroupID groupID = (PeerGroupID) IDFactory.fromURL(
          new URL(refPeerGroupID));
      // Create the new group and keep its advertisement for publishing
      PeerGroup newGroup = netPGroup.newGroup(groupID, implAdv, name, description);
      PeerGroupAdvertisement groupAdv =
          newGroup.getPeerGroupAdvertisement();
      
  • publish the new peergroupadvertisement
    • discovery.publish(implAdv, DiscoveryService.ADV);
      discovery.remotePublish(implAdv, DiscoveryService.ADV);
      discovery.remotePublish(groupAdv, DiscoveryService.GROUP);
      ...more publishing...
      
  • implement the someService methods that handle sending and receiving messages
  • register listeners for this service if the peer has to offer the service
  • define a GUI for the application
Once the peergroup has registered the service, the service can be invoked by looking up its ClassID in the peergroup.
A typical service that uses the Resolver Service implements the QueryHandler interface, which defines two methods: processQuery and processResponse.
processQuery returns a ResolverResponseMsg, the response object for the query received. This response object is sent back to the requesting peer. Upon receiving a ResolverResponseMsg, a peer looks up the service that handles the message (identified by the handler name in the resolver response message) and passes it to that QueryHandler's processResponse method, which may either process and display the response directly or notify all the listeners waiting for it. The response can be wrapped in an event object that is passed to the listeners.

Sending Queries using the Resolver Service

JXTA's resolver service makes it easy to send a query to a peergroup or to a specific peerID.
The Resolver Service is one of the core services offered by the platform.
Sending a query and processing the response takes the following steps:
  • Register a handler name for the class handling the query [this handler class implements the QueryHandler interface, which defines two methods: processQuery and processResponse]
  • Create a query message object and send it using the resolver service
  • When the remote peer receives the query, it matches the handler name for this query and creates a response object, which it sends back to the requesting peer.
Both peers {local and remote} must have the handler registered in order to process the query and response messages.

The Resolver service can also be used to send messages to remote services. When a response object is received, the resolver handler must know its format in order to extract the response string [that is the only requirement for handling responses correctly].
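
The handler-name matching described above can be modelled without the network. The sketch below is plain Java and makes several assumptions: messages are reduced to a handler name plus a string body, QueryHandlerLike only mimics the two QueryHandler methods, and each ResolverDispatchSketch instance stands for the resolver of one peer. None of these names come from the JXTA API.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java model of handler-name dispatch; one instance stands for one peer's resolver.
class ResolverDispatchSketch {
    // Mimics the two QueryHandler methods with plain strings.
    interface QueryHandlerLike {
        String processQuery(String query);
        void processResponse(String response);
    }

    // A query or a response, reduced to a handler name and a body.
    static final class Msg {
        final String handlerName;
        final String body;

        Msg(String handlerName, String body) {
            this.handlerName = handlerName;
            this.body = body;
        }
    }

    // Example handler: answers every query by echoing it back.
    static final class EchoHandler implements QueryHandlerLike {
        String lastResponse;

        public String processQuery(String query) {
            return "echo:" + query;
        }

        public void processResponse(String response) {
            lastResponse = response;
        }
    }

    private final Map<String, QueryHandlerLike> handlers = new HashMap<>();

    void registerHandler(String name, QueryHandlerLike handler) {
        handlers.put(name, handler);
    }

    /** Remote side: builds the response, or returns null if no handler matches. */
    Msg receiveQuery(Msg query) {
        QueryHandlerLike handler = handlers.get(query.handlerName);
        if (handler == null) {
            return null;
        }
        return new Msg(query.handlerName, handler.processQuery(query.body));
    }

    /** Local side: hands the response to the matching handler, if any. */
    boolean receiveResponse(Msg response) {
        QueryHandlerLike handler = handlers.get(response.handlerName);
        if (handler == null) {
            return false;
        }
        handler.processResponse(response.body);
        return true;
    }
}
```

A peer that has not registered the handler name drops the message in this model (null or false), which is why both sides need the registration.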

Starting a Module or an Application component

Module and Application interfaces provide three methods :
public void init(PeerGroup group, ID assignedID, Advertisement implAdv) throws PeerGroupException
public int startApp(java.lang.String[] args)
public void stopApp()
When the peergroup starts (through group.startApp(args)), it invokes the init and startApp methods of each module or application component that is part of it.
Here are the steps required to get an application component to start :
  • write an application object that implements Application (init, startApp, stopApp)
  • create the module class, spec and impl advertisements for this module/application (createModuleClassAdv, createModuleSpecAdv, createModuleImplAdv)
  • add the module/application to the peergroup's set of applications:
    Hashtable apps = new Hashtable();
    // key: the application's module class ID; value: its impl advertisement
    apps.put(app.getModClassAdv().getModuleClassID(), appImplAdv);
    params.setApps(apps);
    // store the updated params back into the group's impl advertisement
    implAdvOfGroup.setParam((StructuredDocument) params.getDocument(
        new MimeMediaType("text", "xml")));
    // starting the group starts its applications
    group.startApp(null);
  • the application's startApp method is invoked when the peergroup starts.
This is a convenient way of writing applications because the application is passed the peergroup to use (from which it obtains its services) without having to check whether the peergroup has been generated.
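
The init/startApp/stopApp sequence can be tried out with a small plain-Java model. This is a sketch under assumptions: AppLike and GroupLike are hypothetical stand-ins for Application and PeerGroup, IDs are plain strings, and the group here calls init immediately followed by startApp on each component in turn (the actual platform ordering may differ).

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of a group starting its application components; no JXTA types are used.
class LifecycleSketch {
    interface AppLike {
        void init(GroupLike group, String assignedId);
        int startApp(String[] args);
        void stopApp();
    }

    static final class GroupLike {
        final String name;
        private final List<AppLike> apps = new ArrayList<>();

        GroupLike(String name) {
            this.name = name;
        }

        void addApp(AppLike app) {
            apps.add(app);
        }

        /** Initializes and starts every component; stops at the first non-zero status. */
        int startApp(String[] args) {
            int index = 0;
            for (AppLike app : apps) {
                app.init(this, "app-" + index++); // the component receives its group here
                int status = app.startApp(args);
                if (status != 0) {
                    return status;
                }
            }
            return 0;
        }

        void stopApp() {
            for (AppLike app : apps) {
                app.stopApp();
            }
        }
    }

    // Example component that records the calls it receives.
    static final class RecordingApp implements AppLike {
        final List<String> calls = new ArrayList<>();

        public void init(GroupLike group, String assignedId) {
            calls.add("init:" + group.name);
        }

        public int startApp(String[] args) {
            calls.add("startApp");
            return 0;
        }

        public void stopApp() {
            calls.add("stopApp");
        }
    }
}
```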

Writing applications using JXTA's Java Binding: A Template

Brendon Wilson details a sample application in Chapter 11 of his book on JXTA. This sample shows very clearly how to write an application and services using JXTA's Java binding.
Generic Classes:
A Main Class where:
  • Module advertisements are created
  • PeerGroup is Created
  • PeerGroup is initialized
  • Application is Started
This looks like:
// Pseudocode outline, not compilable as-is
public class MainClass
{
    createModClassAdv() {}
    createModSpecAdv() {}
    createModImplAdv() {}

    createPeerGroup()
    {
        modclassadv = createModClassAdv();
        modspecadv = createModSpecAdv();
        modimpladv = createModImplAdv();
        // ...as many services/apps as needed

        services = peergroup.getServices();
        services.put(modclassadv.getModClassID(), modimpladv);
        // ...as many services as needed

        applications.put(modclassadv.getModClassID(), modimpladv);

        newgroup = peergroup.newGroup(impladv);
        newgroup.startApp();
    }

    initializePG()
    {
        peergroup = PeerGroupFactory.newNetPeerGroup();
    }
}
As mentioned earlier, defining apps and services takes:
aService.java
aServiceImpl.java
aServiceListener.java


Each service should be associated with a Handler class. The handler basically implements the serviceListener interface.

Besides the generic main class, there should be a class defining the User Interface and firing the services and service Handlers, that looks like:
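// Pseudocode outline, not compilable as-is
public class MainGUI implements Application
{
    initGUI()
    {
        JFrame...
        setVisible(true);
    }

    public void init(PeerGroup group, ID assignedID, Advertisement implAdv)
    {
        // keep the peergroup passed in by the platform
        peergroup = group;
    }

    public int startApp(String[] args)
    {
        aService = peergroup.lookup(aservice.getClassID());
        aService.addListener(aServiceHandler);

        initGUI();
        return 0; // report a successful start
    }

    public void stopApp()
    {
        aService.removeListener(aServiceHandler);
    }
}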

Printing out messages

In EndpointDemuxListener.java the processIncomingMessage function can be modified by adding at the beginning of the method:
// "enum" is a reserved word since Java 5, so the enumeration is named "elements"
net.jxta.endpoint.MessageElementEnumeration elements = message.getElements();
System.out.println("**************************************************");
if (elements != null) {
  while (elements.hasMoreMessageElements()) {
    try {
      // Parse each element as an XML document and print it
      java.io.InputStream is = (java.io.InputStream) elements.nextMessageElement().getStream();
      net.jxta.document.StructuredTextDocument document = (net.jxta.document.StructuredTextDocument)
          net.jxta.document.StructuredDocumentFactory.newStructuredDocument(
              new net.jxta.document.MimeMediaType("text/xml"), is);
      java.io.StringWriter out = new java.io.StringWriter();
      document.sendToWriter(out);
      System.out.println(out.toString());
    } catch (Exception e) {
      System.err.println("Error demuxing " + e);
    }
  }
}
This snippet of code enumerates all elements of the incoming message and prints them out to the console.

JXTA Boot Message

One may wonder what the first message sent out on the network is when the platform boots. The answer is none: nothing is sent on the wire until a message is issued, for instance through a discovery request or a publish. If one peer starts and listens for discoveryQueryMessages while another publishes a null advertisement, here is what the endpoint service is actually transmitting (as seen in XML). We see that the structure is as follows:
ResolverResponse
	DiscoveryResponse
		PeerAdvertisement
	/DiscoveryResponse
/ResolverResponse
RendezVousPropagateMessage
/RendezVousPropagateMessage

Story of a Message

It is interesting to follow a message through the services and see how it gets from the high-level Resolver Service to the other end of the network. Here is a brief trip with a JXTA message:

ResolverServiceImpl.sendQuery invokes RendezvousServiceImpl.propagateInGroup, which calls:
  • RendezvousServiceImpl.sendToNetwork
  • RendezvousServiceImpl.sendToEachRendezvous
  • RendezvousServiceImpl.sendToEachClient

sendToNetwork calls Endpoint.propagate on all active protocols. For TCP, for instance, it calls TcpTransport.propagate, which multicasts the message to the entire group through a MulticastSocket using DatagramPacket (i.e. UDP).

sendToEachRendezvous and sendToEachClient call PeerConnection.sendMessage, which calls PeerConnection.dosendMessage, which in turn invokes EndpointMessenger.sendMessage. The EndpointMessenger depends on the protocol currently in use and is obtained from EndpointService.getMessenger. For TCP it returns a TcpNonBlockingMessenger, on which sendMessage is called; this actually sends the message through TcpConnection.sendMessage, which builds and sends a UNICAST message.

When sending a message using a Pipe, the PipeService selects the appropriate implementation of the OutputPipe: NonBlockingOutputPipe, SecureOutputPipe or WirePipe.
OutputPipe.send(Message) calls EndpointMessenger.sendMessage as presented above.

Things I have modified in the platform 2.0 JXTA src:
  • Added isServerEnabled to net.jxta.util.config.Configurator.setRelay(boolean)
  • Commented out the part of net.jxta.impl.protocol.TCPAdv.getDocument that writes ServerOffTag
In Platform 2.1, the PeerGroup Impl Adv needs a MimeMediaType of XMLUTF8 and not text.

Miscellaneous

Adding a runtime check of the version of the JXTA platform being used:

Add the file
platform/binding/java/api/src/net/jxta/platform/Version.java
Where Version.java is of the form
package net.jxta.platform;

public class Version
{
  public static void main(String[] args)
  {
    System.out.println("JXTA Version 2.1");
  }
}
Replace 2.1 with the version of the platform you are actually using.
In a build.xml file, add a corresponding target to run Version.class:
<target name="version" depends="test">

  <java classname="net.jxta.platform.Version" fork="yes"
    classpathref="classpath" >

    <jvmarg value="-Dnet.jxta.tls.principal=somelogin"/>
    <jvmarg value="-Dnet.jxta.tls.password=somepass"/>
  </java>
</target>

Object Serialization

To serialize any Serializable Java object held in 'data' to a byte array, use:
byte[] serializedObject = null;

try {
  ByteArrayOutputStream bytes = new ByteArrayOutputStream();
  ObjectOutputStream b = new ObjectOutputStream(bytes);
  b.writeObject(data);
  b.flush();
  b.close();
  serializedObject = bytes.toByteArray();
  bytes.close();
}
catch (IOException e) {
  System.out.println("OBJECT IS NOT SERIALIZABLE - " +
		     data.getClass().getName());
}

...then, to create the message, use:

Message msg = peer.getPipeService().createMessage();

MessageElement messageElement = msg.newMessageElement(
    TAG, new MimeMediaType("text/plain"), serializedObject);
// add the newly created element to the message
msg.addElement(messageElement);

...and to deserialize the object from a JXTA Message in 'msg':

Object data = null;

try {
  byte[] serializedObject = msg.getBytes(TAG);
  ByteArrayInputStream bytes = new ByteArrayInputStream(serializedObject);
  ObjectInputStream b = new ObjectInputStream(bytes);
  data = b.readObject();
  b.close();
  bytes.close();
}
catch (Exception e) {
  System.out.println("RECV: OBJECT IS CORRUPTED - " + e);
}
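
The byte-array part of the snippets above can be checked on its own, without a JXTA Message in between. The following round-trip sketch uses only java.io; SerializationRoundTrip, toBytes and fromBytes are names chosen for this example, and checked exceptions are wrapped in unchecked ones purely to keep the calling code short.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Round trip through a byte array using standard Java serialization only.
class SerializationRoundTrip {
    /** Serializes any Serializable object to a byte array. */
    static byte[] toBytes(Serializable data) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        // try-with-resources flushes and closes the object stream before the bytes are read
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(data);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Rebuilds the object from a byte array produced by toBytes. */
    static Object fromBytes(byte[] serialized) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(serialized))) {
            return in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException("class of serialized object not on the classpath", e);
        }
    }

    public static void main(String[] args) {
        Object copy = fromBytes(toBytes("hello JXTA"));
        System.out.println(copy); // prints "hello JXTA"
    }
}
```

Taking a Serializable parameter makes the compiler reject objects that cannot be serialized, instead of discovering it at run time.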


Shared Resources Distributed Index (SRDI)

SRDI is a service offering mechanisms to achieve a loosely-consistent Distributed Hash-Table on a JXTA overlay network.
The SRDI service differs from the original ResolverService in several ways:
  • Messages are propagated through RendezVous peers, based on the Rendezvous peerview (rather than flooding the network)
  • A key replication mechanism strengthens availability in case of failure
On the client side, the process starts as in the ResolverService, through a call to
ResolverService.sendSrdi(dest,message)
This call either sends the message to the destination, if there is a single one, or propagates it to all rendezvous through a call to Rendezvous.walk.
The walk calls propagate either on EdgePeerRdvService, which propagates the message to the Group, the Rendezvous and the Peers, or on RdvPeerRdvService, which uses the RdvWalker to walk along the possible Rendezvous in the RendezvousPeerView.

Discovery Service

When publishing using the publish method, an advertisement is published only locally. The publication process consists of saving the Advertisement in the local cache using cm.save.

When publishing using the remotePublish method, a discovery response is formed and pushed to the other peers in the subnet and to the rendezvous. The receiving peers then save the advertisements in their cache.

When calling getRemoteAdvertisements, a peer sends out a discovery query optionally passing in arguments such as Name and Value pairs to narrow down the lookup process.
There are two distinct cases: one in which a destination Peer is passed as argument (Unicast), and another in which no destination peer is used (Broadcast).

In the associated processQuery method of the resolver service implemented by the DiscoveryService, the discovery query searches the local cache for matching attribute/value pairs (when used) using cm.search and returns any matching responses, limited by the threshold value.
If a threshold of 0 is used and a discovery type of PEER is passed, then the peer will only return its own PeerAdvertisement.
SRDI indices are queried if the type of message is Broadcast (NULL peerName). If SRDI fails to find a direct match, a walker is started over the Rendezvous Peer View.
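
The local-cache lookup with a threshold can be pictured as a filter that stops once enough matches are found. This is a plain-Java sketch and not the cm.search implementation: advertisements are modelled as attribute/value maps, matching is by exact equality, and the special case of a threshold of 0 with type PEER is not modelled (a threshold of 0 simply returns nothing here).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Plain-Java model of a threshold-limited cache search; not the JXTA cache manager.
class CacheSearchSketch {
    /**
     * Returns at most 'threshold' advertisements whose attribute equals the value.
     * A null attribute matches every advertisement (no narrowing requested).
     */
    static List<Map<String, String>> search(List<Map<String, String>> cache,
                                            String attr, String value, int threshold) {
        List<Map<String, String>> matches = new ArrayList<>();
        for (Map<String, String> adv : cache) {
            if (matches.size() >= threshold) {
                break; // enough responses collected
            }
            if (attr == null || (value != null && value.equals(adv.get(attr)))) {
                matches.add(adv);
            }
        }
        return matches;
    }
}
```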