首页  

Dubbo中的netty使用     所属分类 dubbo 浏览量 1119
Consumer Provider

消费者使用 NettyClient,提供者使用 NettyServer


Consumer端 com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol.getClients(URL) private ExchangeClient[] getClients(URL url){ //是否共享连接 boolean service_share_connect = false; int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0); //如果connections不配置,则共享连接,否则每服务每连接 if (connections == 0){ service_share_connect = true; connections = 1; } ExchangeClient[] clients = new ExchangeClient[connections]; for (int i = 0; i < clients.length; i++) { if (service_share_connect){ clients[i] = getSharedClient(url); } else { clients[i] = initClient(url); } } return clients; } com.alibaba.dubbo.remoting.transport.netty.NettyClient com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchangeChannel String result = helloService.hello("dyyx"); com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchangeChannel.request(Object, int) RpcInvocation [methodName=hello, parameterTypes=[class java.lang.String], arguments=[dyyx], attachments={path=dyyx.HelloService, interface=dyyx.HelloService, version=1.0.0, group=group1}] com.alibaba.dubbo.remoting.transport.DecodeHandler.decode(Object) 结果异步转同步处理 com.alibaba.dubbo.remoting.exchange.support.DefaultFuture public static void received(Channel channel, Response response) { try { DefaultFuture future = FUTURES.remove(response.getId()); if (future != null) { future.doReceived(response); } else { logger.warn("The timeout response finally returned at " + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date())) + ", response " + response + (channel == null ? "" : ", channel: " + channel.getLocalAddress() + " -> " + channel.getRemoteAddress())); } } finally { CHANNELS.remove(response.getId()); } } private void doReceived(Response res) { lock.lock(); try { response = res; if (done != null) { done.signal(); } } finally { lock.unlock(); } if (callback != null) { invokeCallback(callback); } } Thread [main] (Suspended (breakpoint at line 102 in HeaderExchangeChannel)) HeaderExchangeChannel.request(Object, int) line: 102 HeaderExchangeClient.request(Object, int) line: 91 ReferenceCountExchangeClient.request(Object, int) line: 81 DubboInvoker<T>.doInvoke(Invocation) line: 96 DubboInvoker<T>(AbstractInvoker<T>).invoke(Invocation) line: 144 ListenerInvokerWrapper<T>.invoke(Invocation) line: 74 MonitorFilter.invoke(Invoker<?>, Invocation) line: 75 ProtocolFilterWrapper$1.invoke(Invocation) line: 91 FutureFilter.invoke(Invoker<?>, Invocation) line: 53 ProtocolFilterWrapper$1.invoke(Invocation) line: 91 ConsumerContextFilter.invoke(Invoker<?>, Invocation) line: 48 ProtocolFilterWrapper$1.invoke(Invocation) line: 91 InvokerInvocationHandler.invoke(Object, Method, Object[]) line: 52 proxy0.hello(String) line: not available ClientMain2.main(String[]) line: 24 结果异步转同步处理 Daemon Thread [DubboClientHandler-127.0.0.1:20880-thread-1] (Suspended (breakpoint at line 256 in DefaultFuture)) DefaultFuture.doReceived(Response) line: 256 DefaultFuture.received(Channel, Response) line: 240 HeaderExchangeHandler.handleResponse(Channel, Response) line: 96 HeaderExchangeHandler.received(Channel, Object) line: 177 DecodeHandler.received(Channel, Object) line: 52 ChannelEventRunnable.run() line: 82 ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) line: 1142 ThreadPoolExecutor$Worker.run() line: 617 Thread.run() line: 745
Provider 端 com.alibaba.dubbo.remoting.transport.DecodeHandler.received(Channel, Object) com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchangeHandler.received(Channel, Object) Request [id=7, version=2.0.0, twoway=true, event=false, broken=false, data=RpcInvocation [methodName=hello, parameterTypes=[class java.lang.String], arguments=[dyyx], attachments={path=dyyx.HelloService, input=168, dubbo=2.8.4, interface=dyyx.HelloService, version=1.0.0, timeout=500, group=group1}]] data=RpcInvocation [methodName=hello, parameterTypes=[class java.lang.String], arguments=[dyyx], attachments={path=dyyx.HelloService, input=168, dubbo=2.8.4, interface=dyyx.HelloService, version=1.0.0, timeout=500, group=group1} com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchangeHandler.handleRequest(ExchangeChannel, Request) 发送响应结果 com.alibaba.dubbo.remoting.transport.netty.NettyHandler.writeRequested(ChannelHandlerContext, MessageEvent) [id: 0xc89e46a2, /192.168.1.102:50409 => /192.168.1.102:20880] WRITE: Response [id=31, version=2.0.0, status=20, event=false, error=null, result=RpcResult [result=hello,dyyx,2021-01-10T11:12:44.071, exception=null]] Daemon Thread [DubboServerHandler-192.168.1.102:20880-thread-4] (Suspended (breakpoint at line 31 in HelloServiceImpl)) HelloServiceImpl.hello(String) line: 31 Wrapper1.invokeMethod(Object, String, Class[], Object[]) line: not available JavassistProxyFactory$1.doInvoke(T, String, Class<?>[], Object[]) line: 46 JavassistProxyFactory$1(AbstractProxyInvoker<T>).invoke(Invocation) line: 72 RegistryProtocol$InvokerDelegete<T>(InvokerWrapper<T>).invoke(Invocation) line: 53 ExceptionFilter.invoke(Invoker<?>, Invocation) line: 64 ProtocolFilterWrapper$1.invoke(Invocation) line: 91 MonitorFilter.invoke(Invoker<?>, Invocation) line: 65 ProtocolFilterWrapper$1.invoke(Invocation) line: 91 TimeoutFilter.invoke(Invoker<?>, Invocation) line: 42 ProtocolFilterWrapper$1.invoke(Invocation) line: 91 TraceFilter.invoke(Invoker<?>, Invocation) line: 78 ProtocolFilterWrapper$1.invoke(Invocation) line: 91 ContextFilter.invoke(Invoker<?>, Invocation) line: 70 ProtocolFilterWrapper$1.invoke(Invocation) line: 91 GenericFilter.invoke(Invoker<?>, Invocation) line: 132 ProtocolFilterWrapper$1.invoke(Invocation) line: 91 ClassLoaderFilter.invoke(Invoker<?>, Invocation) line: 38 ProtocolFilterWrapper$1.invoke(Invocation) line: 91 EchoFilter.invoke(Invoker<?>, Invocation) line: 38 ProtocolFilterWrapper$1.invoke(Invocation) line: 91 DubboProtocol$1.reply(ExchangeChannel, Object) line: 113 HeaderExchangeHandler.handleRequest(ExchangeChannel, Request) line: 84 HeaderExchangeHandler.received(Channel, Object) line: 170 DecodeHandler.received(Channel, Object) line: 52 ChannelEventRunnable.run() line: 82 ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) line: 1142 ThreadPoolExecutor$Worker.run() line: 617 Thread.run() line: 745 Daemon Thread [DubboServerHandler-192.168.1.102:20880-thread-10] (Suspended (breakpoint at line 102 in NettyHandler)) NettyHandler.writeRequested(ChannelHandlerContext, MessageEvent) line: 102 NettyHandler(SimpleChannelHandler).handleDownstream(ChannelHandlerContext, ChannelEvent) line: 254 DefaultChannelPipeline.sendDownstream(DefaultChannelPipeline$DefaultChannelHandlerContext, ChannelEvent) line: 591 DefaultChannelPipeline.sendDownstream(ChannelEvent) line: 582 Channels.write(Channel, Object, SocketAddress) line: 704 Channels.write(Channel, Object) line: 671 NioAcceptedSocketChannel(AbstractChannel).write(Object) line: 248 NettyChannel.send(Object, boolean) line: 98 NettyChannel(AbstractPeer).send(Object) line: 51 HeaderExchangeHandler.received(Channel, Object) line: 171 DecodeHandler.received(Channel, Object) line: 52 ChannelEventRunnable.run() line: 82 ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) line: 1142 ThreadPoolExecutor$Worker.run() line: 617 Thread.run() line: 745

上一篇     下一篇
freemarker实例

dubbo消费端独立启动不依赖spring

项目经理要做的事情

中年男人的西游记

dubbo XML Schema 扩展机制

基于netty的 简单 rpc 框架