文章
问答
冒泡
gRPC服务端流式RPC基于C++的应用

场景说明

我的应用场景是:平台(Java编写)需要把数据的上报频率下发给上位机(C++编写),也就是由上位机订阅平台上某个属性的变化,所以使用了gRPC的服务端流式RPC来实现(平台作为gRPC服务端,上位机作为gRPC客户端)。

案例相关代码

服务定义

XXXX.proto

// Messages are declared at file scope: protobuf does not allow message
// definitions inside a service body.

// Request naming the property the caller wants to subscribe to.
message SubscribePropertyRequest {
    string name = 1;
}

// One property update pushed on the response stream.
message SubscribePropertyReply {
    string name = 1;
    string value = 2;
}

service XXXX {
    // Server-streaming RPC: one request, then a stream of replies.
    rpc SubscribeProperty (SubscribePropertyRequest) returns (stream SubscribePropertyReply) {}
}

平台端代码(Java)

XXXXImpl.java

/**
 * gRPC service implementation that hands every incoming subscription over to
 * {@link PropertyChangeManager}.
 */
@Component
@Slf4j
public class XXXXImpl extends XXXXImplBase {
    @Resource
    private PropertyChangeManager propertyChangeManager;

    /**
     * Registers the caller's response stream under the requested property name.
     *
     * @param request          carries the name of the property to subscribe to
     * @param responseObserver stream on which replies for that property are sent
     */
    @Override
    public void subscribeProperty(SubscribePropertyRequest request, StreamObserver<SubscribePropertyReply> responseObserver) {
        final String propertyName = request.getName();
        // Nothing is sent or completed here; the observer is only stored so
        // the manager can use it later.
        propertyChangeManager.putStreamObserver(propertyName, responseObserver);
    }
}

我这里把对应的StreamObserver实例缓存起来了,以方便后续平台层面属性变更时,直接通过此实例把数据发送到客户端。

PropertyChangeManager.java

/**
 * Keeps the response streams of all property subscribers, grouped by property
 * name, and pushes a reply to each of them when a property changes.
 *
 * <p>The per-name lists are copy-on-write so that {@link #onChange} and
 * {@link #destroy} can iterate them while {@link #putStreamObserver} adds new
 * subscribers from another thread.
 */
@Slf4j
@Component
public class PropertyChangeManager {
    public static final String FIXED_RATE = "fixedRate";

    // The map reference is never reassigned, so it is final rather than volatile.
    private final Map<String, List<StreamObserver<SubscribePropertyReply>>> nameAndObserverMap = Maps.newConcurrentMap();

    /**
     * Adds a subscriber stream for the given property name.
     *
     * @param name           property name used as the lookup key
     * @param streamObserver response stream to notify on changes of that property
     */
    public synchronized void putStreamObserver(String name, StreamObserver<SubscribePropertyReply> streamObserver){
        log.info("putStreamObserver, name={}", name);
        List<StreamObserver<SubscribePropertyReply>> streamObservers = nameAndObserverMap.get(name);
        if (streamObservers == null) {
            // Copy-on-write: safe to iterate without locking while this method appends.
            streamObservers = new java.util.concurrent.CopyOnWriteArrayList<StreamObserver<SubscribePropertyReply>>();
            nameAndObserverMap.put(name, streamObservers);
        }

        streamObservers.add(streamObserver);
    }

    /**
     * Sends the new value to every stream subscribed to {@code name}.
     * A stream whose {@code onNext} throws is logged and removed so that it is
     * not retried on every subsequent change.
     *
     * @param name  property that changed
     * @param value new value, sent as a string
     */
    public void onChange(String name, String value) {
        log.info("onChange, name={}, value={}", name, value);
        List<StreamObserver<SubscribePropertyReply>> streamObservers = nameAndObserverMap.get(name);
        if (CollectionUtils.isEmpty(streamObservers)) {
            return;
        }

        SubscribePropertyReply subscribePropertyReply = SubscribePropertyReply
                .newBuilder()
                .setName(name)
                .setValue(value)
                .build();

        for (StreamObserver<SubscribePropertyReply> item : streamObservers) {
            try {
                // Send the data.
                item.onNext(subscribePropertyReply);
            } catch (Exception e) {
                log.error("onNext fail: ", e);
                // Removing during iteration is safe on a copy-on-write list.
                streamObservers.remove(item);
            }
        }
    }

