Dubbo泛化引用
泛化调用与泛化实现
Dubbo泛化有三种不同的序列化方式:
generic = true:对参数使用 PojoUtils 进行序列化 generic = bean : 对参数使用 JavaBean 方式进行序列化 generic = nativejava:对参数使用 nativejava 方式进行序列化 泛化接口调用方式主要在服务消费端没有API接口类及模型类元(比如入参和出参的POJO 类)的情况下使用,其参数及返回值中没有对应的POJO 类,所以所有POJO参数均转换为Map表示。使用泛化调用时,服务消费模块不再需要引入SDK二方包。
泛化调用
泛化调用 :要在服务消费端没有API接口类及模型类元(比如入参和出参的POJO 类)的情况下使用。在进行服务调用时相关参数通过 Map 形式将数据传递,由服务提供者将 Map 转换为 实体类,再进行调用。
简单来说 : 泛化调用即服务消费者端启用了泛化调用,而服务提供者端则是正常服务。 其简单Demo 如下
// 服务接口类
public interface SimpleDemoService {
String sayHello(String msg);
List<String> sayHello2(String msg);
}
// 常规的服务提供者
public class GenericProvider {
public static void main(String[] args) throws IOException {
// applicationname 为 simple, 接口类型为 SimpleDemoService, 实现类为 MainSimpleDemoServiceImpl
ServiceConfig<SimpleDemoService> serviceConfig = DubboUtil.serviceConfig("simple", SimpleDemoService.class, new MainSimpleDemoServiceImpl());
serviceConfig.export();
System.out.println("service is start");
System.in.read();
}
}
// 输出
// sayHello = MainSimpleDemoServiceImpl : generic
// sayHello2 = [MainSimpleDemoServiceImpl : generic2]
public class GenericConsumer {
public static void main(String[] args) {
ReferenceConfig<GenericService> referenceConfig = DubboUtil.referenceConfig("simple", GenericService.class);
referenceConfig.setGeneric(true);
referenceConfig.setInterface("com.dubbo.service.impl.SimpleDemoService");
GenericService genericService = referenceConfig.get();
// 通过 GenericService.$invoke 方法指定是泛化调用,方法名为 sayHello。参数类型为 String, 参数内容为 generic
Object sayHello = genericService.$invoke("sayHello", new String[]{"java.lang.String"}, new Object[]{"generic"});
System.out.println("sayHello = " + sayHello);
// 调用 sayHello2 方法
Object sayHello2 = genericService.$invoke("sayHello2", new String[]{"java.lang.String"}, new Object[]{"generic2"});
System.out.println("sayHello2 = " + sayHello2);
}
}
泛化实现
泛化实现:泛化接口实现主要用于服务提供端没有API接口类及模型类元(比如入参和出参的POJO 类)的情况下使用。消费者发起接口请求时需要将相关信息转换为 Map 传递给 提供者,由提供者根据信息找到对应的泛型实现来进行处理。
简单来说 : 泛化实现即服务提供者端启用了泛化实现,而服务消费者端则是正常调用。
// 服务提供者
public class GenericProvider {
public static void main(String[] args) throws IOException {
// 获取 ServiceConfig
// 指定 applicationname 为 simple
// 指定 interface 为 com.kingfish.service.impl.SimpleDemoService,
// 指定 Ref 为 匿名 GenericService 接口实现类
ServiceConfig<GenericService> serviceConfig = DubboUtil.serviceConfig("simple", "com.kingfish.service.impl.SimpleDemoService", new GenericService() {
@Override
public Object $invoke(String method, String[] parameterTypes, Object[] args) throws GenericException {
// 当消费者调用SimpleDemoService 服务时会进入到该方法中,根据如下判断逻辑来处理不同的方法。
if ("sayHello".equals(method)) {
return "generic sayHello " + Arrays.toString(args);
} else if ("sayHello2".equals(method)) {
return Lists.newArrayList("generic sayHello2 " + Arrays.toString(args));
}
return null;
}
});
// 启用泛化
serviceConfig.setGeneric("true");
// 服务暴露
serviceConfig.export();
System.out.println("service is start");
// 阻塞程序,防止main方法执行结束
System.in.read();
}
}
// 输出
// s = generic sayHello [111]
// strings = [generic sayHello2 [222]]
public class GenericConsumer {
public static void main(String[] args) {
// 获取 SimpleDemoService 的 referenceConfig
ReferenceConfig<SimpleDemoService> referenceConfig = DubboUtil.referenceConfig("simple", SimpleDemoService.class);
SimpleDemoService simpleDemoService = referenceConfig.get();
// 调用 SimpleDemoService 的方法
String s = simpleDemoService.sayHello("111");
System.out.println("s = " + s);
List<String> strings = simpleDemoService.sayHello2("222");
System.out.println("strings = " + strings);
}
}
源码实现
Dubbo 泛化调用和泛化实现依赖于下面两个过滤器 来完成。
- GenericImplFilter:完成了消费者端的泛化功能。当请求发起时如果是泛化调用,则会将
- GenericFilter:完成了提供者端的泛化功能
GenericImplFilter
GenericImplFilter 作用于消费者端。
- 当消费者进行调用的是泛化实现时,会将参数信息按照指定的序列化方式进行序列化后进行泛化调用。(这里会将调用方法指定为 $invoke,因为 GenericFilter 中判断是否是泛化调用的条件之一就是 方法名为 $invoke)
- 当消费者进行泛化调用时,会将参数信息进行序列化后进行泛化调用。 ```java /**
-
GenericImplInvokerFilter */ @Activate(group = Constants.CONSUMER, value = Constants.GENERIC_KEY, order = 20000) public class GenericImplFilter implements Filter {
private static final Logger logger = LoggerFactory.getLogger(GenericImplFilter.class); // 泛化调用的参数类型 private static final Class<?>[] GENERIC_PARAMETER_TYPES = new Class<?>[]{String.class, String[].class, Object[].class};
@Override public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException { String generic = invoker.getUrl().getParameter(Constants.GENERIC_KEY); // 1. 判断服务端是否是泛化实现 // generic 满足三种泛化情况之一 && 调用方法名不为 $invoke && 参数类型为 RpcInvocation if (ProtocolUtils.isGeneric(generic) && !Constants.$INVOKE.equals(invocation.getMethodName()) && invocation instanceof RpcInvocation) { // 1.1 获取泛化调用的参数 :调用方法名、调用参数类型、调用参数值等 RpcInvocation invocation2 = (RpcInvocation) invocation; String methodName = invocation2.getMethodName(); Class<?>[] parameterTypes = invocation2.getParameterTypes(); Object[] arguments = invocation2.getArguments();
String[] types = new String[parameterTypes.length]; for (int i = 0; i < parameterTypes.length; i++) { types[i] = ReflectUtils.getName(parameterTypes[i]); } Object[] args; // 1.2 判断序列化方式,进行序列化 // 如果是 Bean 序列化方式,则使用JavaBeanSerializeUtil 进行序列化 if (ProtocolUtils.isBeanGenericSerialization(generic)) { args = new Object[arguments.length]; for (int i = 0; i < arguments.length; i++) { args[i] = JavaBeanSerializeUtil.serialize(arguments[i], JavaBeanAccessor.METHOD); } } else { // 否则(generic = true || nativejava) 使用PojoUtils 进行序列化 args = PojoUtils.generalize(arguments); } // 设置调用方法为 $invoke、参数类型为GENERIC_PARAMETER_TYPES,并设置参数具体值。 // 目的是为了让 GenericFilter 能识别出这次调用是泛化调用。 invocation2.setMethodName(Constants.$INVOKE); invocation2.setParameterTypes(GENERIC_PARAMETER_TYPES); invocation2.setArguments(new Object[]{methodName, types, args}); // 1.3 进行泛化调用 Result result = invoker.invoke(invocation2); // 1.4 如果泛化调用没有异常, 则将结果集反序列化后返回。 if (!result.hasException()) { Object value = result.getValue(); try { Method method = invoker.getInterface().getMethod(methodName, parameterTypes); // 对结果进行反序列化 if (ProtocolUtils.isBeanGenericSerialization(generic)) { if (value == null) { return new RpcResult(value); } else if (value instanceof JavaBeanDescriptor) { return new RpcResult(JavaBeanSerializeUtil.deserialize((JavaBeanDescriptor) value)); } else { throw new RpcException( "The type of result value is " + value.getClass().getName() + " other than " + JavaBeanDescriptor.class.getName() + ", and the result is " + value); } } else { return new RpcResult(PojoUtils.realize(value, method.getReturnType(), method.getGenericReturnType())); } } catch (NoSuchMethodException e) { throw new RpcException(e.getMessage(), e); } } else if (result.getException() instanceof GenericException) { // 返回异常是 GenericException 类型,则说明是泛化异常而非调用过程中异常。进行处理 GenericException exception = (GenericException) result.getException(); try { String className = exception.getExceptionClass(); Class<?> clazz = ReflectUtils.forName(className); Throwable targetException = null; Throwable lastException = null; try { targetException = (Throwable) clazz.newInstance(); } catch (Throwable e) { lastException = e; for (Constructor<?> constructor : clazz.getConstructors()) { try { targetException = (Throwable) constructor.newInstance(new Object[constructor.getParameterTypes().length]); break; } catch (Throwable e1) { lastException = e1; } } } if (targetException != null) { try { Field field = Throwable.class.getDeclaredField("detailMessage"); if (!field.isAccessible()) { field.setAccessible(true); } field.set(targetException, exception.getExceptionMessage()); } catch (Throwable e) { logger.warn(e.getMessage(), e); } result = new RpcResult(targetException); } else if (lastException != null) { throw lastException; } } catch (Throwable e) { throw new RpcException("Can not deserialize exception " + exception.getExceptionClass() + ", message: " + exception.getExceptionMessage(), e); } } return result; } // 2. 判断消费者是否开启了泛化调用 // 调用方法名为 $invoke && invocation参数有三个 && generic 参数满足三种泛化方式之一 if (invocation.getMethodName().equals(Constants.$INVOKE) && invocation.getArguments() != null && invocation.getArguments().length == 3 && ProtocolUtils.isGeneric(generic)) { // 2.1 序列化参数 Object[] args = (Object[]) invocation.getArguments()[2]; if (ProtocolUtils.isJavaGenericSerialization(generic)) { for (Object arg : args) { if (!(byte[].class == arg.getClass())) { // 抛出 RpcException 异常 error(generic, byte[].class.getName(), arg.getClass().getName()); } } } else if (ProtocolUtils.isBeanGenericSerialization(generic)) { for (Object arg : args) { if (!(arg instanceof JavaBeanDescriptor)) { // 抛出 RpcException 异常 error(generic, JavaBeanDescriptor.class.getName(), arg.getClass().getName()); } } } // 设置参数 ((RpcInvocation) invocation).setAttachment( Constants.GENERIC_KEY, invoker.getUrl().getParameter(Constants.GENERIC_KEY)); } // 进行序列化调用并返回 return invoker.invoke(invocation); }
}
## GenericFilter
GenericFilter 作用于消费者。在 GenericImplFilter 中我们知道,一旦Dubbo确定了是泛化调用或提供者时泛化实现时就会将参数序列化,所以 GenericFilter 判断如果是泛化操作第一步则是按照序列化方式进行反序列化,并进行服务调用。
```java
@Activate(group = Constants.PROVIDER, order = -20000)
public class GenericFilter implements Filter {
@Override
public Result invoke(Invoker<?> invoker, Invocation inv) throws RpcException {
// 1. 消费者进行泛化调用
// 调用方法为 $invoke && 参数有三个 && 调用接口不是 GenericService
if (inv.getMethodName().equals(Constants.$INVOKE)
&& inv.getArguments() != null
&& inv.getArguments().length == 3
&& !GenericService.class.isAssignableFrom(invoker.getInterface())) {
// 1.1 参数解析
// 调用方法名
String name = ((String) inv.getArguments()[0]).trim();
// 调用方法参数类型
String[] types = (String[]) inv.getArguments()[1];
// 调用参数值
Object[] args = (Object[]) inv.getArguments()[2];
try {
// 通过反射获取方法实例
Method method = ReflectUtils.findMethodByMethodSignature(invoker.getInterface(), name, types);
Class<?>[] params = method.getParameterTypes();
if (args == null) {
args = new Object[params.length];
}
// 获取泛化调用的泛化类型
String generic = inv.getAttachment(Constants.GENERIC_KEY);
// 泛化类型为空,则从上下文获取
if (StringUtils.isBlank(generic)) {
generic = RpcContext.getContext().getAttachment(Constants.GENERIC_KEY);
}
// generic 为空 || 默认情况,则使用 generic=true 的方式
if (StringUtils.isEmpty(generic)
|| ProtocolUtils.isDefaultGenericSerialization(generic)) {
args = PojoUtils.realize(args, params, method.getGenericParameterTypes());
} else if (ProtocolUtils.isJavaGenericSerialization(generic)) {
// generic = nativjava 的方式
for (int i = 0; i < args.length; i++) {
if (byte[].class == args[i].getClass()) {
try {
UnsafeByteArrayInputStream is = new UnsafeByteArrayInputStream((byte[]) args[i]);
args[i] = ExtensionLoader.getExtensionLoader(Serialization.class)
.getExtension(Constants.GENERIC_SERIALIZATION_NATIVE_JAVA)
.deserialize(null, is).readObject();
} catch (Exception e) {
throw new RpcException("Deserialize argument [" + (i + 1) + "] failed.", e);
}
} else {
throw new RpcException(
"Generic serialization [" +
Constants.GENERIC_SERIALIZATION_NATIVE_JAVA +
"] only support message type " +
byte[].class +
" and your message type is " +
args[i].getClass());
}
}
} else if (ProtocolUtils.isBeanGenericSerialization(generic)) {
// generic = bean的方式
for (int i = 0; i < args.length; i++) {
if (args[i] instanceof JavaBeanDescriptor) {
args[i] = JavaBeanSerializeUtil.deserialize((JavaBeanDescriptor) args[i]);
} else {
throw new RpcException(
"Generic serialization [" +
Constants.GENERIC_SERIALIZATION_BEAN +
"] only support message type " +
JavaBeanDescriptor.class.getName() +
" and your message type is " +
args[i].getClass().getName());
}
}
}
// 进行服务调用。这里会先传递给下一个 filter,最后进行服务调用
Result result = invoker.invoke(new RpcInvocation(method, args, inv.getAttachments()));
// 对结果集进行反序列化并返回
if (result.hasException()
&& !(result.getException() instanceof GenericException)) {
return new RpcResult(new GenericException(result.getException()));
}
if (ProtocolUtils.isJavaGenericSerialization(generic)) {
try {
UnsafeByteArrayOutputStream os = new UnsafeByteArrayOutputStream(512);
ExtensionLoader.getExtensionLoader(Serialization.class)
.getExtension(Constants.GENERIC_SERIALIZATION_NATIVE_JAVA)
.serialize(null, os).writeObject(result.getValue());
return new RpcResult(os.toByteArray());
} catch (IOException e) {
throw new RpcException("Serialize result failed.", e);
}
} else if (ProtocolUtils.isBeanGenericSerialization(generic)) {
return new RpcResult(JavaBeanSerializeUtil.serialize(result.getValue(), JavaBeanAccessor.METHOD));
} else {
return new RpcResult(PojoUtils.generalize(result.getValue()));
}
} catch (NoSuchMethodException e) {
throw new RpcException(e.getMessage(), e);
} catch (ClassNotFoundException e) {
throw new RpcException(e.getMessage(), e);
}
}
// 2. 常规调用
return invoker.invoke(inv);
}
}
Dubbo的泛化调用和泛化实现逻辑还是比较简单:
-
消费者进行服务调用: 如果消费者调用的提供者是泛化实现并且消费者不是泛化调用,则 GenericImplFilter 需要将调用修改为泛化调用形式,以便让GenericFilter 识别出是泛化调用。 如果消费者进行泛化调用,则 GenericImplFilter 对参数进行序列化后进行服务调用
-
服务端接收到服务调用: GenericFilter 会首先判断是否是泛化调用。判断条件如下: 1)调用方法名为 $invoke 2)调用参数有三个 3)调用的接口不能为 GenericService
如果满足上述条件,则 GenericFilter 对参数进行反序列化,并根据方法名、接口名等参数进行调用。否则按照常规逻辑调用。
这里我们需要清楚,对于GenericImplFilter 来说,存在泛化调用和泛化实现两种逻辑,泛化调用则需要将参数序列化, 调用泛化实现则需要将调用方法、参数等做相应的处理。而对于 GenericFilter 只需要关注是否是泛化调用。如果是泛化调用,则根据参数确定调用的接口、方法以及参数信息进行调用。
注:
- 如果提供者暴露服务接口为 GenericService, 则默认其开启了泛化调用,generic 默认为 true。
- 当提供者服务为泛化实现时,消费者引用的url会和提供者的url 在RegistryDirectory#toInvokers 方法中进行属性合并,消费者在合并时将 genric 属性进行了合并,激活了消费者端的 GenericImplFilter 过滤器。
隐式参数传递
Dubbo提供了隐式参数传递的功能,即服务调用方可以通过RpcContext.getContext().setAttachment() 方法设置附加属性键值对,然后设置的键值对可以在服务提供方服务方法内获取。
@Service
public class MainSimpleDemoServiceImpl implements SimpleDemoService {
@Override
public String sayHello(String msg) {
// 这里会将上下文的参数获取到并返回
Object context = RpcContext.getContext().getAttachment("context");
return "MainSimpleDemoServiceImpl : " + msg + " context = " + context;
}
}
// 消费者。设置参数 context = SimpleConsumer
public class SimpleConsumer {
public static void main(String[] args) throws InterruptedException {
ReferenceConfig<SimpleDemoService> referenceConfig = DubboUtil.referenceConfig("dubbo-consumer", SimpleDemoService.class);
referenceConfig.setMonitor("http://localhost:8080");
SimpleDemoService demoService = referenceConfig.get();
// 消费者设置隐式参数 context = SimpleConsumer,提供者可以获取到该参数
RpcContext.getContext().setAttachment("context", "SimpleConsumer");
// 输出 MainSimpleDemoServiceImpl : SimpleConsumer context = SimpleConsumer
System.out.println(demoService.sayHello("SimpleConsumer"));
}
}
// 服务提供者
public class SimpleProvider {
public static void main(String[] args) throws IOException {
ServiceConfig<SimpleDemoService> serviceConfig = DubboUtil.serviceConfig("dubbo-provider", SimpleDemoService.class, new MainSimpleDemoServiceImpl());
serviceConfig.export();
System.out.println("service is start");
System.in.read();
}
}
这里可以看到,消费者在调用sayHello 前,在上下文中通过 RpcContext.getContext().setAttachment(“context”, “SimpleConsumer”); 保存了隐式参数 context = SimpleConsumer。在提供者端可以通过 RpcContext.getContext().getAttachment(“context”); 获取到隐式参数的值。
不过需要注意的是,上下文参数是一次性的,即设置一次参数只能获取一次。
源码分析
Dubbo 隐式参数实现依赖于 ConsumerContextFilter & ContextFilter 完成,下面我们来看具体逻辑:
AbstractClusterInvoker
消费者发起调用时,会在 AbstractClusterInvoker#invoke 方法中将 Rpc上下文中的附件(Attachments) 作为调用附件传递给提供者端。
AbstractClusterInvoker#invoke的实现如下:
@Override
public Result invoke(final Invocation invocation) throws RpcException {
checkWhetherDestroyed();
// binding attachments into invocation.
// 获取上下文参数
Map<String, String> contextAttachments = RpcContext.getContext().getAttachments();
// 如果上下文参数不为空,则添加到 RpcInvocation 中。RpcInvocation会随着网络调用传递到提供者端
if (contextAttachments != null && contextAttachments.size() != 0) {
((RpcInvocation) invocation).addAttachments(contextAttachments);
}
// 获取服务提供者列表
List<Invoker<T>> invokers = list(invocation);
// 初始化负载均衡策略
LoadBalance loadbalance = initLoadBalance(invokers, invocation);
RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
// 进行调用
return doInvoke(invocation, invokers, loadbalance);
}
ConsumerContextFilter
ConsumerContextFilter 作用域消费者端。作用是在服务调用结束后清除上下文中的附件信息。
@Activate(group = Constants.CONSUMER, order = -10000)
public class ConsumerContextFilter implements Filter {
@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
// 设置上下文的信息
RpcContext.getContext()
.setInvoker(invoker)
.setInvocation(invocation)
.setLocalAddress(NetUtils.getLocalHost(), 0)
.setRemoteAddress(invoker.getUrl().getHost(),
invoker.getUrl().getPort());
if (invocation instanceof RpcInvocation) {
((RpcInvocation) invocation).setInvoker(invoker);
}
try {
// TODO should we clear server context?
// 清除 ServerContext
RpcContext.removeServerContext();
// 进行服务调用
return invoker.invoke(invocation);
} finally {
// TODO removeContext? but we need to save future for RpcContext.getFuture() API. If clear attachments here, attachments will not available when postProcessResult is invoked.
// 清除当前线程中添加的附加属性
RpcContext.getContext().clearAttachments();
}
}
@Override
public Result onResponse(Result result, Invoker<?> invoker, Invocation invocation) {
// 调用返回时设置 ServerContext
RpcContext.getServerContext().setAttachments(result.getAttachments());
return result;
}
}
ContextFilter
ContextFilter 作用于提供者端,作用是将Rpc 调用参数中的附件保存到当前 Rpc 上下文中。
@Activate(group = Constants.PROVIDER, order = -10000)
public class ContextFilter implements Filter {
@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
// 获取 Invocation 中的属性添加到上下文中
Map<String, String> attachments = invocation.getAttachments();
if (attachments != null) {
// 移除一些系统参数
attachments = new HashMap<String, String>(attachments);
attachments.remove(Constants.PATH_KEY);
attachments.remove(Constants.GROUP_KEY);
attachments.remove(Constants.VERSION_KEY);
attachments.remove(Constants.DUBBO_VERSION_KEY);
attachments.remove(Constants.TOKEN_KEY);
attachments.remove(Constants.TIMEOUT_KEY);
// Remove async property to avoid being passed to the following invoke chain.
attachments.remove(Constants.ASYNC_KEY);
attachments.remove(Constants.TAG_KEY);
attachments.remove(Constants.FORCE_USE_TAG);
}
RpcContext.getContext()
.setInvoker(invoker)
.setInvocation(invocation)
// .setAttachments(attachments) // merged from dubbox
.setLocalAddress(invoker.getUrl().getHost(),
invoker.getUrl().getPort());
// merged from dubbox
// we may already added some attachments into RpcContext before this filter (e.g. in rest protocol)
// 如果附加属性不为空则设置到上下文对象中。
if (attachments != null) {
if (RpcContext.getContext().getAttachments() != null) {
RpcContext.getContext().getAttachments().putAll(attachments);
} else {
RpcContext.getContext().setAttachments(attachments);
}
}
if (invocation instanceof RpcInvocation) {
((RpcInvocation) invocation).setInvoker(invoker);
}
try {
return invoker.invoke(invocation);
} finally {
// IMPORTANT! For async scenario, we must remove context from current thread, so we always create a new RpcContext for the next invoke for the same thread.
// 调用结束移除 上下文信息
RpcContext.removeContext();
RpcContext.removeServerContext();
}
}
@Override
public Result onResponse(Result result, Invoker<?> invoker, Invocation invocation) {
// pass attachments to result
// 处理结束后,向 ServerContext 中添加附件属性,Result 会随着网络请求返回给 消费者
result.addAttachments(RpcContext.getServerContext().getAttachments());
return result;
}
}
Dubbo的隐式参数实现比较简单:
- 服务调用时在上下文中设置隐式参数
- AbstractClusterInvoker#invoke 将上下文中的隐式参数保存到调用参数中
- ContextFilter 将调用参数中的隐式参数取出,设置到当前Rpc上下文中,供服务方法使用。