UNIVERSITÀ DEGLI STUDI DELLA CALABRIA FACOLTÀ DI

UNIVERSITÀ DEGLI STUDI DELLA CALABRIA
FACOLTÀ DI INGEGNERIA
CORSO DI LAUREA SPECIALISTICA IN INGEGNERIA INFORMATICA
Esame di SISTEMI DISTRIBUITI
APPELLO DEL 27 GIUGNO 2006: SOLUZIONE
Interfaccia Job
import java.io.*;
/**
 * A serializable unit of work belonging to a workflow.
 *
 * <p>Instances are passed as arguments of remote calls (see
 * {@code ExecutorNode.setup}), which is why the interface extends
 * {@link Serializable}.
 */
public interface Job extends Serializable {

    /**
     * Returns the type label of this job.
     *
     * <p>The workflow manager in this solution compares the label against
     * {@code "A"}, {@code "B"} and {@code "C"} to choose an executor node.
     *
     * @return the job type label
     */
    String getType();

    /**
     * Executes the job.
     *
     * @param parameters input value for the job; the executor in this solution
     *                   passes the output of the previous job here
     * @return the value produced by the job
     */
    Object run(Object parameters);
}
Interfaccia WorkFlowManager
import java.rmi.*;
import java.util.Vector;
import java.net.InetAddress;
/**
 * Remote interface of the workflow coordinator.
 *
 * <p>Clients submit workflows through it, executor nodes register with it, and
 * the last executor of a workflow hands the final output back to it.
 */
public interface WorkFlowManager extends Remote {

    /**
     * Submits a workflow for execution.
     *
     * @param workFlow ordered collection of the jobs to run; the implementation
     *                 in this solution casts every element to {@code Job}
     * @param ipClient address of the client that must receive the final result
     * @throws RemoteException if the remote invocation fails
     */
    void executeWorkFlow(Vector workFlow, InetAddress ipClient)
            throws RemoteException;

    /**
     * Delivers the output of a finished workflow to the manager. Intended to be
     * called by the last node of the workflow.
     *
     * @param idWF   identifier of the workflow the output belongs to
     * @param output value produced by the last job
     * @throws RemoteException if the remote invocation fails
     */
    void startJob(long idWF, Object output) throws RemoteException;

    /**
     * Announces an executor node able to run jobs of the given type.
     *
     * @param ipExecutor address of the executor node
     * @param type       job type label served by the node
     * @throws RemoteException if the remote invocation fails
     */
    void registerExecutor(InetAddress ipExecutor, String type)
            throws RemoteException;
}
Interfaccia ExecutorNode
import java.rmi.*;
import java.net.InetAddress;
/**
 * Remote interface exposed by every node that executes workflow jobs.
 */
public interface ExecutorNode extends Remote {

    /**
     * Prepares the node for one step of a workflow.
     *
     * @param j      job the node will have to run
     * @param ipNext address to which the job output must be forwarded
     * @param ID     identifier of the workflow the job belongs to
     * @throws RemoteException if the remote invocation fails
     */
    void setup(Job j, InetAddress ipNext, long ID) throws RemoteException;

    /**
     * Asks the node to run the job previously set up for a workflow.
     *
     * @param idWF   identifier of the workflow
     * @param output value to hand to the job as its input (the workflow
     *               manager in this solution passes {@code null} to the
     *               first node)
     * @throws RemoteException if the remote invocation fails
     */
    void startJob(long idWF, Object output) throws RemoteException;
}
Interfaccia ClientNode
import java.rmi.*;
/**
 * Remote callback interface implemented by workflow clients.
 */
public interface ClientNode extends Remote {

