UNIVERSITÀ DEGLI STUDI DELLA CALABRIA
FACOLTÀ DI INGEGNERIA
CORSO DI LAUREA SPECIALISTICA IN INGEGNERIA INFORMATICA
Esame di SISTEMI DISTRIBUITI
APPELLO DEL 27 GIUGNO 2006: SOLUZIONE
Interfaccia Job
import java.io.*;
/**
 * A unit of work that can be handed to an executor.
 *
 * <p>The interface extends {@link Serializable}, so implementations can be
 * passed by value as arguments of remote calls.
 */
public interface Job extends Serializable {

    /**
     * Runs the job.
     *
     * @param parameters the input object handed to the job
     * @return the object produced by the job
     */
    Object run(Object parameters);

    /**
     * Returns the type label of this job.
     *
     * @return the job type as a string
     */
    String getType();
}
Interfaccia WorkFlowManager
import java.rmi.*;
import java.util.Vector;
import java.net.InetAddress;
/**
 * Remote interface of the workflow manager.
 */
public interface WorkFlowManager extends Remote {

    /**
     * Submits a workflow for execution.
     *
     * @param workFlow the workflow to execute
     * @param ipClient the address of the submitting client
     * @throws RemoteException if the remote call fails
     */
    public void executeWorkFlow(Vector workFlow, InetAddress ipClient)
            throws RemoteException;

    /**
     * Hands an output object for the given workflow to the manager.
     *
     * @param idWF   the workflow identifier
     * @param output the output object being handed over
     * @throws RemoteException if the remote call fails
     */
    public void startJob(long idWF, Object output) throws RemoteException;

    /**
     * Registers an executor with the manager.
     *
     * @param ipExecutor the address of the executor
     * @param type       the type label associated with the executor
     * @throws RemoteException if the remote call fails
     */
    public void registerExecutor(InetAddress ipExecutor, String type)
            throws RemoteException;
}
Interfaccia ExecutorNode
import java.rmi.*;
import java.net.InetAddress;
/**
 * Remote interface of an executor node.
 */
public interface ExecutorNode extends Remote {

    /**
     * Configures this node for a workflow.
     *
     * @param j      the job assigned to this node
     * @param ipNext the address of the next recipient in the workflow
     * @param ID     the workflow identifier
     * @throws RemoteException if the remote call fails
     */
    public void setup(Job j, InetAddress ipNext, long ID)
            throws RemoteException;

    /**
     * Starts the job for the given workflow.
     *
     * @param idWF   the workflow identifier
     * @param output the object handed to this node as input for its job
     * @throws RemoteException if the remote call fails
     */
    public void startJob(long idWF, Object output) throws RemoteException;
}
Interfaccia ClientNode
import java.rmi.*;
/**
 * Remote interface exposed by a client so that a result can be pushed to it.
 */
public interface ClientNode extends Remote {

