How to use the Reliable Services communication APIs

Azure Service Fabric as a platform is agnostic about how services communicate with each other. Any protocol or stack, from UDP to HTTP, is acceptable, and it is up to the service developer to choose how services communicate. The Reliable Services application framework provides built-in communication stacks, as well as APIs that you can use to build your own communication components.

Set up service communication

The Reliable Services API uses a simple interface for service communication: ICommunicationListener in C# and CommunicationListener in Java. To open an endpoint for your service, implement this interface:


public interface ICommunicationListener
{
    Task<string> OpenAsync(CancellationToken cancellationToken);

    Task CloseAsync(CancellationToken cancellationToken);

    void Abort();
}

public interface CommunicationListener {
    CompletableFuture<String> openAsync(CancellationToken cancellationToken);

    CompletableFuture<?> closeAsync(CancellationToken cancellationToken);

    void abort();
}
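The listener contract can be illustrated without the SDK. The following Java sketch uses a hypothetical `SketchListener` interface that mirrors the shape of CommunicationListener (the CancellationToken parameters are left out), and a hypothetical `InMemoryListener` that only tracks its lifecycle state instead of binding a real socket. It shows the roles of open, close, and abort; it is not a production listener.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in mirroring the shape of CommunicationListener (not the SDK type).
interface SketchListener {
    CompletableFuture<String> openAsync();
    CompletableFuture<Void> closeAsync();
    void abort();
}

// Tracks lifecycle state only; a real listener would start and stop a server here.
class InMemoryListener implements SketchListener {
    enum State { CREATED, OPEN, CLOSED, ABORTED }

    private final String address;
    private State state = State.CREATED;

    InMemoryListener(String address) {
        this.address = address;
    }

    @Override
    public CompletableFuture<String> openAsync() {
        state = State.OPEN;
        // The returned string is the address the runtime would publish for this listener.
        return CompletableFuture.completedFuture(address);
    }

    @Override
    public CompletableFuture<Void> closeAsync() {
        // Graceful shutdown: a real listener would drain in-flight requests first.
        state = State.CLOSED;
        return CompletableFuture.completedFuture(null);
    }

    @Override
    public void abort() {
        // Immediate, non-graceful shutdown.
        state = State.ABORTED;
    }

    State state() {
        return state;
    }
}
```

In a real listener, openAsync would start the server and return the address to publish, closeAsync would stop it gracefully, and abort would tear it down immediately without waiting.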

You then add your communication listener implementation by returning it from an override of a method on the service base class.

For stateless services:

public class MyStatelessService : StatelessService
{
    protected override IEnumerable<ServiceInstanceListener> CreateServiceInstanceListeners()
    {
        ...
    }
    ...
}
public class MyStatelessService extends StatelessService {

    @Override
    protected List<ServiceInstanceListener> createServiceInstanceListeners() {
        ...
    }
    ...
}

For stateful services:

public class MyStatefulService : StatefulService
{
    protected override IEnumerable<ServiceReplicaListener> CreateServiceReplicaListeners()
    {
        ...
    }
    ...
}
public class MyStatefulService extends StatefulService {

    @Override
    protected List<ServiceReplicaListener> createServiceReplicaListeners() {
        ...
    }
    ...
}

In both cases, you return a collection of listeners. Multiple listeners let your service listen on multiple endpoints, potentially using different protocols. For example, you might have an HTTP listener and a separate WebSocket listener. Each listener gets a name, and when a client requests the listening addresses for a service instance or a partition, the resulting collection of name : address pairs is represented as a JSON object.

In a stateless service, the override returns a collection of ServiceInstanceListeners. A ServiceInstanceListener contains a function that creates an ICommunicationListener(C#) / CommunicationListener(Java) and gives it a name. For stateful services, the override returns a collection of ServiceReplicaListeners. This differs slightly from the stateless counterpart, because a ServiceReplicaListener has an option to open an ICommunicationListener on secondary replicas. So not only can you use multiple communication listeners in a service, you can also specify which listeners accept requests on secondary replicas and which ones listen only on primary replicas.
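The primary/secondary distinction boils down to one flag per listener. The sketch below models it with a hypothetical `ReplicaListenerSpec` record and a hypothetical `ListenerSelection.listenersToOpen` helper; neither is an SDK type, and in a real service the runtime makes this decision for you.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical model of a replica listener: a name plus the listenOnSecondary flag.
record ReplicaListenerSpec(String name, boolean listenOnSecondary) {}

final class ListenerSelection {
    private ListenerSelection() {}

    // Returns the names of the listeners that should be open for a replica in the given role.
    // A primary opens every listener; a secondary opens only those flagged listenOnSecondary.
    static List<String> listenersToOpen(List<ReplicaListenerSpec> specs, boolean isPrimary) {
        return specs.stream()
                .filter(spec -> isPrimary || spec.listenOnSecondary())
                .map(ReplicaListenerSpec::name)
                .collect(Collectors.toList());
    }
}
```

With the two listeners from the example that follows, a primary would open both, while a secondary would open only the read-only HTTP listener.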

For example, you can have a ServiceRemotingListener that takes RPC calls only on primary replicas, and a second, custom listener that takes read requests on secondary replicas over HTTP:

protected override IEnumerable<ServiceReplicaListener> CreateServiceReplicaListeners()
{
    return new[]
    {
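        // Opened on the primary and on secondary replicas (listenOnSecondary: true).
        new ServiceReplicaListener(context =>
            new MyCustomHttpListener(context),
            "HTTPReadonlyEndpoint",
            true),

        // Opened on the primary replica only (listenOnSecondary: false).
        new ServiceReplicaListener(context =>
            this.CreateServiceRemotingListener(context),
            "rpcPrimaryEndpoint",
            false)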
    };
}

Note

When you create multiple listeners for a service, each listener must be given a unique name.
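Because clients see the listeners as name : address pairs in one JSON object, two listeners with the same name would collide. The sketch below builds that JSON by hand and rejects duplicate names. `AddressDocument` is a hypothetical helper (the runtime builds the real document), and the `{"Endpoints": {...}}` wrapper is an assumption about the published shape.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class AddressDocument {
    private AddressDocument() {}

    // Builds {"Endpoints":{"name":"address",...}} from (name, address) pairs, keeping their order.
    // Throws if two listeners share a name, since one of the addresses would otherwise be lost.
    static String toJson(List<Map.Entry<String, String>> listeners) {
        Map<String, String> byName = new LinkedHashMap<>();
        for (Map.Entry<String, String> listener : listeners) {
            if (byName.putIfAbsent(listener.getKey(), listener.getValue()) != null) {
                throw new IllegalArgumentException("Duplicate listener name: " + listener.getKey());
            }
        }
        StringBuilder json = new StringBuilder("{\"Endpoints\":{");
        boolean first = true;
        for (Map.Entry<String, String> entry : byName.entrySet()) {
            if (!first) {
                json.append(',');
            }
            first = false;
            json.append('"').append(escape(entry.getKey())).append("\":\"")
                .append(escape(entry.getValue())).append('"');
        }
        return json.append("}}").toString();
    }

    // Minimal escaping for quotes and backslashes; enough for typical names and URLs.
    private static String escape(String s) {
        return s.replace("\\", "\\\\").replace("\"", "\\\"");
    }
}
```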

Finally, describe the endpoints that the service requires in the Endpoints section of the service manifest:

<Resources>
    <Endpoints>
      <Endpoint Name="WebServiceEndpoint" Protocol="http" Port="80" />
      <Endpoint Name="OtherServiceEndpoint" Protocol="tcp" Port="8505" />
    </Endpoints>
</Resources>
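To see which names and ports a fragment like this declares, the sketch below reads the Endpoint elements with the JDK's built-in DOM parser. `ManifestEndpoints` is a hypothetical helper for illustration only, since Service Fabric reads the manifest itself. An omitted Port is reported as 0 here, on the assumption that the runtime then assigns a port from the application's port range.

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;

final class ManifestEndpoints {
    private ManifestEndpoints() {}

    // Maps each declared Endpoint Name to its Port attribute.
    // 0 means "no fixed port in the manifest" (assumed to be assigned by the runtime).
    static Map<String, Integer> ports(String resourcesXml) {
        Document document;
        try {
            document = DocumentBuilderFactory.newInstance().newDocumentBuilder()
                    .parse(new ByteArrayInputStream(resourcesXml.getBytes(StandardCharsets.UTF_8)));
        } catch (Exception e) {
            throw new IllegalArgumentException("Malformed manifest XML", e);
        }
        NodeList endpoints = document.getElementsByTagName("Endpoint");
        Map<String, Integer> result = new LinkedHashMap<>();
        for (int i = 0; i < endpoints.getLength(); i++) {
            Element endpoint = (Element) endpoints.item(i);
            String port = endpoint.getAttribute("Port"); // "" when the attribute is absent
            result.put(endpoint.getAttribute("Name"), port.isEmpty() ? 0 : Integer.parseInt(port));
        }
        return result;
    }
}
```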

The communication listener can access the endpoint resources allocated to it through the CodePackageActivationContext in the ServiceContext. The name passed to GetEndpoint (C#) / getEndpoint (Java) must match the Name of an Endpoint declared in the manifest. The listener can then start listening for requests when it is opened.

var codePackageActivationContext = serviceContext.CodePackageActivationContext;
var port = codePackageActivationContext.GetEndpoint("ServiceEndpoint").Port;

CodePackageActivationContext codePackageActivationContext = serviceContext.getCodePackageActivationContext();
int port = codePackageActivationContext.getEndpoint("ServiceEndpoint").getPort();

Note

Endpoint resources are common to the entire service package, and Service Fabric allocates them when the service package is activated. Multiple service replicas hosted in the same ServiceHost may therefore share the same port, so the communication listener should support port sharing. The recommended way to do this is for the communication listener to include the partition ID and the replica/instance ID when it generates the listen address.
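One way to make a shared port work is to give every replica its own URL path. The sketch below assumes a hypothetical `http://+:{port}/{partitionId}/{replicaOrInstanceId}/` scheme; `SharedPortAddress` is an illustrative helper, and the exact path layout is a design choice rather than something Service Fabric prescribes.

```java
import java.util.Locale;
import java.util.UUID;

final class SharedPortAddress {
    private SharedPortAddress() {}

    // Hypothetical listen-address scheme: http://+:{port}/{partitionId}/{replicaOrInstanceId}/
    // Embedding both IDs gives each replica a distinct path even when replicas share one port.
    static String listenAddress(int port, UUID partitionId, long replicaOrInstanceId) {
        // Locale.ROOT keeps the digits ASCII, like CultureInfo.InvariantCulture in the C# sample.
        return String.format(Locale.ROOT, "http://+:%d/%s/%d/", port, partitionId, replicaOrInstanceId);
    }
}
```

Two replicas of the same partition that land in one ServiceHost then listen on the same port but under different paths, so an HTTP server that supports path-based registration can route requests to the right replica.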

Service address registration

A system service called the Naming Service runs on Service Fabric clusters. The Naming Service is a registrar for services and for the addresses on which each instance or replica of a service is listening. When the OpenAsync(C#) / openAsync(Java) method of an ICommunicationListener(C#) / CommunicationListener(Java) completes, its return value is registered in the Naming Service. This published value is a string, and it can be anything at all. It is what clients see when they ask the Naming Service for the address of the service.

public Task<string> OpenAsync(CancellationToken cancellationToken)
{
    EndpointResourceDescription serviceEndpoint = serviceContext.CodePackageActivationContext.GetEndpoint("ServiceEndpoint");
    int port = serviceEndpoint.Port;

    this.listeningAddress = string.Format(
                CultureInfo.InvariantCulture,
                "http://+:{0}/",
                port);

    this.publishAddress = this.listeningAddress.Replace("+", FabricRuntime.GetNodeContext().IPAddressOrFQDN);

    this.webApp = WebApp.Start(this.listeningAddress, appBuilder => this.startup.Invoke(appBuilder));

    // the string returned here will be published in the Naming Service.
    return Task.FromResult(this.publishAddress);
}

public CompletableFuture<String> openAsync(CancellationToken cancellationToken)
{
    EndpointResourceDescription serviceEndpoint = serviceContext.getCodePackageActivationContext().getEndpoint("ServiceEndpoint");
    int port = serviceEndpoint.getPort();

    this.publishAddress = String.format("http://%s:%d/", FabricRuntime.getNodeContext().getIpAddressOrFQDN(), port);

    this.webApp = new WebApp(port);
    this.webApp.start();

    /* the string returned here will be published in the Naming Service.
     */
    return CompletableFuture.completedFuture(this.publishAddress);
}

Service Fabric provides APIs that let clients and other services ask for this address by service name. This matters because service addresses are not static: services are moved around the cluster for resource balancing and availability. This is the mechanism that allows clients to resolve the listening address of a service.
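The register-then-resolve flow can be pictured with a tiny in-memory registry. `NamingRegistrySketch` below is a hypothetical, heavily simplified stand-in: the real Naming Service tracks addresses per partition and per replica or instance, whereas this sketch keeps one string per service name.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory stand-in for the Naming Service: service name -> published address string.
final class NamingRegistrySketch {
    private final Map<String, String> addresses = new ConcurrentHashMap<>();

    // Called with the string returned from openAsync; a later registration replaces the earlier one.
    void register(String serviceName, String publishedAddress) {
        addresses.put(serviceName, publishedAddress);
    }

    // What a client sees when it asks for the service by name.
    Optional<String> resolve(String serviceName) {
        return Optional.ofNullable(addresses.get(serviceName));
    }
}
```

When a replica moves and its listener reopens on another node, the new registration replaces the old address, which is why clients should resolve by name rather than hard-code an address.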

Note

For a complete walk-through of how to write a communication listener in C#, see Service Fabric Web API services with OWIN self-hosting. In Java, you can write your own HTTP server implementation; see the EchoServer application example at https://github.com/Azure-Samples/service-fabric-java-getting-started.

Communicating with a service

The Reliable Services API provides the following libraries for writing clients that communicate with services.

Service endpoint resolution

The first step in communicating with a service is to resolve the endpoint address of the partition or instance of the service you want to talk to. The ServicePartitionResolver(C#) / FabricServicePartitionResolver(Java) utility class is a basic primitive that helps clients determine the endpoint of a service at runtime. In Service Fabric terminology, the process of determining the endpoint of a service is called service endpoint resolution.

To connect to services within a cluster, create a ServicePartitionResolver with default settings. This is the recommended usage for most situations:

ServicePartitionResolver resolver = ServicePartitionResolver.GetDefault();
FabricServicePartitionResolver resolver = FabricServicePartitionResolver.getDefault();

To connect to services in a different cluster, create a ServicePartitionResolver with a set of cluster gateway endpoints. The gateway endpoints are simply different endpoints for connecting to the same cluster. For example:

ServicePartitionResolver resolver = new ServicePartitionResolver("mycluster.cloudapp.chinacloudapi.cn:19000", "mycluster.cloudapp.chinacloudapi.cn:19001");
FabricServicePartitionResolver resolver = new FabricServicePartitionResolver("mycluster.cloudapp.chinacloudapi.cn:19000", "mycluster.cloudapp.chinacloudapi.cn:19001");

Alternatively, ServicePartitionResolver can be given a function that creates the FabricClient it uses internally:

public delegate FabricClient CreateFabricClientDelegate();
public FabricServicePartitionResolver(CreateFabricClient createFabricClient) {
...
}

public interface CreateFabricClient {
    public FabricClient getFabricClient();
}

FabricClient is the object used to communicate with the Service Fabric cluster for various management operations on the cluster. Supplying your own is useful when you want more control over how a service partition resolver interacts with your cluster. FabricClient performs caching internally and is generally expensive to create, so reuse FabricClient instances as much as possible.
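A common way to honor the "create once, reuse" advice is to memoize the factory. The sketch below uses a hypothetical `Memoized` helper with double-checked locking; in a real service the wrapped supplier would construct the FabricClient, but here a plain `Object` stands in for it so the example runs without the SDK.

```java
import java.util.function.Supplier;

// Hypothetical helper: builds an expensive object once, then hands out the same instance.
final class Memoized<T> implements Supplier<T> {
    private final Supplier<T> factory;
    private volatile T value;

    Memoized(Supplier<T> factory) {
        this.factory = factory;
    }

    @Override
    public T get() {
        T result = value;
        if (result == null) {
            synchronized (this) {
                result = value;
                if (result == null) {   // re-check under the lock so only one thread creates it
                    result = factory.get();
                    value = result;
                }
            }
        }
        return result;
    }
}
```

A CreateFabricClient implementation could then return `memoized.get()` from getFabricClient, so every resolver created through it shares one client.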

ServicePartitionResolver resolver = new ServicePartitionResolver(() => CreateMyFabricClient());
FabricServicePartitionResolver resolver = new FabricServicePartitionResolver(new CreateFabricClientImpl()); // CreateFabricClientImpl implements CreateFabricClient

A resolve method is then used to retrieve the address of a service, or of a service partition for partitioned services:

ServicePartitionResolver resolver = ServicePartitionResolver.GetDefault();

ResolvedServicePartition partition =
    await resolver.ResolveAsync(new Uri("fabric:/MyApp/MyService"), new ServicePartitionKey(), cancellationToken);
FabricServicePartitionResolver resolver = FabricServicePartitionResolver.getDefault();

CompletableFuture<ResolvedServicePartition> partition =
    resolver.resolveAsync(new URI("fabric:/MyApp/MyService"), new ServicePartitionKey());

A ServicePartitionResolver makes resolving a service address easy, but more work is required to make sure the resolved address is used correctly. Your client needs to detect whether a connection attempt failed because of a transient error that can be retried (for example, the service moved or is temporarily unavailable) or because of a permanent error (for example, the service was deleted or the requested resource no longer exists). Service instances or replicas can move from node to node at any time for many reasons, so an address resolved through ServicePartitionResolver may be stale by the time your client code tries to connect. In that case, the client needs to re-resolve the address. Passing the previous ResolvedServicePartition tells the resolver to try again rather than simply return a cached address.
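The "pass the previous result to force a refresh" idea can be modeled with a version number. In the sketch below, `Resolved` and `CachingResolverSketch` are hypothetical, and the version comparison is an assumption made for illustration; it is not how ResolvedServicePartition is compared internally.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical resolution result: an address plus a version that increases on each fresh lookup.
record Resolved(String address, long version) {}

// Caches resolutions; passing the previously returned result forces a fresh lookup.
final class CachingResolverSketch {
    private final Function<String, String> lookup; // stands in for a query to the Naming Service
    private final Map<String, Resolved> cache = new HashMap<>();
    private long nextVersion = 1;

    CachingResolverSketch(Function<String, String> lookup) {
        this.lookup = lookup;
    }

    synchronized Resolved resolve(String serviceName, Resolved previous) {
        Resolved cached = cache.get(serviceName);
        // Reuse the cache unless the caller reports that this cached entry is the one that failed.
        if (cached != null && (previous == null || cached.version() > previous.version())) {
            return cached;
        }
        Resolved fresh = new Resolved(lookup.apply(serviceName), nextVersion++);
        cache.put(serviceName, fresh);
        return fresh;
    }
}
```

Without `previous`, the client keeps getting the cached (possibly stale) address; with it, the resolver goes back to the source unless someone else has already refreshed the entry.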

Typically, client code does not need to work with ServicePartitionResolver directly. The resolver is created and passed to the communication client factories in the Reliable Services API, which use it internally to generate client objects that can communicate with services.

Communication clients and factories

The communication factory library implements a typical fault-handling retry pattern that makes it easier to retry connections to resolved service endpoints. The factory library provides the retry mechanism, and you provide the error handlers.

ICommunicationClientFactory(C#) / CommunicationClientFactory(Java) defines the base interface implemented by a communication client factory, which produces clients that can talk to a Service Fabric service. The implementation of the CommunicationClientFactory depends on the communication stack used by the Service Fabric service that the client wants to communicate with. The Reliable Services API provides CommunicationClientFactoryBase<TCommunicationClient>, a base implementation of the CommunicationClientFactory interface that performs tasks common to all communication stacks, such as using a ServicePartitionResolver to determine the service endpoint. Clients usually implement the abstract CommunicationClientFactoryBase class to handle logic that is specific to their communication stack.

The communication client just receives an address and uses it to connect to a service. The client can use whatever protocol it wants.

public class MyCommunicationClient : ICommunicationClient
{
    public ResolvedServiceEndpoint Endpoint { get; set; }

    public string ListenerName { get; set; }

    public ResolvedServicePartition ResolvedServicePartition { get; set; }
}
public class MyCommunicationClient implements CommunicationClient {

    private ResolvedServicePartition resolvedServicePartition;
    private String listenerName;
    private ResolvedServiceEndpoint endPoint;

    /*
     * Getters and Setters
     */
}

The client factory is primarily responsible for creating communication clients. For clients that don't maintain a persistent connection, such as an HTTP client, the factory only needs to create and return the client. For protocols that do maintain a persistent connection, such as some binary protocols, the factory should also validate existing clients to determine whether the connection needs to be re-created.
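The create-or-revalidate behavior can be sketched independently of the SDK. `ValidatingClientFactory` below is hypothetical: it caches one client per endpoint and builds a new one when a validation predicate (standing in for ValidateClient) reports that the cached client is no longer usable.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical client factory: one cached client per endpoint, re-created when validation fails.
final class ValidatingClientFactory<C> {
    private final Function<String, C> create;   // builds a client for an endpoint address
    private final Predicate<C> isStillUsable;   // e.g. "is the persistent connection still open?"
    private final Map<String, C> clients = new HashMap<>();

    ValidatingClientFactory(Function<String, C> create, Predicate<C> isStillUsable) {
        this.create = create;
        this.isStillUsable = isStillUsable;
    }

    synchronized C getClient(String endpoint) {
        C existing = clients.get(endpoint);
        if (existing != null && isStillUsable.test(existing)) {
            return existing;                    // healthy cached client: reuse it
        }
        C fresh = create.apply(endpoint);       // missing or broken: build a new one
        clients.put(endpoint, fresh);
        return fresh;
    }
}
```

For an HTTP-style client, the predicate can simply always return true; the validation step only matters for protocols that hold a connection open.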

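public class MyCommunicationClientFactory : CommunicationClientFactoryBase<MyCommunicationClient>
{
    protected override void AbortClient(MyCommunicationClient client)
    {
        // Release any resources (for example, an open connection) held by the client.
    }

    protected override Task<MyCommunicationClient> CreateClientAsync(string endpoint, CancellationToken cancellationToken)
    {
        // Create a client that talks to the resolved endpoint address.
        return Task.FromResult(new MyCommunicationClient());
    }

    protected override bool ValidateClient(MyCommunicationClient clientChannel)
    {
        // Placeholder: return false when the client's connection is broken and must be re-created.
        // A client without a persistent connection (such as HTTP) can always return true.
        return true;
    }

    protected override bool ValidateClient(string endpoint, MyCommunicationClient client)
    {
        // Placeholder: return false when the cached client cannot be reused for this endpoint.
        return true;
    }
}
public class MyCommunicationClientFactory extends CommunicationClientFactoryBase<MyCommunicationClient> {

    @Override
    protected boolean validateClient(MyCommunicationClient clientChannel) {
        // Placeholder: return false when the client's connection is broken and must be re-created.
        // A client without a persistent connection (such as HTTP) can always return true.
        return true;
    }

    @Override
    protected boolean validateClient(String endpoint, MyCommunicationClient client) {
        // Placeholder: return false when the cached client cannot be reused for this endpoint.
        return true;
    }

    @Override
    protected CompletableFuture<MyCommunicationClient> createClientAsync(String endpoint) {
        // Create a client that talks to the resolved endpoint address.
        return CompletableFuture.completedFuture(new MyCommunicationClient());
    }

    @Override
    protected void abortClient(MyCommunicationClient client) {
        // Release any resources (for example, an open connection) held by the client.
    }
}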

Finally, an exception handler is responsible for determining what action to take when an exception occurs. Exceptions are categorized into retryable and non-retryable.

  • Non-retryable exceptions are simply rethrown to the caller.
  • Retryable exceptions are further categorized into transient and non-transient.
    • Transient exceptions can be retried without re-resolving the service endpoint address. They include transient network problems and service error responses other than those indicating that the service endpoint address does not exist.
    • Non-transient exceptions require the service endpoint address to be re-resolved. They include exceptions indicating that the service endpoint could not be reached, which suggests that the service has moved to a different node.
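The three categories map to three different next steps. The sketch below encodes that mapping with hypothetical `FailureKind`, `NextStep`, and `FailurePolicy` types. The `classify` method is an example only, assuming a socket-based protocol where a timeout is treated as transient and a refused connection as a sign that the replica moved; real classification rules depend on your communication stack.

```java
// The categories described above, modeled as a hypothetical enum.
enum FailureKind { NON_RETRYABLE, TRANSIENT, NON_TRANSIENT }

// What the caller should do next for each kind of failure.
enum NextStep { RETHROW, RETRY_SAME_ADDRESS, RESOLVE_AGAIN_THEN_RETRY }

final class FailurePolicy {
    private FailurePolicy() {}

    static NextStep nextStep(FailureKind kind) {
        return switch (kind) {
            case NON_RETRYABLE -> NextStep.RETHROW;
            case TRANSIENT -> NextStep.RETRY_SAME_ADDRESS;
            case NON_TRANSIENT -> NextStep.RESOLVE_AGAIN_THEN_RETRY;
        };
    }

    // Example mapping for a socket-based protocol (an assumption, not a Service Fabric rule).
    static FailureKind classify(Throwable error) {
        if (error instanceof java.net.SocketTimeoutException) {
            return FailureKind.TRANSIENT;      // peer is slow; the address is probably still right
        }
        if (error instanceof java.net.ConnectException) {
            return FailureKind.NON_TRANSIENT;  // nothing listening; the replica may have moved
        }
        return FailureKind.NON_RETRYABLE;      // unrecognized: surface it to the caller
    }
}
```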

TryHandleException (C#) makes a decision about a given exception. If it does not know what decision to make about an exception, it should return false. If it does know, it should set the result accordingly and return true. In Java, handleException returns the ExceptionHandlingResult directly.

class MyExceptionHandler : IExceptionHandler
{
    public bool TryHandleException(ExceptionInformation exceptionInformation, OperationRetrySettings retrySettings, out ExceptionHandlingResult result)
    {
        if (IsKnownTransient(exceptionInformation.Exception))
        {
            // Known and transient: retry without re-resolving the service endpoint address.
            result = new ExceptionHandlingRetryResult(exceptionInformation.Exception, true, retrySettings, retrySettings.DefaultMaxRetryCount);
            return true;
        }

        if (IsKnownNonTransient(exceptionInformation.Exception))
        {
            // Known and not transient: a new service endpoint address must be resolved.
            result = new ExceptionHandlingRetryResult(exceptionInformation.Exception, false, retrySettings, retrySettings.DefaultMaxRetryCount);
            return true;
        }

        // Unknown: let the next IExceptionHandler attempt to handle it.
        result = null;
        return false;
    }

    // Hypothetical classification helpers; replace them with the rules for your communication stack.
    private static bool IsKnownTransient(Exception exception) => exception is TimeoutException;

    private static bool IsKnownNonTransient(Exception exception) => exception is System.Net.Sockets.SocketException;
}
public class MyExceptionHandler implements ExceptionHandler {

    @Override
    public ExceptionHandlingResult handleException(ExceptionInformation exceptionInformation, OperationRetrySettings retrySettings) {

        if (isKnownTransient(exceptionInformation.getException())) {
            /* Known and transient: retry without re-resolving the service endpoint address.
             */
            return new ExceptionHandlingRetryResult(exceptionInformation.getException(), true, retrySettings, retrySettings.getDefaultMaxRetryCount());
        }

        if (isKnownNonTransient(exceptionInformation.getException())) {
            /* Known and not transient: a new service endpoint address must be resolved.
             */
            return new ExceptionHandlingRetryResult(exceptionInformation.getException(), false, retrySettings, retrySettings.getDefaultMaxRetryCount());
        }

        /* Unknown: return no result so the next ExceptionHandler can attempt to handle it
         * (this sketch assumes null signals "not handled").
         */
        return null;
    }

    /* Hypothetical classification helpers; replace them with the rules for your communication stack.
     */
    private static boolean isKnownTransient(Throwable exception) {
        return exception instanceof java.net.SocketTimeoutException;
    }

    private static boolean isKnownNonTransient(Throwable exception) {
        return exception instanceof java.net.ConnectException;
    }
}

Putting it all together

With an ICommunicationClient(C#) / CommunicationClient(Java), an ICommunicationClientFactory(C#) / CommunicationClientFactory(Java), and an IExceptionHandler(C#) / ExceptionHandler(Java) built around a communication protocol, a ServicePartitionClient(C#) / FabricServicePartitionClient(Java) wraps them all together and provides the fault-handling and service partition address resolution loop around these components.

private MyCommunicationClientFactory myCommunicationClientFactory;
private Uri myServiceUri;

var myServicePartitionClient = new ServicePartitionClient<MyCommunicationClient>(
    this.myCommunicationClientFactory,
    this.myServiceUri,
    myPartitionKey);

var result = await myServicePartitionClient.InvokeWithRetryAsync(async (client) =>
   {
      // Communicate with the service using the client.
   },
   CancellationToken.None);

private MyCommunicationClientFactory myCommunicationClientFactory;
private URI myServiceUri;

FabricServicePartitionClient<MyCommunicationClient> myServicePartitionClient = new FabricServicePartitionClient<MyCommunicationClient>(
    this.myCommunicationClientFactory,
    this.myServiceUri,
    myPartitionKey);

CompletableFuture<?> result = myServicePartitionClient.invokeWithRetryAsync(client -> {
      /* Communicate with the service using the client.
       */
   });
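The loop that ServicePartitionClient runs around these pieces can be approximated in a few lines. The sketch below is a simplified model, not the SDK's implementation: `RetryLoopSketch.invokeWithRetry` is hypothetical, resolution is a plain function whose Boolean argument means "force a fresh lookup", and there is no backoff delay or cancellation.

```java
import java.util.function.Function;

// Hypothetical sketch of the resolve -> invoke -> classify -> retry loop.
final class RetryLoopSketch {
    enum Kind { NON_RETRYABLE, TRANSIENT, NON_TRANSIENT }

    private RetryLoopSketch() {}

    static <R> R invokeWithRetry(
            Function<Boolean, String> resolve,          // argument true forces a fresh resolution
            Function<String, R> call,                   // talks to the service at an address
            Function<RuntimeException, Kind> classify,  // plays the role of the exception handler
            int maxRetries) {
        String address = resolve.apply(false);
        for (int attempt = 0; ; attempt++) {
            try {
                return call.apply(address);
            } catch (RuntimeException e) {
                Kind kind = classify.apply(e);
                if (kind == Kind.NON_RETRYABLE || attempt >= maxRetries) {
                    throw e;                            // give up: surface the error to the caller
                }
                if (kind == Kind.NON_TRANSIENT) {
                    address = resolve.apply(true);      // the old address is stale; look it up again
                }
                // TRANSIENT: keep the same address. A real client would also back off here.
            }
        }
    }
}
```

The design point is the split of responsibilities: the loop owns retries and re-resolution, while the caller only supplies how to resolve, how to call, and how to classify failures, which mirrors the client, factory, and exception-handler roles described above.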

Next steps