RPC Server for Erlang, In Java

Posted Sat Oct 11 09:59:42 CST 2008 in java.blogs Hot Blog Entries

BlogTrader (23 reads)

We are using Erlang for some serious work, including part of a banking system. Erlang is a great language for concurrency, and I like its syntax, but it lacks static typing (I hope the newly added -spec and -type attributes will help a bit), and it is not well suited to processing massive data, in terms of both performance and memory. I tried parsing a 10 MB XML file with xmerl, the XML library in OTP/Erlang; it caused heavy swapping to disk and I never got the parsed tree out.

So there is a real need to process some massive data in other languages, such as C or Java. That's why I tried writing an RPC server for Erlang in Java.

OTP/Erlang ships with the jinterface library for communication between Erlang and Java, and there are docs on how to get it working. But for an RPC server that is called from Erlang, there are still some real-world tips:

1. When you send the result back to the caller, wrap it in a tuple whose first element is the caller's tag (a Ref), and send it to the caller's Pid. It looks something like:

OtpErlangTuple msg = new OtpErlangTuple(new OtpErlangObject[] {call.tag, tResult});
sConnection.send(call.to, msg); 

where call.tag is an OtpErlangRef, tResult can be any OtpErlangObject, and call.to is an OtpErlangPid.

2. If you need to send a massive amount of data back to the caller, the default buffer size of OtpErlangOutputStream is too small; I set it to 1024 * 1024 * 10 (10 MB).

3. Since many concurrent callers may call your RPC server, you have to consider its concurrent performance. I chose to use a thread pool here.
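
To make tip 3 concrete, here is a minimal sketch of handing requests to a fixed thread pool and letting a worker signal a stop request back to the dispatching thread. It deliberately uses no jinterface classes: the class name PoolDispatchSketch, the string "requests" and the "stop" convention are assumptions for illustration only, not part of the server listed in this post.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical stand-in for a receive loop; no Erlang connection is involved. */
final class PoolDispatchSketch {

    /** Written by worker threads, read by the dispatching thread. */
    private final AtomicBoolean stopRequested = new AtomicBoolean(false);
    private final ExecutorService pool;

    PoolDispatchSketch(int threadPoolSize) {
        pool = Executors.newFixedThreadPool(threadPoolSize);
    }

    /** Pretend request handler: the request "stop" asks the server to stop. */
    private String handle(String request) {
        if (request.equals("stop")) {
            stopRequested.set(true);
            return "stopped";
        }
        return "ok:" + request;
    }

    /** Hands each request to the pool and returns the replies in request order. */
    List<String> dispatchAll(List<String> requests) {
        List<Future<String>> pending = new ArrayList<>();
        for (final String request : requests) {
            Callable<String> task = () -> handle(request);
            pending.add(pool.submit(task));
        }
        List<String> replies = new ArrayList<>();
        try {
            for (Future<String> f : pending) {
                replies.add(f.get()); // blocks until that worker has finished
            }
        } catch (InterruptedException | ExecutionException ex) {
            throw new IllegalStateException(ex);
        }
        return replies;
    }

    boolean isStopRequested() {
        return stopRequested.get();
    }

    /** Lets queued tasks finish, then releases the worker threads. */
    void shutdown() {
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        PoolDispatchSketch server = new PoolDispatchSketch(4);
        List<String> replies = server.dispatchAll(Arrays.asList("a", "b", "stop"));
        System.out.println(replies + " stop requested: " + server.isStopRequested());
        server.shutdown();
    }
}
```

The point of the AtomicBoolean is that a flag set inside a pooled task is only reliably seen by another thread if it is shared through a thread-safe variable and read after the task has actually run; reading a plain local right after execute() returns usually sees the old value.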

The RPC server in Java has two classes, RpcNode.java and RpcMsg.java:

package net.lightpole.rpcnode;

import com.ericsson.otp.erlang.OtpErlangAtom;
import com.ericsson.otp.erlang.OtpErlangList;
import com.ericsson.otp.erlang.OtpErlangObject;
import com.ericsson.otp.erlang.OtpErlangPid;
import com.ericsson.otp.erlang.OtpErlangRef;
import com.ericsson.otp.erlang.OtpErlangTuple;

/**
 *
 * @author Caoyuan Deng
 */
public class RpcMsg {

    public OtpErlangAtom call;
    public OtpErlangAtom mod;
    public OtpErlangAtom fun;
    public OtpErlangList args;
    public OtpErlangPid user;
    public OtpErlangPid to;
    public OtpErlangRef tag;

    public RpcMsg(OtpErlangTuple from, OtpErlangTuple request) throws IllegalArgumentException {
        if (request.arity() != 5) {
            throw new IllegalArgumentException("Not a rpc call");
        }

        /* {call, Mod, Fun, Args, userPid} */
        if (request.elementAt(0) instanceof OtpErlangAtom && ((OtpErlangAtom) request.elementAt(0)).atomValue().equals("call") &&
                request.elementAt(1) instanceof OtpErlangAtom &&
                request.elementAt(2) instanceof OtpErlangAtom &&
                request.elementAt(3) instanceof OtpErlangList &&
                request.elementAt(4) instanceof OtpErlangPid &&
                from.elementAt(0) instanceof OtpErlangPid &&
                from.elementAt(1) instanceof OtpErlangRef) {

            call = (OtpErlangAtom) request.elementAt(0);
            mod = (OtpErlangAtom) request.elementAt(1);
            fun = (OtpErlangAtom) request.elementAt(2);
            args = (OtpErlangList) request.elementAt(3);
            user = (OtpErlangPid) request.elementAt(4);
            to = (OtpErlangPid) from.elementAt(0);
            tag = (OtpErlangRef) from.elementAt(1);

        } else {
            throw new IllegalArgumentException("Not a rpc call.");
        }
    }

    /* {'$gen_call', {To, Tag}, {call, Mod, Fun, Args, User}} */
    public static RpcMsg tryToResolveRcpCall(OtpErlangObject msg) {
        if (msg instanceof OtpErlangTuple) {
            OtpErlangTuple tMsg = (OtpErlangTuple) msg;
            if (tMsg.arity() == 3) {
                OtpErlangObject[] o = tMsg.elements();
                if (o[0] instanceof OtpErlangAtom && ((OtpErlangAtom) o[0]).atomValue().equals("$gen_call") &&
                        o[1] instanceof OtpErlangTuple && ((OtpErlangTuple) o[1]).arity() == 2 &&
                        o[2] instanceof OtpErlangTuple && ((OtpErlangTuple) o[2]).arity() == 5) {
                    OtpErlangTuple from = (OtpErlangTuple) o[1];
                    OtpErlangTuple request = (OtpErlangTuple) o[2];

                    try {
                        return new RpcMsg(from, request);
                    } catch (IllegalArgumentException ex) {
                        ex.printStackTrace();
                    }
                }
            }
        }
        
        return null;
    }
}

package net.lightpole.rpcnode;

import com.ericsson.otp.erlang.OtpAuthException;
import com.ericsson.otp.erlang.OtpConnection;
import com.ericsson.otp.erlang.OtpErlangAtom;
import com.ericsson.otp.erlang.OtpErlangExit;
import com.ericsson.otp.erlang.OtpErlangObject;
import com.ericsson.otp.erlang.OtpErlangString;
import com.ericsson.otp.erlang.OtpErlangTuple;
import com.ericsson.otp.erlang.OtpSelf;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 *
 * Usage:
 *   $ erl -sname clientnode -setcookie mycookie
 *   (clientnode@cmac)> rpc:call(xnodename@cmac, 'System', currentTimeMillis, []).
 * 
 * @author Caoyuan Deng
 */
public abstract class RpcNode {

    public static final OtpErlangAtom OK = new OtpErlangAtom("ok");
    public static final OtpErlangAtom ERROR = new OtpErlangAtom("error");
    public static final OtpErlangAtom STOPED = new OtpErlangAtom("stoped");
    private static final int THREAD_POOL_SIZE = 100;
    private OtpSelf xSelf;
    private OtpConnection sConnection;
    private ExecutorService execService;

    public RpcNode(String xnodeName, String cookie) {
        this(xnodeName, cookie, THREAD_POOL_SIZE);
    }

    public RpcNode(String xnodeName, String cookie, int threadPoolSize) {
        execService = Executors.newFixedThreadPool(threadPoolSize);

        startServerConnection(xnodeName, cookie);
        loop();
    }

    private void startServerConnection(String xnodeName, String cookie) {
        try {
            xSelf = new OtpSelf(xnodeName, cookie);
            boolean registered = xSelf.publishPort();
            if (registered) {
                System.out.println(xSelf.node() + " is ready.");
                /**
                 * Accept an incoming connection from a remote node. A call to this
                 * method will block until an incoming connection is at least
                 * attempted.
                 */
                sConnection = xSelf.accept();
            } else {
                System.out.println("There should be an epmd running, start an epmd by running 'erl'.");
            }
        } catch (IOException ex) {
            Logger.getLogger(RpcNode.class.getName()).log(Level.SEVERE, null, ex);
        } catch (OtpAuthException ex) {
            Logger.getLogger(RpcNode.class.getName()).log(Level.SEVERE, null, ex);
        }
    }

    /* Set by a worker thread when processRpcCall returns -1 (stop request). */
    private volatile boolean stopped = false;

    private void loop() {
        /*
         * The stop flag is checked once per received message, so after a stop
         * request the loop ends when receive() next returns or throws.
         */
        while (!stopped) {
            try {
                /* Accept any term here; RpcMsg decides whether it is a valid call. */
                final OtpErlangObject msg = sConnection.receive();

                Runnable task = new Runnable() {

                    public void run() {
                        RpcMsg call = RpcMsg.tryToResolveRcpCall(msg);

                        if (call != null) {
                            long t0 = System.currentTimeMillis();

                            if (processRpcCall(call) == -1) {
                                stopped = true;
                            }

                            System.out.println("Rpc time: " + (System.currentTimeMillis() - t0) / 1000.0);
                        } else {
                            try {
                                sConnection.send(sConnection.peer().node(), new OtpErlangString("unknown request"));
                            } catch (IOException ex) {
                                Logger.getLogger(RpcNode.class.getName()).log(Level.SEVERE, null, ex);
                            }
                        }
                    }
                };

                execService.execute(task);

            } catch (OtpErlangExit ex) {
                Logger.getLogger(RpcNode.class.getName()).log(Level.SEVERE, null, ex);
            } catch (IOException ex) {
                Logger.getLogger(RpcNode.class.getName()).log(Level.SEVERE, null, ex);
            } catch (OtpAuthException ex) {
                Logger.getLogger(RpcNode.class.getName()).log(Level.SEVERE, null, ex);
            }
        }

        System.out.println("Exited");
        /* Let queued tasks finish, then release the pool threads. */
        execService.shutdown();
    }

    protected void sendRpcResult(RpcMsg call, OtpErlangAtom head, OtpErlangObject result) throws IOException {
        OtpErlangTuple tResult = new OtpErlangTuple(new OtpErlangObject[] {head, result});

        // Should specify call.tag here
        OtpErlangTuple msg = new OtpErlangTuple(new OtpErlangObject[]{call.tag, tResult});
        // Should specify call.to here
        sConnection.send(call.to, msg, 1024 * 1024 * 10); 
    }

    public abstract int processRpcCall(RpcMsg call);
    

    // ------ helper
    public static String getShortLocalHost() {
        return getLocalHost(false);
    }

    public static String getLongLocalHost() {
        return getLocalHost(true);
    }

    private static String getLocalHost(boolean longName) {
        String localHost;
        try {
            localHost = InetAddress.getLocalHost().getHostName();
            if (!longName) {
                /* Make sure it's a short name, i.e. strip off everything after the first '.' */
                int dot = localHost.indexOf(".");
                if (dot != -1) {
                    localHost = localHost.substring(0, dot);
                }
            }
        } catch (UnknownHostException e) {
            localHost = "localhost";
        }

        return localHost;
    }
}

As you can see, RpcNode is an abstract class; by implementing int processRpcCall(RpcMsg call), you can add whatever features you want. For example:

package net.lightpole.xmlnode;

import com.ericsson.otp.erlang.OtpErlangAtom;
import com.ericsson.otp.erlang.OtpErlangList;
import com.ericsson.otp.erlang.OtpErlangLong;
import com.ericsson.otp.erlang.OtpErlangObject;
import com.ericsson.otp.erlang.OtpErlangString;
import java.io.IOException;
import net.lightpole.rpcnode.RpcMsg;
import net.lightpole.rpcnode.RpcNode;

/**
 *
 * @author dcaoyuan
 */
public class MyNode extends RpcNode {

    public MyNode(String xnodeName, String cookie, int threadPoolSize) {
        super(xnodeName, cookie, threadPoolSize);
    }

    @Override
    public int processRpcCall(RpcMsg call) {
        final String modStr = call.mod.atomValue();
        final String funStr = call.fun.atomValue();
        final OtpErlangList args = call.args;

        try {
            OtpErlangAtom head = ERROR;
            OtpErlangObject result = null;

            if (modStr.equals("xnode") && funStr.equals("stop")) {
                head = OK;
                sendRpcResult(call, head, STOPED);
                return -1;
            }

            if (modStr.equals("System") && funStr.equals("currentTimeMillis")) {
                head = OK;
                long t = System.currentTimeMillis();
                result = new OtpErlangLong(t);
            } else {
                result = new OtpErlangString("{undef,{" + modStr + "," + funStr + "}}");
            }

            if (result == null) {
                result = new OtpErlangAtom("undefined");
            }

            sendRpcResult(call, head, result);
        } catch (IOException ex) {
            ex.printStackTrace();
        } catch (Exception ex) {
            /* Do not swallow unexpected failures silently. */
            ex.printStackTrace();
        }

        return 0;
    }
}

I tested MyNode by:

$ erl -sname clientnode -setcookie mycookie
...
(clientnode@cmac)> rpc:call(xnodename@cmac, 'System', currentTimeMillis, []).

And you can try to test its concurrent performance by:

%% $ erl -sname clientnode -setcookie mycookie
%% > xnode_test:test(10000)

-module(xnode_test).

-export([test/1]).

test(ProcN) ->
    Workers = [spawn_worker(self(), fun rpc_parse/1, {})
               || _ <- lists:seq(1, ProcN)],
    [wait_result(Worker) || Worker <- Workers].

rpc_parse({}) ->
    rpc:call(xnodename@cmac, 'System', currentTimeMillis, []).

spawn_worker(Parent, F, A) ->
    erlang:spawn_monitor(fun() -> Parent ! {self(), F(A)} end).

wait_result({Pid, Ref}) ->
    receive
        {'DOWN', Ref, _, _, normal} -> receive {Pid, Result} -> Result end;
        {'DOWN', Ref, _, _, Reason} -> exit(Reason)
    end.

I spawned 10000 calls to it, and it ran smoothly.

I'm also considering writing a more general-purpose RPC server in Java, one that can dynamically call any existing method of a Java class.
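
One way such dynamic dispatch could look is sketched below with plain Java reflection. This is only an illustration of the idea, not the planned server: the class name ReflectiveCallSketch, the rule that a bare name such as "System" is looked up in java.lang, and the restriction to public static methods with exactly matching parameter types are all assumptions made here.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;

/** Hypothetical helper; not part of RpcNode or jinterface. */
final class ReflectiveCallSketch {

    /**
     * Invokes a public static method chosen by class name and method name.
     * Parameter types are taken from the runtime classes of the arguments,
     * so only methods whose declared parameter types match exactly are found
     * (no primitive parameters, no subtype matching).
     */
    static Object call(String className, String methodName, Object... args) {
        // Assumption: names without a package, e.g. "System", live in java.lang.
        String fullName = className.indexOf('.') == -1 ? "java.lang." + className : className;
        try {
            Class<?> cls = Class.forName(fullName);
            Class<?>[] types = new Class<?>[args.length];
            for (int i = 0; i < args.length; i++) {
                types[i] = args[i].getClass();
            }
            Method method = cls.getMethod(methodName, types);
            if (!Modifier.isStatic(method.getModifiers())) {
                throw new IllegalArgumentException(methodName + " is not static");
            }
            return method.invoke(null, args);
        } catch (ReflectiveOperationException ex) {
            // Covers unknown class, unknown method, and exceptions thrown by the target.
            throw new IllegalArgumentException("undef: " + fullName + "." + methodName, ex);
        }
    }

    public static void main(String[] args) {
        System.out.println(call("System", "currentTimeMillis"));
        System.out.println(call("java.lang.Integer", "parseInt", "42"));
    }
}
```

A real server would still have to convert Erlang terms to Java argument objects and back, and would probably want a whitelist of callable classes, since letting remote callers invoke arbitrary methods is risky.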

 
Related content on this site:


RPC Server for Erlang, In Scala

BlogTrader (27 reads)

There was Java code in my previous post, RPC Server for Erlang, In Java; I'm now trying to rewrite it in Scala. With the pattern matching that I'm already familiar with from Erlang, writing the Scala version is a real pleasure. You can compare it with the Java version.

I haven't tried Scala's actor library yet; maybe later.

Also, I should port Erlang's jinterface to Scala, since OtpErlangTuple and OtpErlangList would be better expressed as Scala's Tuple and List.

The code is auto-formatted by NetBeans' Scala plugin, and the syntax highlighting is the same as in NetBeans (well, not exactly).

/*
 * RpcMsg.scala
 *
 */
package net.lightpole.rpcnode

import com.ericsson.otp.erlang.{OtpErlangAtom, OtpErlangList, OtpErlangObject, OtpErlangPid, OtpErlangRef, OtpErlangTuple}

class RpcMsg(val call:OtpErlangAtom,
             val mod :OtpErlangAtom,
             val fun :OtpErlangAtom,
             val args:OtpErlangList,
             val user:OtpErlangPid,
             val to  :OtpErlangPid,
             val tag :OtpErlangRef) {
}

object RpcMsg {
   
   def apply(msg:OtpErlangObject) : Option[RpcMsg] = msg match {
      case tMsg:OtpErlangTuple =>
         tMsg.elements() match {
            /* {'$gen_call', {To, Tag}, {call, Mod, Fun, Args, User}} */
            case Array(head:OtpErlangAtom, from:OtpErlangTuple, request:OtpErlangTuple) =>
               if (head.atomValue.equals("$gen_call")) {
                  (from.elements, request.elements) match {
                     case (Array(to :OtpErlangPid,
                                 tag:OtpErlangRef), Array(call:OtpErlangAtom,
                                                          mod :OtpErlangAtom,
                                                          fun :OtpErlangAtom,
                                                          args:OtpErlangList,
                                                          user:OtpErlangPid)) =>
                        if (call.atomValue.equals("call")) {
                           Some(new RpcMsg(call, mod, fun, args, user, to, tag))
                        } else None
                     case _ => None
                  }
               } else None
            case _ => None
         }
      case _ => None
   }
}

/*
 * RpcNode.scala
 *
 */
package net.lightpole.rpcnode

import com.ericsson.otp.erlang.{OtpAuthException, OtpConnection, OtpErlangAtom, OtpErlangExit, OtpErlangObject, OtpErlangString, OtpErlangTuple, OtpSelf}
import java.io.IOException
import java.net.InetAddress
import java.net.UnknownHostException
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.logging.Level
import java.util.logging.Logger


trait Cons {
   val OK     = new OtpErlangAtom("ok")
   val ERROR  = new OtpErlangAtom("error")
   val STOPED = new OtpErlangAtom("stoped")
   val THREAD_POOL_SIZE = 100
}

/**
 *
 * Usage:
 *   $ erl -sname clientnode -setcookie mycookie
 *   (clientnode@cmac)> rpc:call(xnodename@cmac, xnode, parse, []).
 *
 * @author Caoyuan Deng
 */
abstract class RpcNode(xnodeName:String, cookie:String, threadPoolSize:Int) extends Cons {
   
   def this(xnodeName:String, cookie:String) = this(xnodeName, cookie, 100)
   
   private var xSelf:OtpSelf = _
   private var sConnection:OtpConnection = _
   private var execService:ExecutorService = Executors.newFixedThreadPool(threadPoolSize)

   startServerConnection(xnodeName, cookie)
   loop
    
   def startServerConnection(xnodeName:String, cookie:String ) = {
      try {
         xSelf = new OtpSelf(xnodeName, cookie);
         // The node then publishes its port to the Erlang Port Mapper Daemon.
         // This registers the node name and port, making it available to a remote client process.
         // When the port is published it is important to immediately invoke the accept method.
         // Forgetting to accept a connection after publishing the port would be the programmatic
         // equivalent of false advertising
         val registered = xSelf.publishPort();
         if (registered) {
            System.out.println(xSelf.node() + " is ready.");
            /**
             * Accept an incoming connection from a remote node. A call to this
             * method will block until an incoming connection is at least
             * attempted.
             */
            sConnection = xSelf.accept();
         } else {
            System.out.println("There should be an epmd running, start an epmd by running 'erl'.");
         }
      } catch {
         case ex:IOException =>
         case ex:OtpAuthException =>
      }
   }

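   def loop : Unit = {
      // Set by a worker thread when processRpcCall returns -1 (stop request).
      // It is shared across iterations, so the receive loop sees the request
      // once the worker has actually run.
      val stopped = new java.util.concurrent.atomic.AtomicBoolean(false)

      // A while loop avoids the unbounded recursion of calling loop from inside try.
      while (!stopped.get) {
         try {
            val msg = sConnection.receive

            val task = new Runnable() {
               override
               def run = RpcMsg(msg) match {
                  case None =>
                     try {
                        sConnection.send(sConnection.peer.node, new OtpErlangString("unknown request"));
                     } catch {
                        case ex:IOException =>
                     }
                  case Some(call) =>
                     val t0 = System.currentTimeMillis

                     if (processRpcCall(call) == -1) stopped.set(true)

                     System.out.println("Rpc time: " + (System.currentTimeMillis() - t0) / 1000.0)
               }
            }

            execService.execute(task)

         } catch {
            // As before: keep receiving after an IOException, stop on the others.
            case ex:IOException =>
            case ex:OtpErlangExit => stopped.set(true)
            case ex:OtpAuthException => stopped.set(true)
         }
      }

      System.out.println("Exited")
   }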

   /** @throws IOException */
   def sendRpcResult(call:RpcMsg, head:OtpErlangAtom, result:OtpErlangObject) = {
      val tResult = new OtpErlangTuple(Array(head, result))

      // Should specify call.tag here
      val msg = new OtpErlangTuple(Array(call.tag, tResult))
      // Should specify call.to here
      sConnection.send(call.to, msg, 1024 * 1024 * 10)
   }

   /** @abstract */
   def processRpcCall(call:RpcMsg) : Int
}

object RpcCall {   
   def getShortLocalHost : String = getLocalHost(false)

   def getLongLocalHost : String = getLocalHost(true)

   def getLocalHost(longName:Boolean) : String = {
      var localHost = "localhost"
      try {
         localHost = InetAddress.getLocalHost.getHostName;
         if (!longName) {
            /* Make sure it's a short name, i.e. strip off everything after the first '.' */
            val dot = localHost.indexOf(".")
            if (dot != -1) localHost = localHost.substring(0, dot)
         }
      } catch {
         case ex:UnknownHostException =>
      }

      localHost
   }
}

Erlang Server For Stomp

rajd's Java blog

So Jeff Xiong has just announced the Stomperl project - an Erlang broker that uses the Stomp protocol. Stomp is a simple text-oriented wire format for messaging, which is really starting to get a lot of traction because it's so easy to implement new clients for. Existing clients include: C, C++, C#, Java, Delphi, Flash, Ruby, Smalltalk, Perl, PHP etc. This brings the number of stomp
Related content from the web:
RPC Server for Erlang, In Java (2008-10-12)
RPC Server for Erlang, In Scala (2008-10-28)
[erlang-questions] rpc is bad? (2008-05-23)
Erlang Server For Stomp (2007-12-12)
Erlang: A Generic Server Tutorial | 20bits (2008-06-09)
ASPN : Python Cookbook : JSON RPC Server and client (2008-04-02)
Erlang (2007-09-19)
Erlang (2007-12-19)