场景说明
我的应用场景是:平台(Java编写)需要把数据的上报频率下发给上位机(C++编写),也就是由上位机订阅平台上某个属性的变化,平台在该属性变化时主动推送,所以使用了gRPC的服务端流式RPC来实现。
案例相关代码
服务定义
XXXX.proto
// Property-subscription service: a client subscribes to one named property
// and receives a reply message on the stream for each change that is pushed.
service XXXX {
  // Server-streaming RPC: one request, then a stream of replies.
  rpc SubscribeProperty (SubscribePropertyRequest) returns (stream SubscribePropertyReply) {}
}

// Messages must be declared at file scope; protobuf does not allow a
// `message` inside a `service` body.

// Request: the name of the property to subscribe to.
message SubscribePropertyRequest {
  string name = 1;
}

// Reply: the property name and its new value, carried as a string.
message SubscribePropertyReply {
  string name = 1;
  string value = 2;
}
平台端代码(Java)
XXXXImpl.java
/**
 * gRPC service implementation for the property-subscription service.
 * It does not answer on the stream itself; it hands the stream over to
 * {@link PropertyChangeManager}.
 */
@Component
@Slf4j
public class XXXXImpl extends XXXXImplBase {

    @Resource
    private PropertyChangeManager propertyChangeManager;

    /**
     * Registers the caller's response stream under the requested property name.
     * The stream is neither written to nor completed here.
     *
     * @param request          carries the name of the property to subscribe to
     * @param responseObserver stream on which replies are sent to this client
     */
    @Override
    public void subscribeProperty(SubscribePropertyRequest request,
                                  StreamObserver<SubscribePropertyReply> responseObserver) {
        final String propertyName = request.getName();
        propertyChangeManager.putStreamObserver(propertyName, responseObserver);
    }
}
我这里把对应的StreamObserver实例缓存起来了,以方便后续平台层面属性变更时,直接通过此实例来发送数据到客户端。
PropertyChangeManager.java
/**
 * Keeps the response streams of all subscribed clients, grouped by property
 * name, and pushes property changes to them.
 *
 * <p>Registration, notification and shutdown may run on different threads,
 * so the registry is a concurrent map whose values are copy-on-write lists.
 * Iterating such a list never throws ConcurrentModificationException, and
 * removing an element while iterating is safe.
 */
@Slf4j
@Component
public class PropertyChangeManager {

    public static final String FIXED_RATE = "fixedRate";

    /** Property name -> streams of the clients subscribed to that property. */
    private final Map<String, List<StreamObserver<SubscribePropertyReply>>> nameAndObserverMap = Maps.newConcurrentMap();

    /**
     * Registers a client stream for a property.
     *
     * @param name           property name the client subscribed to
     * @param streamObserver stream used to push replies to that client
     */
    public void putStreamObserver(String name, StreamObserver<SubscribePropertyReply> streamObserver) {
        log.info("putStreamObserver, name={}", name);
        // computeIfAbsent on the concurrent map creates the list atomically,
        // so no method-level lock is needed.
        nameAndObserverMap
                .computeIfAbsent(name, key -> new java.util.concurrent.CopyOnWriteArrayList<>())
                .add(streamObserver);
    }

    /**
     * Pushes a new value of a property to every stream registered for it.
     * A stream whose {@code onNext} throws is logged and dropped, so it is
     * not retried on every later change.
     *
     * @param name  property name
     * @param value new value, sent as a string
     */
    public void onChange(String name, String value) {
        log.info("onChange, name={}, value={}", name, value);
        List<StreamObserver<SubscribePropertyReply>> streamObservers = nameAndObserverMap.get(name);
        if (CollectionUtils.isEmpty(streamObservers)) {
            return;
        }
        SubscribePropertyReply subscribePropertyReply = SubscribePropertyReply
                .newBuilder()
                .setName(name)
                .setValue(value)
                .build();
        for (StreamObserver<SubscribePropertyReply> item : streamObservers) {
            try {
                // Send the data.
                item.onNext(subscribePropertyReply);
            } catch (Exception e) {
                log.error("onNext fail: ", e);
                // Typically the client has cancelled or disconnected; the
                // stream is unusable from now on, so stop tracking it.
                streamObservers.remove(item);
            }
        }
    }

