从下面的例子中可以看到,Consumer(client)的代码中引用了Provider部分的class,本例中是
com.provider.EchoServiceImpl和com.provider.EchoService
即这些class在Consumer(client)和Provider(server)都需要部署
Provider:
package com.provider; public interface EchoService { String echo(String msg); }
package com.provider; public class EchoServiceImpl implements EchoService { @Override public String echo(String msg) { return msg != null ? msg + "-->I am OK." : " pass null."; } }
package com.provider; import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.lang.reflect.Method; import java.net.InetSocketAddress; import java.net.ServerSocket; import java.net.Socket; import java.util.concurrent.Executor; import java.util.concurrent.Executors; public class RpcProvider { private static Executor executor = Executors.newFixedThreadPool(20); public static void provide(String host, int port) throws IOException { ServerSocket serverSocket = new ServerSocket(); serverSocket.bind(new InetSocketAddress(host, port)); try { while (true) { System.out.println("one input coming"); executor.execute(new ProviderTask(serverSocket.accept())); System.out.println("execute one input"); } } finally { serverSocket.close(); } } private static class ProviderTask implements Runnable { private Socket socket; public ProviderTask(Socket socket) { this.socket = socket; } @Override public void run() { ObjectInputStream inputStream = null; ObjectOutputStream outputStream = null; try { inputStream = new ObjectInputStream(socket.getInputStream()); String className = inputStream.readUTF(); Class<?> service = Class.forName(className); String methodName = inputStream.readUTF(); Class<?>[] parameterTypes = (Class<?>[]) inputStream.readObject(); Object[] arguments = (Object[]) inputStream.readObject(); Method method = service.getMethod(methodName, parameterTypes); Object result = method.invoke(service.newInstance(), arguments); outputStream = new ObjectOutputStream(socket.getOutputStream()); outputStream.writeObject(result); } catch (Exception e) { e.printStackTrace(); } finally { close(inputStream); close(outputStream); close(socket); } } } public static void close(AutoCloseable closeable) { if (closeable != null) { try { closeable.close(); } catch (Exception e) { e.printStackTrace(); } } } }
package com.provider; import java.io.IOException; public class ProviderMain { public static void main(String[] args) { try { RpcProvider.provide("localhost", 8088); } catch (IOException e) { e.printStackTrace(); } } }
Consumer:
package com.consumer; import com.provider.RpcProvider; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.lang.reflect.InvocationHandler; import java.lang.reflect.Method; import java.lang.reflect.Proxy; import java.net.InetSocketAddress; import java.net.Socket; public class RpcConsumer { public Object consume(Class<?> echoServiceClass, InetSocketAddress socketAddress) { return Proxy.newProxyInstance(echoServiceClass.getClassLoader(), new Class<?>[]{echoServiceClass.getInterfaces()[0]}, new InvocationHandler() { @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { Socket socket = null; ObjectOutputStream outputStream = null; ObjectInputStream inputStream = null; try { socket = new Socket(); socket.connect(socketAddress); outputStream = new ObjectOutputStream(socket.getOutputStream()); outputStream.writeUTF(echoServiceClass.getName()); outputStream.writeUTF(method.getName()); outputStream.writeObject(method.getParameterTypes()); outputStream.writeObject(args); inputStream = new ObjectInputStream(socket.getInputStream()); return inputStream.readObject(); } finally { RpcProvider.close(outputStream); RpcProvider.close(inputStream); RpcProvider.close(socket); } } }); } }
package com.consumer; import com.provider.EchoService; import com.provider.EchoServiceImpl; import java.net.InetSocketAddress; public class ConsumerMain { public static void main(String[] args) { RpcConsumer rpcConsumer = new RpcConsumer(); EchoService echoService = (EchoService) rpcConsumer.consume(EchoServiceImpl.class, new InetSocketAddress("localhost", 8088)); System.out.println(echoService.echo(ConsumerMain.class.getCanonicalName())); } }
时间: 2024-10-29 17:36:55