Tuesday 9 December 2008

More Mono, AMQP and System.Messaging

I've spent the last couple of months continuing on an implementation of the System.Messaging library for Mono. I have since ditched the QPid client libraries (yes there are multiple, about 6 in fact) in favour of the single dll supplied by the RabbitMQ messaging server. I am also using RabbitMQ as the main server for dev and testing. It is a great bit of software, really easy to install (on Ubuntu anyway) and, interestingly, built using Erlang.

Progress has gone really well. I have implemented all of the Receive, Send and Peek methods of MessageQueue, including the methods that select by Id/CorrelationId and those that use transactions and timeouts. MessageEnumerators are currently supported, although it is not possible to implement the transactional version of RemoveCurrent, due to the way AMQP handles transactions. Well, maybe not impossible, just an enormous, inefficient hack if I managed to get it to work.

The code has now been shifted into the mono trunk, which suggests I should really make sure I get it complete (or as complete as possible). One nice side effect of the implementation is that I have added an SPI that can be used by other potential implementations.

In similar news, a couple of weeks ago MS announced that they are joining the AMQP working group. It will be interesting to see where they go with it.

Sunday 21 September 2008

Another GWT and Comet

I've been quite interested in GWT and Comet for some time now, but when I looked around for solutions none really grabbed me. The most promising is the rocket-gwt implementation, but that requires an iframe to manage the Comet communication, which seemed a little unnecessary. With the latest version of GWT (1.5.2) the Google guys have included a direct HTTP API in addition to the RPC mechanism provided with the earlier releases. I thought I would have a go at producing my own implementation in combination with Tomcat 6. I set myself a few constraints:

  • The code must remain functional in the standard GWT dev environment, though not necessarily performing Comet-style HTTP Push.
  • When deployed to a proper Tomcat 6 servlet engine it must use the implemented CometProcessor to manage HTTP Push.
  • It must be possible to deploy/run/debug in the 2 environments (GWT dev, full Tomcat 6) without code or configuration changes when switching between them.

Client Code

I started by implementing the client portion of my little test application. Starting with the basic premise of a stock watching application (a fairly common use case for Comet), I constructed a small app that would fetch a list of stocks using the standard RPC mechanism and then listen for updates using the custom HTTP/Comet implementation. I created an interface called StockUpdateService and an implementation called StockUpdateServiceImpl. For the main client method I reused the GWT AsyncCallback interface to handle the responses to the service call. The HTTP method would expect a Stock object encoded using JSON.

public interface StockUpdateService {
void waitForUpdate(AsyncCallback<Stock> callback);
}

public class StockUpdateServiceImpl implements StockUpdateService {
// GWT.getModuleBaseURL() already ends with a slash, so none is added here.
private static final String url = GWT.getModuleBaseURL() + "updateServlet";
private Request currentRequest = null;

public void waitForUpdate(final AsyncCallback<Stock> callback) {
RequestBuilder builder = new RequestBuilder(RequestBuilder.GET, URL.encode(url));

try {
currentRequest = builder.sendRequest(null, new RequestCallback() {
public void onError(Request request, Throwable exception) {
callback.onFailure(exception);
}

public void onResponseReceived(Request request, Response response) {
if (200 == response.getStatusCode()) {
// When receiving a successful response...
JSONValue v = JSONParser.parse(response.getText());
JSONObject o = v.isObject();
if (o != null) {
// Decode the JSON Object...
String code = o.get("code").isString().stringValue();
long price = (long) o.get("price").isNumber().doubleValue();
long change = (long) o.get("change").isNumber().doubleValue();
Stock s = new Stock(code, price, change);
// Pass it back to the caller...
callback.onSuccess(s);
} else {
callback.onFailure(new Exception(
"Invalid JSON response: " + response.getText()));
}
} else {
callback.onFailure(new Exception(response.getStatusText()));
}
}
});
} catch (RequestException e) {
callback.onFailure(e);
}
}
}

All fairly straightforward: the HTTP API supports a RequestCallback interface, within which I simply unmarshal the incoming data and pass it on to the supplied AsyncCallback.
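The listing above handles a single response. Since each update arrives as the response to one request, the client presumably has to call waitForUpdate again after every success to keep listening. A minimal sketch of that re-subscribe loop, written without GWT so that it runs standalone: UpdateCallback and UpdateSource are hypothetical stand-ins for AsyncCallback and StockUpdateService, and the maxUpdates bound exists only so the example terminates.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for GWT's AsyncCallback, so the sketch runs without GWT.
interface UpdateCallback<T> {
    void onSuccess(T result);
    void onFailure(Throwable caught);
}

// Hypothetical stand-in for StockUpdateService.
interface UpdateSource<T> {
    void waitForUpdate(UpdateCallback<T> callback);
}

// Re-issues waitForUpdate after every successful response.
class UpdateListener<T> {
    private final UpdateSource<T> source;
    private final int maxUpdates; // artificial bound so the sketch terminates
    private final List<T> received = new ArrayList<T>();
    private Throwable lastError;

    UpdateListener(UpdateSource<T> source, int maxUpdates) {
        this.source = source;
        this.maxUpdates = maxUpdates;
    }

    void listen() {
        source.waitForUpdate(new UpdateCallback<T>() {
            public void onSuccess(T result) {
                received.add(result);
                if (received.size() < maxUpdates) {
                    listen(); // ask for the next update straight away
                }
            }

            public void onFailure(Throwable caught) {
                lastError = caught; // stop listening on the first failure
            }
        });
    }

    List<T> getReceived() {
        return received;
    }

    Throwable getLastError() {
        return lastError;
    }
}
```

In a real GWT client the callback fires asynchronously, so the nested call to listen() does not grow the stack the way it does with a synchronous test source. Whether to retry after a failure, and with what delay, is a separate design decision that this sketch leaves out.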

Server Code for GWT Dev Environment

Since the Tomcat server that comes with GWT does not support Comet, I had to find a way to provide the push functionality, but without using Comet. Essentially I just needed something that could be used during testing and debugging and was functionally correct. It turned out to be ridiculously simple. What is not mentioned clearly in the GWT docs is that you can create standard Java servlets in the GWT environment and they simply work the way you expect. Therefore I created a standalone servlet that would block the running thread before returning.

public class UpdateServlet extends HttpServlet {
Stock[] stocks = {
new Stock("BA", 130, 15),
new Stock("NT", 400, -1),
new Stock("FA", 5000, 2),
new Stock("HZ", 213, -70),
new Stock("CR", 14, 1)
};

protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws IOException {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// Restore the interrupt flag and continue with the response.
Thread.currentThread().interrupt();
}

Random r = new Random();
int stockNum = r.nextInt(stocks.length);
long change = r.nextInt(100) - 49;

Stock oldStock = stocks[stockNum];
Stock newStock = new Stock(oldStock.getCode(), oldStock.getPrice() + change, change);
stocks[stockNum] = newStock;

String result = Formatter.toJSON(newStock);
resp.getOutputStream().write(result.getBytes());
resp.getOutputStream().flush();
}
}

I configured this class in the GWT module for my project using the XML element: <servlet path='/updateServlet' class='uk.co.middlesoft.trader.server.UpdateServlet'/>
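The Formatter.toJSON helper used by the servlet is not shown in this post. As a rough sketch of what such a helper might look like, the following emits the three fields the client code reads (code, price and change). The Stock class and the StockJsonFormatter name here are hypothetical stand-ins, assuming only the constructor and the getCode/getPrice accessors seen in the listings plus a getChange accessor.

```java
// Hypothetical stand-in for the post's Stock class.
class Stock {
    private final String code;
    private final long price;
    private final long change;

    Stock(String code, long price, long change) {
        this.code = code;
        this.price = price;
        this.change = change;
    }

    String getCode() { return code; }
    long getPrice() { return price; }
    long getChange() { return change; }
}

// Hypothetical stand-in for the Formatter helper, which the post does not show.
class StockJsonFormatter {
    // Builds a JSON object with the fields the client decodes.
    static String toJSON(Stock s) {
        return "{\"code\":\"" + escape(s.getCode()) + "\""
                + ",\"price\":" + s.getPrice()
                + ",\"change\":" + s.getChange() + "}";
    }

    // Escapes quotes, backslashes and control characters in a JSON string.
    private static String escape(String raw) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < raw.length(); i++) {
            char c = raw.charAt(i);
            if (c == '"' || c == '\\') {
                sb.append('\\').append(c);
            } else if (c < 0x20) {
                sb.append(String.format("\\u%04x", (int) c));
            } else {
                sb.append(c);
            }
        }
        return sb.toString();
    }
}
```

Building the string by hand keeps the example free of dependencies; a JSON library would be the safer choice for anything beyond a flat object like this one.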

Server Code for Comet

All of that is well and good, but the important bit is the actual Comet part. I decided to use Tomcat's Comet Processor, so I created a second servlet with the same functionality as the traditional servlet but implementing the CometProcessor interface.

public class CometUpdateServlet extends HttpServlet implements CometProcessor {
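// Open Comet connections waiting for the next update.
// All access must be synchronized on this list.
private final List<CometEvent> connections = new ArrayList<CometEvent>();

public void event(CometEvent event) throws IOException, ServletException {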

switch (event.getEventType()) {
case BEGIN:
synchronized (connections) {
connections.add(event);
}
break;

case READ:
HttpServletRequest req = event.getHttpServletRequest();
InputStream is = req.getInputStream();
byte[] buf = new byte[512];
do {
int n = is.read(buf); //can throw an IOException
if (n < 0) {
return;
}
} while (is.available() > 0);
break;

case END:
synchronized (connections) {
connections.remove(event);
}
event.close();
break;

case ERROR:
synchronized (connections) {
connections.remove(event);
}
event.close();
break;
}
}

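// Same sample data as in UpdateServlet; only the update thread touches it.
private final Stock[] stocks = {
new Stock("BA", 130, 15),
new Stock("NT", 400, -1),
new Stock("FA", 5000, 2),
new Stock("HZ", 213, -70),
new Stock("CR", 14, 1)
};

private Thread t = null;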

public void init(ServletConfig config) throws ServletException {
// Let GenericServlet store the config before starting the update thread.
super.init(config);
t = new Thread(new MyRunnable());
t.setDaemon(true);
t.start();
System.out.println("Started thread");
}

private class MyRunnable implements Runnable {

public void run() {
while (!Thread.currentThread().isInterrupted()) {
try {
Thread.sleep(1000);
Random r = new Random();
int stockNum = r.nextInt(stocks.length);
long change = r.nextInt(100) - 49;

Stock oldStock = stocks[stockNum];
Stock newStock = new Stock(oldStock.getCode(),
oldStock.getPrice() + change, change);
stocks[stockNum] = newStock;

synchronized (connections) {
for (Iterator<CometEvent> i = connections.iterator(); i.hasNext();) {
try {
CometEvent e = i.next();
// Remove the current connection so that it can
// be closed.
i.remove();

HttpServletResponse rsp = e.getHttpServletResponse();

String result = Formatter.toJSON(newStock);
rsp.getOutputStream().write(result.getBytes());
rsp.getOutputStream().flush();

// Apparently this is needed for IE.
e.close();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}

public void destroy() {
t.interrupt();
// The update thread may still be iterating, so take the same lock.
synchronized (connections) {
connections.clear();
}
}
}

The servlet uses a separate thread to generate events for the client. The core of the Runnable implementation works almost identically to the traditional servlet implementation, returning a Stock object marshalled as a JSON string. The key thing to remember is that multiple threads will be accessing an instance of this class (the container's request threads and the update thread), so all access to shared data such as the connections list must be thread safe.
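The servlet above holds the lock on the connections list while it writes to every response. One possible variation, sketched here as an assumption rather than as what the servlet does, is to copy and clear the list under the lock and then write to the copied connections outside it, so that a slow client does not hold up new BEGIN or END events. PendingConnections is a hypothetical helper name, and the type parameter stands in for CometEvent so the sketch runs without Tomcat.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: tracks pending connections and hands them out atomically.
class PendingConnections<T> {
    private final List<T> pending = new ArrayList<T>();

    // Registers a connection, e.g. on a BEGIN event.
    void add(T connection) {
        synchronized (pending) {
            pending.add(connection);
        }
    }

    // Forgets a connection, e.g. on an END or ERROR event.
    boolean remove(T connection) {
        synchronized (pending) {
            return pending.remove(connection);
        }
    }

    // Takes every pending connection in one step, leaving the registry empty.
    // The caller can then write to the returned connections without the lock.
    List<T> drain() {
        synchronized (pending) {
            List<T> snapshot = new ArrayList<T>(pending);
            pending.clear();
            return snapshot;
        }
    }

    int size() {
        synchronized (pending) {
            return pending.size();
        }
    }
}
```

The update thread would call drain() once per tick and then write to and close each returned connection. A connection that errors after being drained is simply no longer in the registry, so a later remove() for it returns false.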

The final step in setting this up was putting together an Ant script that would build a war file that could be deployed into Tomcat. The war file would contain a web.xml which configured the CometProcessor servlet with a servlet mapping that matches the same URL as the servlet defined in the GWT module.

<servlet>
<servlet-name>cometUpdateServlet</servlet-name>
<servlet-class>uk.co.middlesoft.trader.server.CometUpdateServlet</servlet-class>
</servlet>
<servlet-mapping>
<servlet-name>cometUpdateServlet</servlet-name>
<url-pattern>/updateServlet</url-pattern>
</servlet-mapping>

Setting up the Ant script was fairly straightforward, with the exception of having to create a target that would compile the GWT code into HTML and JavaScript:

<target name="compile-js" depends="init" description="Build javascript code">
<java classname="com.google.gwt.dev.GWTCompiler" fork="true" failonerror="true">
<classpath>
<path refid="client.classpath"/>
</classpath>
<arg value="-out" />
<arg value="${build.www}" />
<arg value="uk.co.middlesoft.trader.StockWatch" />
</java>
<copy todir="WebContent">
<fileset dir="${build.www}/${module}" includes="**/*"/>
</copy>
</target>

The end result was that I could run and debug the client using the standard GWT hosted browser and deploy a "Cometified" version straight to Tomcat simply by running the Ant script, without having to change any of the code or configuration.


Friday 9 May 2008

Service Oriented UIs and MS Architecture Insight

I gave a talk at the Architecture Insight conference held by Microsoft a couple of weeks ago. The talk was on Service Oriented UIs, a term I stole from an InfoQ article from November last year. Sometimes referred to as Service Oriented - Front End Architecture, it was about how to build lightweight (generally web-based) UIs and some of the traps and pitfalls involved. You can see the slide deck from my talk on the Insight Conference web site.

Friday 18 April 2008

System.Messaging for Mono

Currently there is no implementation of the System.Messaging namespace in Mono. Judging from the mailing lists there appear to have been a couple of attempts to add an implementation, but no results as yet, so I decided to have a go myself.

I have just released on Google Code a 0.0.1 release of a bridge between Mono's System.Messaging and Apache QPid. Apache QPid is an implementation of the AMQP protocol. AMQP is an open protocol designed to provide a standard wire protocol for asynchronous messaging buses. There are a number of other open source implementations of AMQP floating about too. RabbitMQ looks like another interesting implementation, built on Erlang.

I used the QPid client libraries to provide the client-side networking implementation. Unfortunately that requires linking to 8 different assemblies. Forcing users of Mono's System.Messaging to link to these at compile time seemed excessive, so I wanted to decouple the QPid implementation from System.Messaging. Unlike JMS, Microsoft's System.Messaging API was never designed to have multiple implementations. Therefore, in order to decouple the two, I had to add a provider layer to the Mono code base. This is available in the form of a patch on the same Google Code site.

The current version only provides basic sending and receiving. XML and Binary message formatting are also supported, although that is implemented inside the Mono code base. I haven't looked into performance yet. It currently runs all requests over a single socket connection (multiplexing using AMQP's channel feature), which is probably not the optimal approach.

I'll see how far I get before I head off on my big trip.