    /**
     * Hands the final result of a workflow to the client.
     *
     * @param obj the result value
     * @throws RemoteException if the remote invocation fails
     */
    void receiveResult(Object obj) throws RemoteException;
}
Implementazione del WorkFlowManager
import
import
import
import
java.rmi.*;
java.rmi.server.*;
java.util.*;
java.net.InetAddress;
public class WorkFlowManagerImpl extends
UnicastRemoteObject implements WorkFlowManager{
private
private
private
private
Vector NodeA=null;
Vector NodeB=null;
Vector NodeC=null;
HashMap hm=null;
private long ID;
public WorkFlowManagerImpl()throws RemoteException {
super();
this.NodeA=new Vector();
this.NodeB=new Vector();
this.NodeC=new Vector();
this.hm=new HashMap();
this.ID=0;
}
public synchronized void executeWorkFlow(Vector
workFlow,InetAddress ipClient) throws RemoteException {
ID++;
hm.put(ID,ipClient);
int dimWorkFlow = workFlow.size();
InetAddress [] executor=new
InetAddress[dimWorkFlow];
for (int i=0;i<dimWorkFlow; i++){
Job job=(Job)workFlow.get(i);
InetAddress nodeAddress=selectNode(job);
executor[i]=nodeAddress;
}
// fase di setup
for (int i=0;i<dimWorkFlow; i++){
}
try{
ExecutorNode execNode =
(ExecutorNode)Naming.lookup(
"rmi://"+executor[i].getHostName()+
":1099/Executor");
if (i<dimWorkFlow-1)
execNode.setup(
(Job)workFlow.get(i),executor[i+1], ID);
else
execNode.setup((Job)workFlow.get(i),
InetAddress.getLocalHost(), ID);
}catch(Exception e){}
// thread che chiama il primo nodo del workflow
Starter s= new Starter(executor[0],ID);
s.start();
}
// chiamato dall'ultimo nodo del workflow
public synchronized void startJob(long idWF,
Object output) throws RemoteException {
try{
InetAddress client= (InetAddress) hm.get(idWF);
ClientNode c = (ClientNode) Naming.lookup
("rmi://"+client.getHostName()+
":1099/ClientNode");
c.receiveResult(output);
}catch(Exception e){}
}
public synchronized void registerExecutor(
InetAddress ipExecutor, String type)
throws RemoteException {
if (type.equals("A")) NodeA.add(ipExecutor);
else if (type.equals("B")) NodeB.add(ipExecutor);
else NodeC.add(ipExecutor);
}
private class Starter extends Thread{
private long ID;
private InetAddress firstNode=null;
public Starter(InetAddress firstNode,long ID){
this.ID=ID;
this.firstNode=firstNode;
}
public void run(){
try{
ExecutorNode execNode=
(ExecutorNode)Naming.lookup
("rmi://"+firstNode.getHostName()+
":1099/Executor");
execNode.startJob(ID,null);
}catch(Exception e){}
}
} // end Starter
private InetAddress selectNode(Job j){
InetAddress node=null;
if (j.getType().equals("A")) {
Random r=new Random();
int index=r.nextInt(NodeA.size());
node=(InetAddress)NodeA.get(index);
}
if (j.getType().equals("B")) {
Random r=new Random();
int index=r.nextInt(NodeA.size());
node=(InetAddress)NodeB.get(index);
}
if (j.getType().equals("C")) {
Random r=new Random();
int index=r.nextInt(NodeA.size());
node=(InetAddress)NodeC.get(index);
}
return node;
}
}// end
Implementazione dell'ExecutorNode
import
import
import
import
java.rmi.*;
java.rmi.server.*;
java.net.InetAddress;
java.util.*;
public class ExecutorNodeImpl extends
UnicastRemoteObject implements ExecutorNode{
private InetAddress managerIp=null;
private String type=null;
private HashMap hm=null;
public ExecutorNodeImpl(InetAddress managerIp,
String type) throws RemoteException{
super();
this.managerIp=managerIp;
this.type=type;
}
this.hm=new HashMap();
public synchronized void setup(Job j,
InetAddress ipNext, long ID)
throws RemoteException {
Vector v=new Vector();
v.add(j);
v.add(ipNext);
hm.put(ID,v);
}
public synchronized void startJob(long ID,
Object output) throws RemoteException {
Vector v= (Vector)hm.get(ID);
Executor exe=
new Executor((Job)v.get(0), (InetAddress)v.get(1),
output, ID);
}
exe.start();
private class Executor extends Thread{
Job j=null;
InetAddress ipNext=null;
Object output=null;
long ID;
public Executor(Job j, InetAddress ipNext,
Object output, long ID){
this.j=j;
this.ipNext=ipNext;
this.output=output;
this.ID=ID;
}
public void run(){
Object out = j.run(output);
try{
if (!ipNext.equals(managerIp)){
ExecutorNode nextExecutor=
(ExecutorNode) Naming.lookup(
"rmi://"+ipNext.getHostName()+
":1099/Executor");
nextExecutor.startJob(ID,out);
}
}
}
else{
WorkFlowManager wfm = (WorkFlowManager)
Naming.lookup(
"rmi://"+managerIp.getHostName()+
":1099/WorkFlowManager");
wfm.startJob(ID,out);
}
}catch(Exception e){}
}// end
Implementazione del ClientNode
import java.rmi.*;
import java.rmi.server.*;
import java.net.InetAddress;
public class ClientNodeImpl extends
UnicastRemoteObject implements ClientNode{
private InetAddress managerIp=null;
private Object res=null;
public ClientNodeImpl(InetAddress managerIp)
throws RemoteException{
this.managerIp=managerIp;
}
public void receiveResult(Object obj)
throws RemoteException{
res=obj;
}
}
public void buildWorkFlow(){
// costruisce un'istanza di workflow
// ed invia la richiesta al WorkFlowManager
}