    /**
     * Completes every registered stream when the bean is destroyed, then
     * empties the registry.
     */
    @PreDestroy
    public void destroy() {
        log.info("destroy");
        for (Entry<String, List<StreamObserver<SubscribePropertyReply>>> entry : nameAndObserverMap.entrySet()) {
            for (StreamObserver<SubscribePropertyReply> item : entry.getValue()) {
                try {
                    item.onCompleted();
                } catch (Exception e) {
                    log.error("onCompleted fail: ", e);
                }
            }
        }
        nameAndObserverMap.clear();
    }
}
上位机端代码(C++,基于QT)
上位机这边是阻塞接收消息的,所以会启动一个线程。
subscribefixedratethread.h
#include "XXXXclient.h"
#include <QThread>
#ifndef SUBSCRIBEFIXEDRATETHREAD_H
#define SUBSCRIBEFIXEDRATETHREAD_H
/**
 * Worker thread that subscribes to changes of the "fixedRate" property
 * through an XXXXClient and reports new values via displayFixedRate().
 */
class SubscribeFixedRateThread : public QThread {
    Q_OBJECT
public:
    /// @param client gRPC client used for the subscription; stored as a raw pointer, not deleted by this class.
    SubscribeFixedRateThread(XXXXClient *client);

    /// Thread body: subscribes in a loop.
    void run() override;

