Netty usage in Dubbo
Consumer Provider
The consumer uses NettyClient; the provider uses NettyServer.
Consumer side
com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol
com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol.getClients(URL)
private ExchangeClient[] getClients(URL url) {
    // Whether the connection is shared
    boolean service_share_connect = false;
    int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0);
    // If connections is not configured, share one connection;
    // otherwise each service gets its own connection(s)
    if (connections == 0) {
        service_share_connect = true;
        connections = 1;
    }
    ExchangeClient[] clients = new ExchangeClient[connections];
    for (int i = 0; i < clients.length; i++) {
        if (service_share_connect) {
            clients[i] = getSharedClient(url);
        } else {
            clients[i] = initClient(url);
        }
    }
    return clients;
}
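The branch above can be sketched in isolation. This is only an illustration, not Dubbo's code: SketchClient and ClientSelectionSketch are hypothetical names, and a plain map keyed by address is assumed as a stand-in for getSharedClient(url).

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for ExchangeClient; only object identity matters here.
class SketchClient {
    final String address;
    SketchClient(String address) { this.address = address; }
}

class ClientSelectionSketch {
    // One shared client per remote address (assumed equivalent of getSharedClient).
    private final Map<String, SketchClient> shared = new HashMap<>();

    SketchClient[] getClients(String address, int connections) {
        boolean shareConnection = false;
        if (connections == 0) {          // not configured: reuse the shared client
            shareConnection = true;
            connections = 1;
        }
        SketchClient[] clients = new SketchClient[connections];
        for (int i = 0; i < clients.length; i++) {
            clients[i] = shareConnection
                    ? shared.computeIfAbsent(address, SketchClient::new)
                    : new SketchClient(address);   // dedicated client
        }
        return clients;
    }
}
```

With connections left at 0, repeated calls for the same address return the same client instance; with connections set to 3, three distinct clients are created.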
com.alibaba.dubbo.remoting.transport.netty.NettyClient
com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchangeChannel
String result = helloService.hello("dyyx");
com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchangeChannel.request(Object, int)
RpcInvocation [methodName=hello, parameterTypes=[class java.lang.String], arguments=[dyyx], attachments={path=dyyx.HelloService, interface=dyyx.HelloService, version=1.0.0, group=group1}]
com.alibaba.dubbo.remoting.transport.DecodeHandler.decode(Object)
Turning the asynchronous result into a synchronous one
com.alibaba.dubbo.remoting.exchange.support.DefaultFuture
public static void received(Channel channel, Response response) {
    try {
        DefaultFuture future = FUTURES.remove(response.getId());
        if (future != null) {
            future.doReceived(response);
        } else {
            logger.warn("The timeout response finally returned at "
                    + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))
                    + ", response " + response
                    + (channel == null ? "" : ", channel: " + channel.getLocalAddress()
                    + " -> " + channel.getRemoteAddress()));
        }
    } finally {
        CHANNELS.remove(response.getId());
    }
}

private void doReceived(Response res) {
    lock.lock();
    try {
        response = res;
        if (done != null) {
            done.signal();
        }
    } finally {
        lock.unlock();
    }
    if (callback != null) {
        invokeCallback(callback);
    }
}
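The waiting side is not shown above, so here is a small self-contained sketch of the whole pattern: the caller blocks on a Condition while another thread looks up the pending entry by request id and signals it. PendingReply and its methods are hypothetical names chosen for illustration; only the lock/Condition/id-map idea is taken from the DefaultFuture code above.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class PendingReply {
    // Pending requests keyed by request id, playing the role of FUTURES.
    private static final Map<Long, PendingReply> PENDING = new ConcurrentHashMap<>();

    private final long id;
    private final Lock lock = new ReentrantLock();
    private final Condition done = lock.newCondition();
    private Object response;   // guarded by lock

    PendingReply(long id) {
        this.id = id;
        PENDING.put(id, this);
    }

    // Caller thread: block until the reply arrives or the timeout elapses.
    Object get(long timeoutMillis) throws InterruptedException, TimeoutException {
        long nanos = TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        lock.lock();
        try {
            while (response == null) {
                if (nanos <= 0L) {
                    PENDING.remove(id);   // give up; a late reply finds no waiter
                    throw new TimeoutException("no reply for request " + id);
                }
                nanos = done.awaitNanos(nanos);
            }
            return response;
        } finally {
            lock.unlock();
        }
    }

    // Handler thread: find the waiter by id, store the reply, wake the caller.
    static boolean received(long id, Object reply) {
        PendingReply pending = PENDING.remove(id);
        if (pending == null) {
            return false;   // late or unknown reply
        }
        pending.lock.lock();
        try {
            pending.response = reply;
            pending.done.signal();
        } finally {
            pending.lock.unlock();
        }
        return true;
    }
}
```

A caller would create new PendingReply(id) before sending the request and then call get(timeout); the thread that decodes the reply calls PendingReply.received(id, reply). A false return corresponds to the "timeout response finally returned" warning branch above.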
Thread [main] (Suspended (breakpoint at line 102 in HeaderExchangeChannel))
HeaderExchangeChannel.request(Object, int) line: 102
HeaderExchangeClient.request(Object, int) line: 91
ReferenceCountExchangeClient.request(Object, int) line: 81
DubboInvoker.doInvoke(Invocation) line: 96
DubboInvoker(AbstractInvoker).invoke(Invocation) line: 144
ListenerInvokerWrapper.invoke(Invocation) line: 74
MonitorFilter.invoke(Invoker<?>, Invocation) line: 75
ProtocolFilterWrapper$1.invoke(Invocation) line: 91
FutureFilter.invoke(Invoker<?>, Invocation) line: 53
ProtocolFilterWrapper$1.invoke(Invocation) line: 91
ConsumerContextFilter.invoke(Invoker<?>, Invocation) line: 48
ProtocolFilterWrapper$1.invoke(Invocation) line: 91
InvokerInvocationHandler.invoke(Object, Method, Object[]) line: 52
proxy0.hello(String) line: not available
ClientMain2.main(String[]) line: 24
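The bottom of this trace (proxy0.hello, then InvokerInvocationHandler.invoke) is where an ordinary interface call becomes an invocation object like the RpcInvocation printed earlier. A rough sketch of that step follows, assuming java.lang.reflect.Proxy as a stand-in for the generated proxy0 class. GreetingService, CallSketch and ProxySketch are hypothetical names, and the transport function stands in for everything below the invocation handler in the trace.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.Arrays;
import java.util.function.Function;

// Hypothetical service interface, playing the role of dyyx.HelloService.
interface GreetingService {
    String hello(String name);
}

// Hypothetical, trimmed-down counterpart of RpcInvocation.
class CallSketch {
    final String methodName;
    final Class<?>[] parameterTypes;
    final Object[] arguments;

    CallSketch(String methodName, Class<?>[] parameterTypes, Object[] arguments) {
        this.methodName = methodName;
        this.parameterTypes = parameterTypes;
        this.arguments = arguments;
    }

    @Override
    public String toString() {
        return "CallSketch [methodName=" + methodName
                + ", parameterTypes=" + Arrays.toString(parameterTypes)
                + ", arguments=" + Arrays.toString(arguments) + "]";
    }
}

class ProxySketch {
    @SuppressWarnings("unchecked")
    static <T> T create(Class<T> type, Function<CallSketch, Object> transport) {
        InvocationHandler handler = (proxy, method, args) -> {
            // Methods declared on Object (toString, hashCode, equals) stay local.
            if (method.getDeclaringClass() == Object.class) {
                return method.invoke(transport, args);
            }
            // Everything else is packed into a call object and handed on.
            return transport.apply(
                    new CallSketch(method.getName(), method.getParameterTypes(), args));
        };
        return (T) Proxy.newProxyInstance(
                type.getClassLoader(), new Class<?>[] {type}, handler);
    }
}
```

Calling ProxySketch.create(GreetingService.class, transport).hello("dyyx") hands the transport a CallSketch with methodName hello, one String parameter type and the argument dyyx.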
Stack trace for the async-to-sync result handling
Daemon Thread [DubboClientHandler-127.0.0.1:20880-thread-1] (Suspended (breakpoint at line 256 in DefaultFuture))
DefaultFuture.doReceived(Response) line: 256
DefaultFuture.received(Channel, Response) line: 240
HeaderExchangeHandler.handleResponse(Channel, Response) line: 96
HeaderExchangeHandler.received(Channel, Object) line: 177
DecodeHandler.received(Channel, Object) line: 52
ChannelEventRunnable.run() line: 82
ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) line: 1142
ThreadPoolExecutor$Worker.run() line: 617
Thread.run() line: 745
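This trace runs on a pool thread named DubboClientHandler-...-thread-1, with ChannelEventRunnable.run() as the task, so the reply is handled on a worker thread rather than directly in the I/O callback. A minimal sketch of that hand-off is below. DispatchSketch and MessageHandler are hypothetical names, and the fixed-size pool is an assumption made for the example, not a statement about Dubbo's defaults.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

class DispatchSketch {
    // Hypothetical callback, standing in for a handler's received(channel, message).
    interface MessageHandler {
        void received(Object message);
    }

    private final ExecutorService pool;
    private final MessageHandler handler;

    DispatchSketch(String namePrefix, int threads, MessageHandler handler) {
        AtomicInteger seq = new AtomicInteger(1);
        // Daemon threads named "<prefix>-thread-N", like those in the trace.
        this.pool = Executors.newFixedThreadPool(threads, r -> {
            Thread t = new Thread(r, namePrefix + "-thread-" + seq.getAndIncrement());
            t.setDaemon(true);
            return t;
        });
        this.handler = handler;
    }

    // Called from the I/O thread: queue the work and return immediately.
    Future<?> onMessage(Object message) {
        return pool.submit(() -> handler.received(message));
    }

    void shutdown() {
        pool.shutdown();
    }
}
```

The I/O thread only pays the cost of queuing a task; decoding and waking the waiting caller then happen on the pool thread.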
Provider side
com.alibaba.dubbo.remoting.transport.DecodeHandler.received(Channel, Object)
com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchangeHandler.received(Channel, Object)
Request [id=7, version=2.0.0, twoway=true, event=false, broken=false, data=RpcInvocation [methodName=hello, parameterTypes=[class java.lang.String], arguments=[dyyx], attachments={path=dyyx.HelloService, input=168, dubbo=2.8.4, interface=dyyx.HelloService, version=1.0.0, timeout=500, group=group1}]]
data=RpcInvocation [methodName=hello, parameterTypes=[class java.lang.String], arguments=[dyyx], attachments={path=dyyx.HelloService, input=168, dubbo=2.8.4, interface=dyyx.HelloService, version=1.0.0, timeout=500, group=group1}
com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchangeHandler.handleRequest(ExchangeChannel, Request)
Sending the response
com.alibaba.dubbo.remoting.transport.netty.NettyHandler.writeRequested(ChannelHandlerContext, MessageEvent)
[id: 0xc89e46a2, /192.168.1.102:50409 => /192.168.1.102:20880] WRITE: Response [id=31, version=2.0.0, status=20, event=false, error=null, result=RpcResult [result=hello,dyyx,2021-01-10T11:12:44.071, exception=null]]
Daemon Thread [DubboServerHandler-192.168.1.102:20880-thread-4] (Suspended (breakpoint at line 31 in HelloServiceImpl))
HelloServiceImpl.hello(String) line: 31
Wrapper1.invokeMethod(Object, String, Class[], Object[]) line: not available
JavassistProxyFactory$1.doInvoke(T, String, Class<?>[], Object[]) line: 46
JavassistProxyFactory$1(AbstractProxyInvoker).invoke(Invocation) line: 72
RegistryProtocol$InvokerDelegete(InvokerWrapper).invoke(Invocation) line: 53
ExceptionFilter.invoke(Invoker<?>, Invocation) line: 64
ProtocolFilterWrapper$1.invoke(Invocation) line: 91
MonitorFilter.invoke(Invoker<?>, Invocation) line: 65
ProtocolFilterWrapper$1.invoke(Invocation) line: 91
TimeoutFilter.invoke(Invoker<?>, Invocation) line: 42
ProtocolFilterWrapper$1.invoke(Invocation) line: 91
TraceFilter.invoke(Invoker<?>, Invocation) line: 78
ProtocolFilterWrapper$1.invoke(Invocation) line: 91
ContextFilter.invoke(Invoker<?>, Invocation) line: 70
ProtocolFilterWrapper$1.invoke(Invocation) line: 91
GenericFilter.invoke(Invoker<?>, Invocation) line: 132
ProtocolFilterWrapper$1.invoke(Invocation) line: 91
ClassLoaderFilter.invoke(Invoker<?>, Invocation) line: 38
ProtocolFilterWrapper$1.invoke(Invocation) line: 91
EchoFilter.invoke(Invoker<?>, Invocation) line: 38
ProtocolFilterWrapper$1.invoke(Invocation) line: 91
DubboProtocol$1.reply(ExchangeChannel, Object) line: 113
HeaderExchangeHandler.handleRequest(ExchangeChannel, Request) line: 84
HeaderExchangeHandler.received(Channel, Object) line: 170
DecodeHandler.received(Channel, Object) line: 52
ChannelEventRunnable.run() line: 82
ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) line: 1142
ThreadPoolExecutor$Worker.run() line: 617
Thread.run() line: 745
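The alternating ProtocolFilterWrapper$1.invoke and XxxFilter.invoke frames in this trace are the signature of a chain in which each filter is wrapped in an invoker that delegates to the next one. The sketch below shows one way such a chain can be built. FilterChainSketch and its nested interfaces are hypothetical simplifications (strings instead of Invocation and Result) and are not Dubbo's actual types.

```java
import java.util.List;

class FilterChainSketch {
    // Simplified invoker: takes an invocation, returns a result.
    interface Invoker {
        String invoke(String invocation);
    }

    // Simplified filter: may act before or after delegating to the next invoker.
    interface Filter {
        String invoke(Invoker next, String invocation);
    }

    // Wrap the target from the last filter to the first, so that
    // filters.get(0) is the outermost wrapper and runs first.
    static Invoker build(Invoker target, List<Filter> filters) {
        Invoker last = target;
        for (int i = filters.size() - 1; i >= 0; i--) {
            final Filter filter = filters.get(i);
            final Invoker next = last;
            // This anonymous wrapper plays the role of the $1 frames in the trace.
            last = invocation -> filter.invoke(next, invocation);
        }
        return last;
    }
}
```

Because the innermost call sits at the top of a stack trace, the filter that runs first (EchoFilter in the trace above) shows up closest to DubboProtocol$1.reply, and the one that runs last (ExceptionFilter) shows up just below the actual service invoker.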
Daemon Thread [DubboServerHandler-192.168.1.102:20880-thread-10] (Suspended (breakpoint at line 102 in NettyHandler))
NettyHandler.writeRequested(ChannelHandlerContext, MessageEvent) line: 102
NettyHandler(SimpleChannelHandler).handleDownstream(ChannelHandlerContext, ChannelEvent) line: 254
DefaultChannelPipeline.sendDownstream(DefaultChannelPipeline$DefaultChannelHandlerContext, ChannelEvent) line: 591
DefaultChannelPipeline.sendDownstream(ChannelEvent) line: 582
Channels.write(Channel, Object, SocketAddress) line: 704
Channels.write(Channel, Object) line: 671
NioAcceptedSocketChannel(AbstractChannel).write(Object) line: 248
NettyChannel.send(Object, boolean) line: 98
NettyChannel(AbstractPeer).send(Object) line: 51
HeaderExchangeHandler.received(Channel, Object) line: 171
DecodeHandler.received(Channel, Object) line: 52
ChannelEventRunnable.run() line: 82
ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) line: 1142
ThreadPoolExecutor$Worker.run() line: 617
Thread.run() line: 745