    /**
     * Delivers a result object to the client.
     *
     * @param obj the result object
     * @throws RemoteException if the remote call fails
     */
    public void receiveResult(Object obj) throws RemoteException;
}
Implementazione del WorkFlowManager
import
import
import
import
java.rmi.*;
java.rmi.server.*;
java.util.*;
java.net.InetAddress;
public class WorkFlowManagerImpl extends
UnicastRemoteObject implements WorkFlowManager{
private
private
private
private
Vector NodeA=null;
Vector NodeB=null;
Vector NodeC=null;
HashMap hm=null;
private long ID;
public WorkFlowManagerImpl()throws RemoteException {
super();
this.NodeA=new Vector();
this.NodeB=new Vector();
this.NodeC=new Vector();
this.hm=new HashMap();
this.ID=0;
}
public synchronized void executeWorkFlow(Vector
workFlow,InetAddress ipClient) throws RemoteException {
ID++;
hm.put(ID,ipClient);
int dimWorkFlow = workFlow.size();
InetAddress [] executor=new
InetAddress[dimWorkFlow];
for (int i=0;i<dimWorkFlow; i++){
Job job=(Job)workFlow.get(i);
InetAddress nodeAddress=selectNode(job);
executor[i]=nodeAddress;
}
// fase di setup
for (int i=0;i<dimWorkFlow; i++){
}
try{
ExecutorNode execNode =
(ExecutorNode)Naming.lookup(
"rmi://"+executor[i].getHostName()+
":1099/Executor");
if (i<dimWorkFlow-1)
execNode.setup(
(Job)workFlow.get(i),executor[i+1], ID);
else
execNode.setup((Job)workFlow.get(i),
InetAddress.getLocalHost(), ID);
}catch(Exception e){}
// thread che chiama il primo nodo del workflow
Starter s= new Starter(executor[0],ID);
s.start();
}
// chiamato dall'ultimo nodo del workflow
public synchronized void startJob(long idWF,
Object output) throws RemoteException {
try{
InetAddress client= (InetAddress) hm.get(idWF);
ClientNode c = (ClientNode) Naming.lookup
("rmi://"+client.getHostName()+
":1099/ClientNode");
c.receiveResult(output);
}catch(Exception e){}
}
public synchronized void registerExecutor(
InetAddress ipExecutor, String type)
throws RemoteException {
if (type.equals("A")) NodeA.add(ipExecutor);
else if (type.equals("B")) NodeB.add(ipExecutor);
else NodeC.add(ipExecutor);
}
private class Starter extends Thread{
private long ID;
private InetAddress firstNode=null;
public Starter(InetAddress firstNode,long ID){
this.ID=ID;
this.firstNode=firstNode;
}
public void run(){
try{
ExecutorNode execNode=
(ExecutorNode)Naming.lookup
("rmi://"+firstNode.getHostName()+
":1099/Executor");
execNode.startJob(ID,null);
}catch(Exception e){}
}
} // end Starter
private InetAddress selectNode(Job j){
InetAddress node=null;
if (j.getType().equals("A")) {
Random r=new Random();
int index=r.nextInt(NodeA.size());
node=(InetAddress)NodeA.get(index);
}
if (j.getType().equals("B")) {
Random r=new Random();
int index=r.nextInt(NodeA.size());
node=(InetAddress)NodeB.get(index);
}
if (j.getType().equals("C")) {
Random r=new Random();
int index=r.nextInt(NodeA.size());
node=(InetAddress)NodeC.get(index);
}
return node;
}
}// end
Implementazione dell'ExecutorNode
import
import
import
import
java.rmi.*;
java.rmi.server.*;
java.net.InetAddress;
java.util.*;
public class ExecutorNodeImpl extends
UnicastRemoteObject implements ExecutorNode{
private InetAddress managerIp=null;
private String type=null;
private HashMap hm=null;
public ExecutorNodeImpl(InetAddress managerIp,
String type) throws RemoteException{
super();
this.managerIp=managerIp;
this.type=type;
}
this.hm=new HashMap();
public synchronized void setup(Job j,
InetAddress ipNext, long ID)
throws RemoteException {
Vector v=new Vector();
v.add(j);
v.add(ipNext);
hm.put(ID,v);
}
public synchronized void startJob(long ID,
Object output) throws RemoteException {
Vector v= (Vector)hm.get(ID);
Executor exe=
new Executor((Job)v.get(0), (InetAddress)v.get(1),
output, ID);
}
exe.start();
private class Executor extends Thread{
Job j=null;
InetAddress ipNext=null;
Object output=null;
long ID;
public Executor(Job j, InetAddress ipNext,
Object output, long ID){
this.j=j;
this.ipNext=ipNext;
this.output=output;
this.ID=ID;
}
public void run(){
Object out = j.run(output);
try{
if (!ipNext.equals(managerIp)){
ExecutorNode nextExecutor=
(ExecutorNode) Naming.lookup(
"rmi://"+ipNext.getHostName()+
":1099/Executor");
nextExecutor.startJob(ID,out);
}
}
}
else{
WorkFlowManager wfm = (WorkFlowManager)
Naming.lookup(
"rmi://"+managerIp.getHostName()+
":1099/WorkFlowManager");
wfm.startJob(ID,out);
}
}catch(Exception e){}
}// end
Implementazione del ClientNode
import java.rmi.*;
import java.rmi.server.*;
import java.net.InetAddress;
public class ClientNodeImpl extends
UnicastRemoteObject implements ClientNode{
private InetAddress managerIp=null;
private Object res=null;
public ClientNodeImpl(InetAddress managerIp)
throws RemoteException{
this.managerIp=managerIp;
}
public void receiveResult(Object obj)
throws RemoteException{
res=obj;
}
}
public void buildWorkFlow(){
// costruisce un'istanza di workflow
// ed invia la richiesta al WorkFlowManager
}