    /**
     * Completes every cached stream when the bean is destroyed.
     * A failure on one stream is logged and does not stop the others.
     */
    @PreDestroy
    public void destroy() {
        log.info("destroy");
        for (Entry<String, List<StreamObserver<SubscribePropertyReply>>> entry : nameAndObserverMap.entrySet()) {
            for (StreamObserver<SubscribePropertyReply> item : entry.getValue()) {
                try {
                    item.onCompleted();
                } catch (Exception e) {
                    log.error("onCompleted fail: ", e);
                }
            }
        }
    }
}

上位机端代码(C++,基于QT)

上位机这边是阻塞式接收消息的,所以会单独启动一个线程来接收。

subscribefixedratethread.h

#include "XXXXclient.h"
#include <QThread>

#ifndef SUBSCRIBEFIXEDRATETHREAD_H
#define SUBSCRIBEFIXEDRATETHREAD_H

/// Thread that subscribes to the "fixedRate" property through an XXXXClient
/// and emits displayFixedRate() whenever a new positive value arrives.
class SubscribeFixedRateThread : public QThread {
    Q_OBJECT
public:
    /// @param client client used for the subscription; not owned by this object.
    SubscribeFixedRateThread(XXXXClient *client);
    /// Thread body: runs the subscription and re-subscribes after it ends.
    void run() override;
    /// Handles one received value; emits displayFixedRate() when it changed.
    void onMessage(int fixedRate);
private:
    XXXXClient *xxxxClient = nullptr;
    // Starts at 0: onMessage() ignores values <= 0, so the first valid value
    // always differs from it and is reported.
    int currentFixedRate = 0;
signals:
    /// Emitted with the new rate when it differs from the last one seen.
    void displayFixedRate(int);
};
#endif // SUBSCRIBEFIXEDRATETHREAD_H

subscribefixedratethread.cpp

#include "subscribefixedratethread.h"
#include <QThread>

using namespace std;


/// Stores the client pointer and starts with no known rate.
/// currentFixedRate is set to 0 so onMessage() never compares against an
/// indeterminate value.
SubscribeFixedRateThread::SubscribeFixedRateThread(XXXXClient *client)
        : xxxxClient(client), currentFixedRate(0) {
}

void SubscribeFixedRateThread::run() {
    while(true) {
        xxxxClient->subscribeProperty("fixedRate", [this](int fixedRate) {
            onMessage(fixedRate);
        });

        std::this_thread::sleep_for(std::chrono::milliseconds(5000));
        cout<<"尝试订阅FixedRate的变化"<<endl;
    }
}

/// Handles one value received from the subscription.
/// Values <= 0 are ignored. A value different from the last one seen is
/// stored and announced through displayFixedRate().
/// @param fixedRate value received from the subscription callback.
void SubscribeFixedRateThread::onMessage(int fixedRate) {
    // Logs the last known rate; the previously printed `defaultFixedRate` is
    // not declared in this class.
    cout<<"fixedRate Change Callback: "<<"fixedRate: "<<fixedRate<<", currentFixedRate: "<<currentFixedRate<<endl;

    if (fixedRate <= 0) {
        return;
    }

    if (currentFixedRate == fixedRate) {
        return;  // Unchanged: nothing to announce.
    }

    currentFixedRate = fixedRate;
    emit displayFixedRate(currentFixedRate);
}

mainwindow.h

mainwindow.cpp

#include "mainwindow.h"
#include "./ui_mainwindow.h"

