UNIVERSITÀ DEGLI STUDI DELLA CALABRIA FACOLTÀ DI INGEGNERIA CORSO DI LAUREA SPECIALISTICA IN INGEGNERIA INFORMATICA Esame di SISTEMI DISTRIBUITI APPELLO DEL 27 GIUGNO 2006: SOLUZIONE Interfaccia Job import java.io.*; public interface Job extends Serializable{ Object run(Object parameters); String getType(); } Interfaccia WorkFlowManager import java.rmi.*; import java.util.Vector; import java.net.InetAddress; public interface WorkFlowManager extends Remote{ public void executeWorkFlow(Vector workFlow, InetAddress ipClient) throws RemoteException; public void startJob(long idWF,Object output) throws RemoteException; public void registerExecutor(InetAddress ipExecutor, String type) throws RemoteException; } Interfaccia ExecutorNode import java.rmi.*; import java.net.InetAddress; public interface ExecutorNode extends Remote{ public void setup(Job j, InetAddress ipNext, long ID) throws RemoteException; public void startJob(long idWF,Object output) throws RemoteException; } Interfaccia ClientNode import java.rmi.*; public interface ClientNode extends Remote{ public void receiveResult(Object obj) throws RemoteException; } Implementazione del WorkFlowManager import import import import java.rmi.*; java.rmi.server.*; java.util.*; java.net.InetAddress; public class WorkFlowManagerImpl extends UnicastRemoteObject implements WorkFlowManager{ private private private private Vector NodeA=null; Vector NodeB=null; Vector NodeC=null; HashMap hm=null; private long ID; public WorkFlowManagerImpl()throws RemoteException { super(); this.NodeA=new Vector(); this.NodeB=new Vector(); this.NodeC=new Vector(); this.hm=new HashMap(); this.ID=0; } public synchronized void executeWorkFlow(Vector workFlow,InetAddress ipClient) throws RemoteException { ID++; hm.put(ID,ipClient); int dimWorkFlow = workFlow.size(); InetAddress [] executor=new InetAddress[dimWorkFlow]; for (int i=0;i<dimWorkFlow; i++){ Job job=(Job)workFlow.get(i); InetAddress nodeAddress=selectNode(job); executor[i]=nodeAddress; } // fase di setup for (int i=0;i<dimWorkFlow; i++){ } try{ ExecutorNode execNode = (ExecutorNode)Naming.lookup( "rmi://"+executor[i].getHostName()+ ":1099/Executor"); if (i<dimWorkFlow-1) execNode.setup( (Job)workFlow.get(i),executor[i+1], ID); else execNode.setup((Job)workFlow.get(i), InetAddress.getLocalHost(), ID); }catch(Exception e){} // thread che chiama il primo nodo del workflow Starter s= new Starter(executor[0],ID); s.start(); } // chiamato dall'ultimo nodo del workflow public synchronized void startJob(long idWF, Object output) throws RemoteException { try{ InetAddress client= (InetAddress) hm.get(idWF); ClientNode c = (ClientNode) Naming.lookup ("rmi://"+client.getHostName()+ ":1099/ClientNode"); c.receiveResult(output); }catch(Exception e){} } public synchronized void registerExecutor( InetAddress ipExecutor, String type) throws RemoteException { if (type.equals("A")) NodeA.add(ipExecutor); else if (type.equals("B")) NodeB.add(ipExecutor); else NodeC.add(ipExecutor); } private class Starter extends Thread{ private long ID; private InetAddress firstNode=null; public Starter(InetAddress firstNode,long ID){ this.ID=ID; this.firstNode=firstNode; } public void run(){ try{ ExecutorNode execNode= (ExecutorNode)Naming.lookup ("rmi://"+firstNode.getHostName()+ ":1099/Executor"); execNode.startJob(ID,null); }catch(Exception e){} } } // end Starter private InetAddress selectNode(Job j){ InetAddress node=null; if (j.getType().equals("A")) { Random r=new Random(); int index=r.nextInt(NodeA.size()); node=(InetAddress)NodeA.get(index); } if (j.getType().equals("B")) { Random r=new Random(); int index=r.nextInt(NodeA.size()); node=(InetAddress)NodeB.get(index); } if (j.getType().equals("C")) { Random r=new Random(); int index=r.nextInt(NodeA.size()); node=(InetAddress)NodeC.get(index); } return node; } }// end Implementazione del ExecutorNode import import import import java.rmi.*; java.rmi.server.*; java.net.InetAddress; java.util.*; public class ExecutorNodeImpl extends UnicastRemoteObject implements ExecutorNode{ private InetAddress managerIp=null; private String type=null; private HashMap hm=null; public ExecutorNodeImpl(InetAddress managerIp, String type) throws RemoteException{ super(); this.managerIp=managerIp; this.type=type; } this.hm=new HashMap(); public synchronized void setup(Job j, InetAddress ipNext, long ID) throws RemoteException { Vector v=new Vector(); v.add(j); v.add(ipNext); hm.put(ID,v); } public synchronized void startJob(long ID, Object output) throws RemoteException { Vector v= (Vector)hm.get(ID); Executor exe= new Executor((Job)v.get(0), (InetAddress)v.get(1), output, ID); } exe.start(); private class Executor extends Thread{ Job j=null; InetAddress ipNext=null; Object output=null; long ID; public Executor(Job j, InetAddress ipNext, Object output, long ID){ this.j=j; this.ipNext=ipNext; this.output=output; this.ID=ID; } public void run(){ Object out = j.run(output); try{ if (!ipNext.equals(managerIp)){ ExecutorNode nextExecutor= (ExecutorNode) Naming.lookup( "rmi://"+ipNext.getHostName()+ ":1099/Executor"); nextExecutor.startJob(ID,out); } } } else{ WorkFlowManager wfm = (WorkFlowManager) Naming.lookup( "rmi://"+managerIp.getHostName()+ ":1099/WorkFlowManager"); wfm.startJob(ID,out); } }catch(Exception e){} }// end Implementazione del ClientNode import java.rmi.*; import java.rmi.server.*; import java.net.InetAddress; public class ClientNodeImpl extends UnicastRemoteObject implements ClientNode{ private InetAddress managerIp=null; private Object res=null; public ClientNodeImpl(InetAddress managerIp) throws RemoteException{ this.managerIp=managerIp; } public void receiveResult(Object obj) throws RemoteException{ res=obj; } } public void buildWorkFlow(){ // costruisce un'istanza di workflow // ed invia la richiesta al WorkFlowManager }