打开APP
userphoto
未登录

开通VIP,畅享免费电子书等14项超值服

开通VIP
gRPC C++ 异步使用入门

这篇文章将讲述如何使用 gRPC 异步/非阻塞 C++ API 编写一个简单的服务端(server)和客户端(client)。在读取这篇文章之前,你需要先了解 Protocol Buffer 和 gRPC 基础,你可在本博客搜索到它们的相关文章。

这篇文章将围绕 gRPC 的官方例子 Greeter 展开学习,你可在 grpc/examples/protos/helloworld.proto 中查看服务的定义,在 grpc/examples/cpp/helloworld/ 中查看完整代码。

概览

gRPC 使用 CompletionQueue API 进行异步操作。基本工作流程如下:

  • 在 RPC 调用上绑定一个 CompletionQueue
  • 做一些事情,例如读或写这样的事情,用一个唯一的 void* 标记表示
  • 调用 CompletionQueue::Next 等待操作完成,如果返回之前的一个标记,则表示对应的操作已经完成。

异步客户端

要使用异步客户端调用远程方法,首先要创建 channel 和 stub,这个过程和 grpc/examples/cpp/helloworld/greeter_client.cc 中差不多(同步示例)。一旦你创建了 stub,就可以做以下事情来进行异步调用了:

  • 初始化 RPC 并为其创建句柄。将 RPC 绑定到一个 CompletionQueue 上。

    CompletionQueue cq;std::unique_ptr<ClientAsyncResponseReader<HelloReply> > rpc(    stub_->AsyncSayHello(&context, request, &cq)); 
  • 使用一个唯一的标记(这里用的是 (void*)1),请求回复和最终状态

    Status status;rpc->Finish(&reply, &status, (void*)1);
  • 等待完成队列返回标记(tag)。一旦返回了之前对应 Finish() 函数中传递的标记,应答和状态就可以被返回了。

    void* got_tag;bool ok = false;cq.Next(&got_tag, &ok);if (ok && got_tag == (void*)1) {    // check reply and status}

你可以在 grpc/examples/cpp/helloworld/greeter_async_client.cc 中看到完整的客户端示例。

异步服务端

服务器实现一个带有标记(tag)的 RPC 调用请求,然后等待完成队列返回标记。异步处理 RPC 的基本流程是:

  • 构建一个 server 并导出异步服务

    helloworld::Greeter::AsyncService service;ServerBuilder builder;builder.AddListeningPort("0.0.0.0:50051", InsecureServerCredentials());builder.RegisterAsyncService(&service);auto cq = builder.AddCompletionQueue();auto server = builder.BuildAndStart();
  • 请求一个 RPC,并提供唯一的标记(tag)

    ServerContext context;HelloRequest request;ServerAsyncResponseWriter<HelloReply> responder;service.RequestSayHello(&context, &request, &responder, &cq, &cq, (void*)1);
  • 等待完成队列返回标记。一旦检索到标记,context 、request 和 responder 就准备好了。

    HelloReply reply;Status status;void* got_tag;bool ok = false;cq.Next(&got_tag, &ok);if (ok && got_tag == (void*)1) {    // set reply and status    responder.Finish(reply, status, (void*)2);}
  • 等待完成队列返回标记。RPC 在标记返回时完成。

    void* got_tag;bool ok = false;cq.Next(&got_tag, &ok);if (ok && got_tag == (void*)2) {    // clean up}

但是,这个基本流程没有考虑到服务端同时处理多个请求。要解决这个问题,完整的异步服务端示例使用一个CallData 对象来维护每个 RPC 的状态,并使用该对象的地址作为调用的唯一标记。

class CallData {    public:    // Take in the "service" instance (in this case representing an asynchronous    // server) and the completion queue "cq" used for asynchronous communication    // with the gRPC runtime.    CallData(Greeter::AsyncService* service, ServerCompletionQueue* cq)         : service_(service), cq_(cq), responder_(&ctx_), status_(CREATE) {        // Invoke the serving logic right away.        Proceed();    }    void Proceed() {        if (status_ == CREATE) {            // As part of the initial CREATE state, we *request* that the system            // start processing SayHello requests. In this request, "this" acts are            // the tag uniquely identifying the request (so that different CallData            // instances can serve different requests concurrently), in this case            // the memory address of this CallData instance.            service_->RequestSayHello(&ctx_, &request_, &responder_, cq_, cq_,                                  this);            // Make this instance progress to the PROCESS state.            status_ = PROCESS;        } else if (status_ == PROCESS) {            // Spawn a new CallData instance to serve new clients while we process            // the one for this CallData. The instance will deallocate itself as            // part of its FINISH state.            new CallData(service_, cq_);                // The actual processing.            std::string prefix("Hello ");            reply_.set_message(prefix + request_.name());            // And we are done! Let the gRPC runtime know we've finished, using the            // memory address of this instance as the uniquely identifying tag for            // the event.            responder_.Finish(reply_, Status::OK, this);            status_ = FINISH;          } else {            GPR_ASSERT(status_ == FINISH);            // Once in the FINISH state, deallocate ourselves (CallData).            delete this;        }    }}

为简单起见,服务端对所有事件只使用一个完成队列,并在 HandleRpcs 中运行一个主循环来查询队列:

void HandleRpcs() {    // Spawn a new CallData instance to serve new clients.    new CallData(&service_, cq_.get());    void* tag;  // uniquely identifies a request.    bool ok;    while (true) {        // Block waiting to read the next event from the completion queue. The        // event is uniquely identified by its tag, which in this case is the        // memory address of a CallData instance.          cq_->Next(&tag, &ok);          GPR_ASSERT(ok);          static_cast<CallData*>(tag)->Proceed();    }}

关闭服务端

我们在使用一个完成队列来获取异步通知,在服务端被关闭之后,也必须小心地关闭它。

记住,我们在 ServerImpl::Run() 函数中通过运行 cq_ = builder.AddCompletionQueue() 来获得完成队列实例cq_ ,看看 ServerBuilder::AddCompletionQueue 的文档,我们可以看到:

… Caller is required to shutdown the server prior to shutting down the returned completion queue.

有关详细信息,请参考 ServerBuilder::AddCompletionQueue 的完整文档字符串。在我们的示例中,ServerImpl 的析构函数如下所示:

~ServerImpl() {    server_->Shutdown();    // Always shutdown the completion queue after the server.    cq_->Shutdown();}

你可以在 grpc/examples/cpp/helloworld/greeter_async_server.cc 中看到完整的服务端示例。

实现多个服务

前面官方提到的示例中只实现了一个 SayHello RPC 服务,如果想要实现多个 RPC服务该怎么办呢?下面的将讲述如何对示例中的代码进行修改,使他再支持一个名为 SayBye 的服务。

这个方法就是为每个 RPC 服务都实现一个不同的 CallData 类。但是,当你从 cq_->Next() 获取标记时,你知道它是指向这些类之一的对象的指针,但是你不知道它的确切类型。

为了克服这个问题,你可以让它们都继承一个具有 virtual Proceed() 成员函数的类,再根据需要在每个子类中实现它,当您获得一个标记时,将其转换为 CallData 并调用 Proceed()

class CallData { public:  virtual void Proceed() = 0;};class HelloCallData final : public CallData {...};class ByeCallData final : public CallData {...};...new HelloCallData(...);new ByeCallData(...);cq_->Next(&tag, &ok);static_cast<CallData*>(tag)->Proceed();...

多线程

对于如何在多线程中使用异步 RPC API 完成队列,官方的的文档说明是:

Right now, the best performance trade-off is having numcpu's threads and one completion queue per thread.

当前,最好的权衡性能的方法是使用创建 cpu 个数的线程数,并在每个线程中都使用一个完成队列。

本站仅提供存储服务,所有内容均由用户发布,如发现有害或侵权内容,请点击举报
打开APP,阅读全文并永久保存 查看更多类似文章
猜你喜欢
类似文章
Android实战技术:深入理解Android的RPC方式与AIDL
程序员,你也该懂系统集成之服务集成交互技术——网络协议了吧?
golang gRPC 入门
gin+grpc+etcd 重构 grpc-todolist 项目
RPC is Not Dead: Rise, Fall and the Rise of Remote Procedure Calls
什么样的 RPC 才是好用的 RPC
更多类似文章 >>
生活服务
热点新闻
分享 收藏 导长图 关注 下载文章
绑定账号成功
后续可登录账号畅享VIP特权!
如果VIP功能使用有故障,
可点击这里联系客服!

联系客服