#include <QModbusTcpClient>
#include <QTimer>
#include <QThread>

using namespace std;

// Builds the UI, then wires the subscription thread up inside a handler for
// the window's own `rendered` signal, which is emitted at the end of this
// constructor.
MainWindow::MainWindow(QWidget *parent)
        : QMainWindow(parent), ui(new Ui::MainWindow) {
    ui->setupUi(this);
    connect(this, &MainWindow::rendered, [=]() {
        qDebug() << " the main window has rendered ";

        // Client construction is elided in this listing.
        xxxxClient = ...;

        // Create the subscription thread, route its displayFixedRate(int)
        // signal to doWhenFixedRateChange(int), and start it.
        subscribeFixedRateThread = new SubscribeFixedRateThread(xxxxClient);
        connect(subscribeFixedRateThread, SIGNAL(displayFixedRate(int)), this, SLOT(doWhenFixedRateChange(int)));
        subscribeFixedRateThread->start();
    });
    emit rendered(); // Trigger the handler connected above.
}

// Stops the subscription thread before destroying it, then releases the UI.
// Destroying a QThread object whose thread is still running is an error in Qt,
// so the thread is asked to stop and given a bounded time to finish.
MainWindow::~MainWindow() {
    if (subscribeFixedRateThread != nullptr) {
        subscribeFixedRateThread->requestInterruption();
        if (!subscribeFixedRateThread->wait(3000UL)) {
            // Last resort: the thread may be stuck in a blocking read and
            // never see the interruption request.
            subscribeFixedRateThread->terminate();
            subscribeFixedRateThread->wait();
        }
        delete subscribeFixedRateThread;
        subscribeFixedRateThread = nullptr;
    }
    delete ui;
}

// Slot connected to SubscribeFixedRateThread::displayFixedRate(int).
// Currently an empty stub.
void MainWindow::doWhenFixedRateChange(int fixedRate) {
    // TODO
}

XXXXclient.h

#include <grpcpp/grpcpp.h>
#include "coke_oven_riser.grpc.pb.h"
#include "entity.h"

#ifndef COKEOVENRISEREDGE_COKEOVENRISERCLIENT_H
#define COKEOVENRISEREDGE_COKEOVENRISERCLIENT_H

class XXXXClient{
public:
    typedef std::function<void(int)> FixedRateChangeCallback;
    CokeOvenRiserClient();
    CokeOvenRiserClient(const std::shared_ptr<grpc::Channel>& channel);
    bool subscribeProperty(std::string name, FixedRateChangeCallback callback);
private:
    std::unique_ptr<CokeOvenRiser::Stub> stub_;
};
#endif //COKEOVENRISEREDGE_COKEOVENRISERCLIENT_H

XXXXclient.cpp

#include "XXXXclient.h"
using grpc::ClientReader;

/// Default constructor: stub_ stays empty.
XXXXClient::XXXXClient() = default;

/// Creates the stub for the given channel.
XXXXClient::XXXXClient(const std::shared_ptr<grpc::Channel> &channel)
        : stub_(XXXX::NewStub(channel)) {
}

bool XXXXClient::subscribeProperty(std::string name, FixedRateChangeCallback callback) {
    grpc::ClientContext context;
    SubscribePropertyRequest subscribePropertyRequest;
    SubscribePropertyReply subscribePropertyReply;

    subscribePropertyRequest.set_name(name);

    std::unique_ptr<ClientReader<SubscribePropertyReply>> reader(stub_->SubscribeProperty(&context, subscribePropertyRequest));
    while(reader->Read(&subscribePropertyReply)) {
        std::cout << name << " : " << subscribePropertyReply.value() << std::endl;
        std::string tempVal(subscribePropertyReply.value());
        callback((int) std::atoi(tempVal.c_str()));
    }

    return TRUE;
}
java
c++
gRPC

关于作者

justin
123456
获得点赞
文章被阅读