api模块接收请求,推送到消息队列
router模块消费消息,分发到各个模块
每个模块消费消息,在推回api模块,因为api模块需要知道最终执行结果
api模块配置:
spring:
cloud:
stream:
bindings:
outbound-agent-state-list.destination: outbound.agent-state-list #生产
agent-state-list-reply-channel: #消费回调回来的消息
destination: outbound.agent-state-list-reply
group: ${nodeno:0}
durablesubscription: false
consumer.maxpriority: 10
rabbit.bindings:
outbound-agent-state-list.producer.routing-key-expression: '''router'''
agent-state-list-reply-channel.consumer.durablesubscription: false
@component
public interface outboundoutputchannels {
string outbound_agent_state_list = "outbound-agent-state-list";
@output(value = outbound_agent_state_list)
messagechannel agentstatelistoutput();
}
@component
public interface outboundinputchannels {
string outbound_agent_state_list_reply = "agent-state-list-reply-channel";
@input(value = outbound_agent_state_list_reply)
subscribablechannel agentstatelistreply();
}
接收回调消息并出来业务逻辑
@slf4j
@requiredargsconstructor
@enablebinding(outboundinputchannels.class)
public class outboundreplymonitor extends destroyablemonitor {
@streamlistener(outboundinputchannels.outbound_agent_state_list_reply)
public void agentstatelistreply(agentstatelistreplydto payload) {
countup();
agentstateservice.processagentstatelistreply(payload);
countdown();
}
}
api接口入口
@slf4j
@restcontroller
@requiredargsconstructor
@requestmapping("/api")
@api(value = "外呼接口", tags = "精准智能请求人工及人工接起")
public class agentstatecontroller {
private final agentstateservice agentstateservice;
/**
* 5.9 精准智能请求人工及人工接起
* 调用商路通服务,获取所有坐席凯发k8国际首页数据
*
* @param agentstaterequestdto 请求参数
* @return 返回responseserver
*/
@apioperation(value = "用商路通服务,获取所有坐席凯发k8国际首页数据", httpmethod = "post")
@postmapping("/agentstatelist")
public apiresponse agentstatelist(@validated @requestbody agentstatelistrequestdto agentstaterequestdto, @requestheader(value = webconstant.priority, required = false, defaultvalue = "2") integer priority) throws exception {
log.info("请求:调用商路通服务,获取所有坐席凯发k8国际首页数据,param:{}", agentstaterequestdto.tostring());
return agentstateservice.agentstatelist(agentstaterequestdto, priority);
}
}
api接收请求后,数据校验,发送请求到消息队列,并等待消息的响应
@slf4j
@service
@requiredargsconstructor
public class agentstateservice {
private final messageservice messageservice;
private final callsuppliertyperelationservice callsuppliertyperelationservice;
private final callsupplierservice callsupplierservice;
private final calltypeservice calltypeservice;
protected final applicationproperties properties;
private final map
public apiresponse agentstatelist(agentstatelistrequestdto queue, integer priority) throws exception {
long starttime = system.currenttimemillis();
string businessid = sltbusinessenum.getnamebycalltype(queue.getcalltype());
if (stringutils.isempty(businessid)) {
throw new businessexception("商路通不支持的外呼类型:" queue.getcalltype());
}
callsupplierpo supplierpo = callsupplierservice.findbycode(queue.getsupplier());
if (supplierpo == null) {
throw new businessexception("供应商不存在:" queue.getsupplier());
}
calltypepo calltypepo = calltypeservice.findbycode(queue.getcalltype());
if (calltypepo == null) {
throw new businessexception("外呼类型不存在:" queue.getcalltype());
}
callsuppliertyperelationpo callsuppliertyperelationpo = callsuppliertyperelationservice.findbysupplieridandcalltypeid(supplierpo.getid(), calltypepo.getid());
if (null == callsuppliertyperelationpo) {
throw new businessexception("供应商外呼类型关系不存在");
}
queue.setcalltype(calltypepo.getcode());
queue.setsuppliercode(supplierpo.getcode());
queue.setqueueid(stringcheckutil.uuid());
messageservice.sendtoagentstatelistchannel(queue, priority);
completablefuture
queryfuturemap.put(queue.getqueueid(), future);
long maxwaitmillis = properties.getmaxwaitmillis().tomillis();
duration duration = properties.getmaxwaitmillis().minusmillis(system.currenttimemillis() - starttime);
if (duration.isnegative() || duration.iszero()) {
log.info("调用商路通服务,获取所有坐席凯发k8国际首页数据,已超过最大等待时间{}ms,param:{}", properties.getmaxwaitmillis().tomillis(), queue.tostring());
throw new requesttimeoutexception(maxwaitmillis, queue.getqueueid());
}
log.debug("等待返回调用商路通服务,获取所有坐席凯发k8国际首页数据请求结果,允许等待时间{}ms", duration.tomillis());
try {
return future.get(duration.tomillis(), timeunit.milliseconds);
} catch (timeoutexception e) {
throw new requesttimeoutexception(maxwaitmillis, queue.getqueueid());
} finally {
queryfuturemap.remove(queue.getqueueid());
}
}
public void processagentstatelistreply(agentstatelistreplydto payload) {
string queueid = payload.getqueueid();
log.info("api模块-接收-调用商路通服务,获取所有坐席凯发k8国际首页数据结果,{}", payload.tostring());
if (queryfuturemap.containskey(queueid)) {
queryfuturemap.get(queueid).complete(apiresponse.success(payload.getrows()));
} else {
log.debug("未找到queueid记录:{}", queueid);
}
}
}
停止服务前,检查是否有消息正在处理
@slf4j
public class destroyablemonitor {
private static final integer max_wait_count = 20;
private atomiclong messagecount = new atomiclong();
protected long countup() {
return messagecount.incrementandget();
}
protected long countdown() {
return messagecount.decrementandget();
}
@predestroy
private void teardown() throws interruptedexception {
int waitcount = 0;
while (messagecount.get() > 0 && waitcount < max_wait_count) {
log.info("正在关闭消息监听程序{},等待3秒[{}/{}]...", this.getclass().getcanonicalname(), waitcount, max_wait_count);
thread.sleep(3000l);
}
if (messagecount.get() > 0) {
log.warn("应用非安全关闭,当前仍有{}条正在处理的消息", messagecount.get());
}
}
}
router模块队列配置:
spring.cloud.stream:
bindings:
agent-state-list-input-channel:
destination: outbound.agent-state-list
group: router
consumer:
maxattempts: 1
concurrency: 10
agent-state-list-slt-acv-channel.destination: outbound.agent-state-list
agent-state-list-slt-ivr-channel.destination: outbound.agent-state-list
rabbit.bindings:
agent-state-list-input-channel.consumer.bindingroutingkey: router
agent-state-list-slt-acv-channel.producer.routing-key-expression: '''slt-acv'''
agent-state-list-slt-ivr-channel.producer.routing-key-expression: '''slt-ivr'''
@component
public interface routerinputchannels {
string agent_state_list_input_channel = "agent-state-list-input-channel";
@input(value = agent_state_list_input_channel)
subscribablechannel agentstatelistinput();
}
@component
public interface routeroutputchannels {
string agent_state_list_slt_acv = "agent-state-list-slt-acv-channel";
string agent_state_list_slt_ivr = "agent-state-list-slt-ivr-channel";
@output(agent_state_list_slt_acv)
messagechannel agentstatelistsltacr();
@output(agent_state_list_slt_ivr)
messagechannel agentstatelistsltivr();
}
router 模块监听
@slf4j
@component
@enablebinding(routerinputchannels.class)
@requiredargsconstructor
public class agentstatuslistmonitor extends destroyablemonitor {
private final agentstatelistservice agentstatelistservice;
@streamlistener(routerinputchannels.agent_state_list_input_channel)
public void onmessage(agentstatelistrequestdto req) {
countup();
try {
log.info("agentstatuslistmonitor收到请求,调用商路通服务,获取所有坐席凯发k8国际首页数据:查询参数:{}", req.tostring());
agentstatelistservice.process(req);
} finally {
countdown();
}
}
}
router业务处理和消息分发
@slf4j
@service
@requiredargsconstructor
public class agentstatelistservice {
private final agentstatelistmessageservice agentstatelistmessageservice;
private final routeroutputchannels routeroutputchannels;
public void process(agentstatelistrequestdto payload) {
agentstatelistmessageservice.send(payload);
message
switch (payload.getsupplier()) {
case outboundsupplierconstant.slt:
switch (payload.getcalltype()) {
case outboundtypeconstant.ivr:
routeroutputchannels.agentstatelistsltivr().send(message);
return;
case outboundtypeconstant.acv:
routeroutputchannels.agentstatelistsltacr().send(message);
return;
default:
routeroutputchannels.agentstatelistsltacr().send(message);
routeroutputchannels.agentstatelistsltivr().send(message);
return;
}
default:
log.error("调用商路通服务,获取所有坐席凯发k8国际首页数据的消息转发失败,供应商不支持,[payload:{}]", payload);
}
}
}
分发后的模块
base模块
application-slt.yml
spring.cloud.stream:
bindings:
agent-state-list-reply-channel.destination: outbound.agent-state-list-reply
@component
public interface sltboundinputchannels {
string agent_state_list_channel = "agent-state-list-channel";
@input(value = agent_state_list_channel)
subscribablechannel agentstatelistsltacvinput();
}
@component
public interface sltboundoutputchannels {
string agent_state_list_reply_channel = "agent-state-list-reply-channel";
@output(agent_state_list_reply_channel)
messagechannel agentstatelistreplyoutput();
}
base模块的monitor
@slf4j
@component
@requiredargsconstructor
@enablebinding(sltboundinputchannels.class)
public class sltagentinfomonitor extends destroyablemonitor {
private final sltagentstatelistservice sltagentstatelistservice;
@streamlistener(sltboundinputchannels.agent_state_list_channel)
public void process(message
log.info("外呼平台:slt,代理模块收到消息,调用商路通服务,获取所有坐席凯发k8国际首页数据, params: {}", message.getpayload());
countup();
sltagentstatelistservice.process(message.getpayload());
countdown();
}
}
service实现了一些slt请求的方法
@slf4j
@service
public class sltagentstatelistservice extends abstractsltrequestservicetemplate
private final sltboundoutputchannels sltboundoutputchannels;
public sltagentstatelistservice(applicationproperties properties, objectmapper objectmapper, resttemplateservice resttemplateservice, sltboundoutputchannels sltboundoutputchannels) {
super(properties, objectmapper, resttemplateservice);
this.sltboundoutputchannels = sltboundoutputchannels;
}
@override
protected void handleexception(exception exception, agentstatelistrequestdto payload) {
log.info("外呼平台:slt,代理模块执行,调用商路通服务,获取所有坐席凯发k8国际首页数据-指令,调用异常, params: {}", payload, exception);
}
@override
public requestdto generaterequest(agentstatelistrequestdto payload) {
log.info("外呼平台:slt,代理模块执行,调用商路通服务,获取所有坐席凯发k8国际首页数据-指令,创建请求,params:{}", payload);
supplierrequestinfodto requestinfo = properties.getrequestinfo(payload.getsuppliercode());
requestdto request = new requestdto();
request.setaction(outbound_agent_state_list_action);
request.setstarttime(payload.getstarttime());
request.setendtime(payload.getendtime());
request.setbusinessid(sltbusinessenum.getnamebycalltype(payload.getcalltype()));
request.setbase;
request.set.get(outbound_agent_state_list_servlet)));
request.setloginuser(requestinfo.getuser());
request.setloginpwd(requestinfo.getpassword());
return request;
}
@override
public void handleresponse(agentstatelistresponsedto responsedto, agentstatelistrequestdto payload) {
if (sltresponsecode.response_success_str.equals(responsedto.getreturncode())) {
processreply(responsedto, payload);
} else {
log.warn("外呼平台:slt,代理模块执行,调用商路通服务,获取所有坐席凯发k8国际首页数据-处理响应,获取失败,params:{}, result:{}", payload, responsedto);
}
}
@override
protected typereference
return new typereference<>() {
};
}
//消息写会api模块
private void processreply(agentstatelistresponsedto responsedto, agentstatelistrequestdto payload) {
list
list
for (agentstatelistjsonto e : rowsjsonobject) {
agentstatelistdto agentstatelistdto = new agentstatelistdto();
beanutils.copyproperties(e, agentstatelistdto);
rowslist.add(agentstatelistdto);
}
agentstatelistreplydto reply = new agentstatelistreplydto();
reply.setreturncode(responsedto.getreturncode());
reply.setmessage(responsedto.getreturnmessage());
reply.setrows(rowslist);
reply.setqueueid(payload.getqueueid());
reply.settotal(responsedto.gettotal());
sltboundoutputchannels.agentstatelistreplyoutput().send(messagebuilder.withpayload(reply).build());
}
}
@slf4j
public abstract class abstractsltrequestservicetemplate
implements iencapsulationrequestentityinterface
public abstractsltrequestservicetemplate(applicationproperties properties, objectmapper objectmapper, resttemplateservice resttemplateservice) {
super(properties, objectmapper, resttemplateservice);
}
public void process(t payload) {
try {
requestdto requestdto = generaterequest(payload);
k response = sendrequest(requestdto);
handleresponse(response, payload);
} catch (exception e) {
handleexception(e, payload);
}
}
/**
* 处理异常
*
* @param exception 异常对象
* @param payload 传递数据
*/
protected abstract void handleexception(exception exception, t payload);
}
下面几个类是对slt的所有请求的封装
@slf4j
@component
@requiredargsconstructor
public class sltsendrequestservice
protected final applicationproperties properties;
protected final objectmapper objectmapper;
protected final resttemplateservice resttemplateservice;
threadlocal
/**
* 发送请求
*
* @param request 请求参数
* @return 响应实体
* @throws exception 异常
*/
@override
public t sendrequest(requestdto request) throws exception {
httpheaders headers = new httpheaders();
headers.setcontenttype(mediatype.application_form_urlencoded);
string requestparam = "param=".concat(objectmapper.writevalueasstring(request));
httpentity
long starttime = system.currenttimemillis();
responseentity
log.info("请求:[{}],header: {}", requestparam, headers.tostring());
try {
responseentity = resttemplateservice.post(request.get, requestentity, properties.getrequesttimeoutmaximum(), string.class);
assert.hastext(responseentity.getbody(), "接口调用异常:商路通接口响应体为空");
if (responseentity.getstatuscode().equals(httpstatus.ok)) {
t response = objectmapper.readvalue(responseentity.getbody(), instancereference());
return checkresponse(response, request);
} else {
log.error("商路通外呼请求:接口调用失败 [statuscode:{},body:{}]", responseentity.getstatuscodevalue(), responseentity.getbody());
throw new businessexception(commonenum.request_exception.getcode(), "商路通外呼请求:接口调用失败");
}
} finally {
checklogin.remove();
long elapsedtime = system.currenttimemillis() - starttime;
if (responseentity == null) {
log.error("商路通外呼请求详情:[请求地址:[{}],请求体:[{}],响应体:null]", request.get, requestparam);
} else {
log.info("商路通外呼请求详情:[请求地址:[{}],请求时间:{},请求体:[{}],响应体:[{}]]",
request.get, elapsedtime, requestparam, responseentity.getbody());
}
}
}
protected typereference
return new typereference
};
}
/**
* 校验响应体是否合法
*
* @param response 响应体对象
* @param request 请求体对象
* @return 响应体对象
* @throws exception 异常
*/
t checkresponse(t response, requestdto request) throws exception {
if (stringutils.equals(sltresponsecode.response_time_out_str, response.getreturncode()) &&
stringutils.equals("超时", response.getreturnmessage())) {
log.info("商路通外呼请求:接口请求响应码超时:[{}],调用登录接口", response.tostring());
return login(request);
} else {
return response;
}
}
/**
* 商路通登陆操作
*
* @param request 请求参数对象
* @return 响应体对象
* @throws exception 异常
*/
private t login(requestdto request) throws exception {
checklogin();
httpheaders headers = new httpheaders();
headers.setcontenttype(mediatype.application_form_urlencoded);
string loginparam = "userid=".concat(request.getloginuser()).concat("&md5pwd=").concat(request.getloginpwd());
httpentity
responseentity
log.info("商路通外呼请求:登陆接口响应:[body:{}]", loginresponseentity.getbody());
assert.hastext(loginresponseentity.getbody(), "接口调用异常:商路通登录接口响应体为空");
loginresponsedto loginresponse = objectmapper.readvalue(loginresponseentity.getbody(), loginresponsedto.class);
if (sltresponsecode.response_success.equals(loginresponse.getreturncode())) {
return sendrequest(request);
} else {
throw new businessexception(commonenum.request_exception.getcode(),
"商路通外呼请求:登录接口调用失败:".concat(objects.requirenonnull(loginresponseentity.getbody())));
}
}
/**
* 校验请求而否二次登陆
*/
private void checklogin() {
if (checklogin.get() == null || !checklogin.get()) {
checklogin.set(true);
} else {
throw new businessexception(commonenum.request_exception.getcode(), "操作失败: 商路通登录接口已经调用过");
}
}
}
public interface isendrequestinterface
/**
* 发送请求
*
* @param request 请求实体
* @return 响应实体
* @throws exception 异常
*/
k sendrequest(t request) throws exception;
}
public interface iencapsulationrequestentityinterface {
/**
* 初始化请求对象
*
* @param payload 消息队列传递请求相关参数信息
* @return 请求参数
*/
t generaterequest(q payload) throws exception;
}
@data
public class baserequestentity {
}
下面是ivr模块的示例,acv模块省略
配置队列:
spring.cloud.stream:
bindings:
agent-state-list-channel:
destination: outbound.agent-state-list
group: slt-ivr
consumer:
maxattempts: 1
concurrency: 10
rabbit.bindings:
agent-state-list-channel.consumer.bindingroutingkey: slt-ivr
相关推荐
rabbitmq使用手册,介绍了rabbitmq的几种应用场景以及开发指导
springboot中rabbitmq使用demo,springboot中rabbitmq使用demo
mq全称为message queue,即消息队列, rabbitmq是由erlang语言开发,基于amqp(advanced message queue 高级消息队列协议)协议实现的消息队列,它是一种应用程序之间的通信方法,消息队列在分布式系统开 发中应用...
内容为自己总结的rabbitmq的经验,内容大致有:安装顺序、关键文件路径、常用命令、集群注意事项等,有用的请收入
rabbitmq使用手册 rabbitmq安装。
两种简单rabbitmq使用方案及其测试
springboot整合rabbitmq使用死信队列
rabbitmq下载安装配置使用指南官方手册
rabbitmq使用参考-ys.pdf
rabbitmq使用参考-ys
rabbitmq使用环境【安装包、erlang环境】
如题,c#的demo项目:rabbitmq封装和使用, 引用了rabbitmq.client 版本:3.6.9 rabbitmq .net客户端操作类库, 并简单展示了3种exchange的使用
介绍rabbitmq使用的教程
rabbitmq使用规范
rabbitmq客户连接池的java实现。我们刚开始也是采用这种方式来实现的,但做压力测试时,发现这种每次新建connection和新建channel是非常耗时的,在大并发下,一般都要8毫秒左右,慢的话,好多都是几十毫秒。因此我们...
关于rabbitmq的交换机使用方式!便于了解rabbitmq的使用!
rabbitmq连接池 springboot实现。通过连接池实现将高效的管理rabbitmq的connection,并与springboot进行整合,实现消息发送,获取队列列表等功能。基于此可以进行更多功能的扩充。
javascript连接消息(rabbitmq)
1:rabbitmq的命名规范 2:rabbitmq生产者开发规范 3:rabbitmq消费者开发规范