    /// Callback for each value received from the stream.
    void onMessage(int fixedRate);

private:
    XXXXClient *xxxxClient = nullptr;
    // Last value reported; initialised so the first comparison in
    // onMessage() does not read an indeterminate value.
    int currentFixedRate = 0;

signals:
    /// Emitted with the new value whenever the fixed rate changes.
    void displayFixedRate(int);
};
#endif // SUBSCRIBEFIXEDRATETHREAD_H
subscribefixedratethread.cpp
#include "subscribefixedratethread.h"
#include <QThread>
using namespace std;
/// Stores the client pointer and starts with "no rate seen yet" (0).
/// currentFixedRate must be initialised here because onMessage() compares
/// against it before ever assigning it.
SubscribeFixedRateThread::SubscribeFixedRateThread(XXXXClient *client)
    : xxxxClient(client), currentFixedRate(0) {
}
void SubscribeFixedRateThread::run() {
while(true) {
xxxxClient->subscribeProperty("fixedRate", [this](int fixedRate) {
onMessage(fixedRate);
});
std::this_thread::sleep_for(std::chrono::milliseconds(5000));
cout<<"尝试订阅FixedRate的变化"<<endl;
}
}
/// Handles one value received from the stream.
/// Non-positive values are ignored. A value different from the last one
/// seen is stored and announced through displayFixedRate().
void SubscribeFixedRateThread::onMessage(int fixedRate) {
    // The original logged an undeclared `defaultFixedRate`; the value this
    // class actually tracks is currentFixedRate.
    std::cout << "fixedRate Change Callback: " << "fixedRate: " << fixedRate
              << ", currentFixedRate: " << currentFixedRate << std::endl;
    if (fixedRate <= 0) {
        return;
    }
    if (currentFixedRate != fixedRate) {
        currentFixedRate = fixedRate;
        emit displayFixedRate(currentFixedRate);
    }
}
mainwindow.h
mainwindow.cpp
#include "mainwindow.h"
#include "./ui_mainwindow.h"
#include <QModbusTcpClient>
#include <QTimer>
#include <QThread>
using namespace std;
/// Builds the UI, then (via the rendered() signal emitted at the end of the
/// constructor) creates the gRPC client and starts the subscription thread.
MainWindow::MainWindow(QWidget *parent)
    : QMainWindow(parent), ui(new Ui::MainWindow) {
    ui->setupUi(this);
    // `this` is passed as context object so the connection is removed
    // together with the window, and is captured explicitly rather than
    // through [=].
    connect(this, &MainWindow::rendered, this, [this]() {
        qDebug() << " the main window has rendered ";
        xxxxClient = ...;  // client construction is elided in the article
        subscribeFixedRateThread = new SubscribeFixedRateThread(xxxxClient);
        // Pointer-to-member connect is checked at compile time; the
        // SIGNAL()/SLOT() string form only fails at run time.
        connect(subscribeFixedRateThread, &SubscribeFixedRateThread::displayFixedRate,
                this, &MainWindow::doWhenFixedRateChange);
        subscribeFixedRateThread->start();
    });
    emit rendered();  // trigger the setup above
}
/// Stops the subscription thread before destroying it; deleting a QThread
/// that is still running aborts the application.
MainWindow::~MainWindow() {
    if (subscribeFixedRateThread != nullptr) {
        subscribeFixedRateThread->requestInterruption();
        // The worker may be blocked in a stream read or never check the
        // interruption flag, so wait only for a bounded time.
        if (!subscribeFixedRateThread->wait(3000UL)) {
            // Last resort at shutdown: force the thread to end.
            subscribeFixedRateThread->terminate();
            subscribeFixedRateThread->wait();
        }
        delete subscribeFixedRateThread;
        subscribeFixedRateThread = nullptr;
    }
    // Delete the UI only after the thread is gone.
    delete ui;
}
// Slot connected to SubscribeFixedRateThread::displayFixedRate in the
// constructor; receives the new fixed rate. Currently an empty stub.
void MainWindow::doWhenFixedRateChange(int fixedRate) {
// TODO: handle the new fixed rate (not implemented yet)
}
XXXXclient.h
#include <grpcpp/grpcpp.h>
#include "coke_oven_riser.grpc.pb.h"
#include "entity.h"
#ifndef COKEOVENRISEREDGE_COKEOVENRISERCLIENT_H
#define COKEOVENRISEREDGE_COKEOVENRISERCLIENT_H
class XXXXClient{
public:
typedef std::function<void(int)> FixedRateChangeCallback;
CokeOvenRiserClient();
CokeOvenRiserClient(const std::shared_ptr<grpc::Channel>& channel);
bool subscribeProperty(std::string name, FixedRateChangeCallback callback);
private:
std::unique_ptr<CokeOvenRiser::Stub> stub_;
};
#endif //COKEOVENRISEREDGE_COKEOVENRISERCLIENT_H
XXXXclient.cpp
#include "XXXXclient.h"
using grpc::ClientReader;
// Default constructor: stub_ stays empty, so a client built this way has no
// stub to issue RPCs with.
XXXXClient::XXXXClient() = default;
/// Creates the service stub on the given channel.
XXXXClient::XXXXClient(const std::shared_ptr<grpc::Channel> &channel)
    : stub_(XXXX::NewStub(channel)) {
}
bool XXXXClient::subscribeProperty(std::string name, FixedRateChangeCallback callback) {
grpc::ClientContext context;
SubscribePropertyRequest subscribePropertyRequest;
SubscribePropertyReply subscribePropertyReply;
subscribePropertyRequest.set_name(name);
std::unique_ptr<ClientReader<SubscribePropertyReply>> reader(stub_->SubscribeProperty(&context, subscribePropertyRequest));
while(reader->Read(&subscribePropertyReply)) {
std::cout << name << " : " << subscribePropertyReply.value() << std::endl;
std::string tempVal(subscribePropertyReply.value());
callback((int) std::atoi(tempVal.c_str()));
}
return TRUE;
}