rabbitmq 使用 -凯发k8国际

`

rabbitmq 使用

    博客分类:
  • java

api模块接收请求,推送到消息队列

router模块消费消息,分发到各个模块

每个模块消费消息,在推回api模块,因为api模块需要知道最终执行结果

 

 

api模块配置:

spring:

  cloud:

    stream:

      bindings:

        outbound-agent-state-list.destination: outbound.agent-state-list   #生产

        agent-state-list-reply-channel:                                    #消费回调回来的消息

          destination: outbound.agent-state-list-reply

          group: ${nodeno:0}

          durablesubscription: false

          consumer.maxpriority: 10

      rabbit.bindings:

        outbound-agent-state-list.producer.routing-key-expression: '''router'''

        agent-state-list-reply-channel.consumer.durablesubscription: false

 

@component

public interface outboundoutputchannels {

string outbound_agent_state_list = "outbound-agent-state-list";

 

@output(value = outbound_agent_state_list)

    messagechannel agentstatelistoutput();

}

 

@component

public interface outboundinputchannels {

 

string outbound_agent_state_list_reply = "agent-state-list-reply-channel";

 

@input(value = outbound_agent_state_list_reply)

    subscribablechannel agentstatelistreply();

}

 

接收回调消息并出来业务逻辑

@slf4j

@requiredargsconstructor

@enablebinding(outboundinputchannels.class)

public class outboundreplymonitor extends destroyablemonitor {

 

@streamlistener(outboundinputchannels.outbound_agent_state_list_reply)

    public void agentstatelistreply(agentstatelistreplydto payload) {

        countup();

        agentstateservice.processagentstatelistreply(payload);

        countdown();

    }

}

 

api接口入口

@slf4j

@restcontroller

@requiredargsconstructor

@requestmapping("/api")

@api(value = "外呼接口", tags = "精准智能请求人工及人工接起")

public class agentstatecontroller {

 

    private final agentstateservice agentstateservice;

 

    /**

     * 5.9 精准智能请求人工及人工接起

     * 调用商路通服务,获取所有坐席凯发k8国际首页数据

     *

     * @param agentstaterequestdto 请求参数

     * @return 返回responseserver

     */

    @apioperation(value = "用商路通服务,获取所有坐席凯发k8国际首页数据", httpmethod = "post")

    @postmapping("/agentstatelist")

    public apiresponse agentstatelist(@validated @requestbody agentstatelistrequestdto agentstaterequestdto, @requestheader(value = webconstant.priority, required = false, defaultvalue = "2") integer priority) throws exception {

        log.info("请求:调用商路通服务,获取所有坐席凯发k8国际首页数据,param:{}", agentstaterequestdto.tostring());

        return agentstateservice.agentstatelist(agentstaterequestdto, priority);

    }

 

}

 

api接收请求后,数据校验,发送请求到消息队列,并等待消息的响应

@slf4j

@service

@requiredargsconstructor

public class agentstateservice {

 

    private final messageservice messageservice;

 

    private final callsuppliertyperelationservice callsuppliertyperelationservice;

 

    private final callsupplierservice callsupplierservice;

 

    private final calltypeservice calltypeservice;

 

    protected final applicationproperties properties;

 

    private final map> queryfuturemap = new concurrenthashmap<>();

 

    public apiresponse agentstatelist(agentstatelistrequestdto queue, integer priority) throws exception {

        long starttime = system.currenttimemillis();

 

        string businessid = sltbusinessenum.getnamebycalltype(queue.getcalltype());

        if (stringutils.isempty(businessid)) {

            throw new businessexception("商路通不支持的外呼类型:" queue.getcalltype());

        }

 

        callsupplierpo supplierpo = callsupplierservice.findbycode(queue.getsupplier());

        if (supplierpo == null) {

            throw new businessexception("供应商不存在:" queue.getsupplier());

        }

 

        calltypepo calltypepo = calltypeservice.findbycode(queue.getcalltype());

        if (calltypepo == null) {

            throw new businessexception("外呼类型不存在:" queue.getcalltype());

        }

 

        callsuppliertyperelationpo callsuppliertyperelationpo = callsuppliertyperelationservice.findbysupplieridandcalltypeid(supplierpo.getid(), calltypepo.getid());

        if (null == callsuppliertyperelationpo) {

            throw new businessexception("供应商外呼类型关系不存在");

        }

        queue.setcalltype(calltypepo.getcode());

        queue.setsuppliercode(supplierpo.getcode());

        queue.setqueueid(stringcheckutil.uuid());

        messageservice.sendtoagentstatelistchannel(queue, priority);

        completablefuture future = new completablefuture<>();

        queryfuturemap.put(queue.getqueueid(), future);

 

        long maxwaitmillis = properties.getmaxwaitmillis().tomillis();

        duration duration = properties.getmaxwaitmillis().minusmillis(system.currenttimemillis() - starttime);

        if (duration.isnegative() || duration.iszero()) {

            log.info("调用商路通服务,获取所有坐席凯发k8国际首页数据,已超过最大等待时间{}ms,param:{}", properties.getmaxwaitmillis().tomillis(), queue.tostring());

            throw new requesttimeoutexception(maxwaitmillis, queue.getqueueid());

        }

        log.debug("等待返回调用商路通服务,获取所有坐席凯发k8国际首页数据请求结果,允许等待时间{}ms", duration.tomillis());

        try {

            return future.get(duration.tomillis(), timeunit.milliseconds);

        } catch (timeoutexception e) {

            throw new requesttimeoutexception(maxwaitmillis, queue.getqueueid());

        } finally {

            queryfuturemap.remove(queue.getqueueid());

        }

    }

 

    public void processagentstatelistreply(agentstatelistreplydto payload) {

        string queueid = payload.getqueueid();

        log.info("api模块-接收-调用商路通服务,获取所有坐席凯发k8国际首页数据结果,{}", payload.tostring());

        if (queryfuturemap.containskey(queueid)) {

            queryfuturemap.get(queueid).complete(apiresponse.success(payload.getrows()));

        } else {

            log.debug("未找到queueid记录:{}", queueid);

        }

    }

}

 

 

停止服务前,检查是否有消息正在处理

@slf4j

public class destroyablemonitor {

 

    private static final integer max_wait_count = 20;

 

    private atomiclong messagecount = new atomiclong();

 

    protected long countup() {

        return messagecount.incrementandget();

    }

 

    protected long countdown() {

        return messagecount.decrementandget();

    }

 

    @predestroy

    private void teardown() throws interruptedexception {

        int waitcount = 0;

        while (messagecount.get() > 0 && waitcount < max_wait_count) {

            log.info("正在关闭消息监听程序{},等待3秒[{}/{}]...", this.getclass().getcanonicalname(), waitcount, max_wait_count);

            thread.sleep(3000l);

        }

        if (messagecount.get() > 0) {

            log.warn("应用非安全关闭,当前仍有{}条正在处理的消息", messagecount.get());

        }

    }

}

 

 

 

router模块队列配置:

spring.cloud.stream:

  bindings:

    agent-state-list-input-channel:

      destination: outbound.agent-state-list

      group: router

      consumer:

        maxattempts: 1

        concurrency: 10

    agent-state-list-slt-acv-channel.destination: outbound.agent-state-list

    agent-state-list-slt-ivr-channel.destination: outbound.agent-state-list

  rabbit.bindings:

    agent-state-list-input-channel.consumer.bindingroutingkey: router

    agent-state-list-slt-acv-channel.producer.routing-key-expression: '''slt-acv'''

    agent-state-list-slt-ivr-channel.producer.routing-key-expression: '''slt-ivr'''

 

 

@component

public interface routerinputchannels {

string agent_state_list_input_channel = "agent-state-list-input-channel";

 

@input(value = agent_state_list_input_channel)

    subscribablechannel agentstatelistinput();

}

 

@component

public interface routeroutputchannels {

string agent_state_list_slt_acv = "agent-state-list-slt-acv-channel";

    string agent_state_list_slt_ivr = "agent-state-list-slt-ivr-channel";

 

    @output(agent_state_list_slt_acv)

    messagechannel agentstatelistsltacr();

 

    @output(agent_state_list_slt_ivr)

    messagechannel agentstatelistsltivr();

}

 

router 模块监听

@slf4j

@component

@enablebinding(routerinputchannels.class)

@requiredargsconstructor

public class agentstatuslistmonitor extends destroyablemonitor {

    private final agentstatelistservice agentstatelistservice;

 

    @streamlistener(routerinputchannels.agent_state_list_input_channel)

    public void onmessage(agentstatelistrequestdto req) {

        countup();

        try {

            log.info("agentstatuslistmonitor收到请求,调用商路通服务,获取所有坐席凯发k8国际首页数据:查询参数:{}", req.tostring());

            agentstatelistservice.process(req);

        } finally {

            countdown();

        }

    }

}

 

router业务处理和消息分发

@slf4j

@service

@requiredargsconstructor

public class agentstatelistservice {

 

    private final agentstatelistmessageservice agentstatelistmessageservice;

 

    private final routeroutputchannels routeroutputchannels;

 

    public void process(agentstatelistrequestdto payload) {

        agentstatelistmessageservice.send(payload);

 

        message message = messagebuilder.withpayload(payload).build();

        switch (payload.getsupplier()) {

            case outboundsupplierconstant.slt:

                switch (payload.getcalltype()) {

                    case outboundtypeconstant.ivr:

                        routeroutputchannels.agentstatelistsltivr().send(message);

                        return;

                    case outboundtypeconstant.acv:

                        routeroutputchannels.agentstatelistsltacr().send(message);

                        return;

                    default:

                        routeroutputchannels.agentstatelistsltacr().send(message);

                        routeroutputchannels.agentstatelistsltivr().send(message);

                        return;

                }

            default:

                log.error("调用商路通服务,获取所有坐席凯发k8国际首页数据的消息转发失败,供应商不支持,[payload:{}]", payload);

        }

    }

}

 

分发后的模块

base模块

application-slt.yml

spring.cloud.stream:

  bindings:

    agent-state-list-reply-channel.destination: outbound.agent-state-list-reply

 

@component

public interface sltboundinputchannels {

string agent_state_list_channel = "agent-state-list-channel";

 

@input(value = agent_state_list_channel)

    subscribablechannel agentstatelistsltacvinput();

}

 

@component

public interface sltboundoutputchannels {

string agent_state_list_reply_channel = "agent-state-list-reply-channel";

 

@output(agent_state_list_reply_channel)

    messagechannel agentstatelistreplyoutput();

}

 

base模块的monitor

@slf4j

@component

@requiredargsconstructor

@enablebinding(sltboundinputchannels.class)

public class sltagentinfomonitor extends destroyablemonitor {

 

    private final sltagentstatelistservice sltagentstatelistservice;

 

    @streamlistener(sltboundinputchannels.agent_state_list_channel)

    public void process(message message) {

        log.info("外呼平台:slt,代理模块收到消息,调用商路通服务,获取所有坐席凯发k8国际首页数据, params: {}", message.getpayload());

        countup();

        sltagentstatelistservice.process(message.getpayload());

        countdown();

    }

}

 

service实现了一些slt请求的方法

@slf4j

@service

public class sltagentstatelistservice extends abstractsltrequestservicetemplate {

 

    private final sltboundoutputchannels sltboundoutputchannels;

 

    public sltagentstatelistservice(applicationproperties properties, objectmapper objectmapper, resttemplateservice resttemplateservice, sltboundoutputchannels sltboundoutputchannels) {

        super(properties, objectmapper, resttemplateservice);

        this.sltboundoutputchannels = sltboundoutputchannels;

    }

 

    @override

    protected void handleexception(exception exception, agentstatelistrequestdto payload) {

        log.info("外呼平台:slt,代理模块执行,调用商路通服务,获取所有坐席凯发k8国际首页数据-指令,调用异常, params: {}", payload, exception);

    }

 

    @override

    public requestdto generaterequest(agentstatelistrequestdto payload) {

        log.info("外呼平台:slt,代理模块执行,调用商路通服务,获取所有坐席凯发k8国际首页数据-指令,创建请求,params:{}", payload);

        supplierrequestinfodto requestinfo = properties.getrequestinfo(payload.getsuppliercode());

        requestdto request = new requestdto();

        request.setaction(outbound_agent_state_list_action);

        request.setstarttime(payload.getstarttime());

        request.setendtime(payload.getendtime());

        request.setbusinessid(sltbusinessenum.getnamebycalltype(payload.getcalltype()));

        request.setbase;

        request.set.get(outbound_agent_state_list_servlet)));

        request.setloginuser(requestinfo.getuser());

        request.setloginpwd(requestinfo.getpassword());

        return request;

    }

 

 

    @override

    public void handleresponse(agentstatelistresponsedto responsedto, agentstatelistrequestdto payload) {

        if (sltresponsecode.response_success_str.equals(responsedto.getreturncode())) {

            processreply(responsedto, payload);

        } else {

            log.warn("外呼平台:slt,代理模块执行,调用商路通服务,获取所有坐席凯发k8国际首页数据-处理响应,获取失败,params:{}, result:{}", payload, responsedto);

        }

    }

 

    @override

    protected typereference instancereference() {

        return new typereference<>() {

        };

    }

    

//消息写会api模块

    private void processreply(agentstatelistresponsedto responsedto, agentstatelistrequestdto payload) {

        list rowsjsonobject = responsedto.getrows();

        list rowslist = new arraylist<>();

        for (agentstatelistjsonto e : rowsjsonobject) {

            agentstatelistdto agentstatelistdto = new agentstatelistdto();

            beanutils.copyproperties(e, agentstatelistdto);

            rowslist.add(agentstatelistdto);

        }

        agentstatelistreplydto reply = new agentstatelistreplydto();

        reply.setreturncode(responsedto.getreturncode());

        reply.setmessage(responsedto.getreturnmessage());

        reply.setrows(rowslist);

        reply.setqueueid(payload.getqueueid());

        reply.settotal(responsedto.gettotal());

        sltboundoutputchannels.agentstatelistreplyoutput().send(messagebuilder.withpayload(reply).build());

    }

 

}

 

@slf4j

public abstract class abstractsltrequestservicetemplate extends sltsendrequestservice

        implements iencapsulationrequestentityinterface, iparsingresponsebodyinterface {

 

 

    public abstractsltrequestservicetemplate(applicationproperties properties, objectmapper objectmapper, resttemplateservice resttemplateservice) {

        super(properties, objectmapper, resttemplateservice);

    }

 

    public void process(t payload) {

        try {

            requestdto requestdto = generaterequest(payload);

            k response = sendrequest(requestdto);

            handleresponse(response, payload);

        } catch (exception e) {

            handleexception(e, payload);

        }

    }

 

    /**

     * 处理异常

     *

     * @param exception 异常对象

     * @param payload   传递数据

     */

    protected abstract void handleexception(exception exception, t payload);

}

 

下面几个类是对slt的所有请求的封装

@slf4j

@component

@requiredargsconstructor

public class sltsendrequestservice implements isendrequestinterface {

 

    protected final applicationproperties properties;

    protected final objectmapper objectmapper;

    protected final resttemplateservice resttemplateservice;

 

    threadlocal checklogin = new threadlocal<>();

 

    /**

     * 发送请求

     *

     * @param request 请求参数

     * @return 响应实体

     * @throws exception 异常

     */

    @override

    public t sendrequest(requestdto request) throws exception {

        httpheaders headers = new httpheaders();

        headers.setcontenttype(mediatype.application_form_urlencoded);

        string requestparam = "param=".concat(objectmapper.writevalueasstring(request));

        httpentity requestentity = new httpentity<>(requestparam, headers);

        long starttime = system.currenttimemillis();

        responseentity responseentity = null;

        log.info("请求:[{}],header: {}", requestparam, headers.tostring());

        try {

            responseentity = resttemplateservice.post(request.get, requestentity, properties.getrequesttimeoutmaximum(), string.class);

            assert.hastext(responseentity.getbody(), "接口调用异常:商路通接口响应体为空");

            if (responseentity.getstatuscode().equals(httpstatus.ok)) {

                t response = objectmapper.readvalue(responseentity.getbody(), instancereference());

                return checkresponse(response, request);

            } else {

                log.error("商路通外呼请求:接口调用失败 [statuscode:{},body:{}]", responseentity.getstatuscodevalue(), responseentity.getbody());

                throw new businessexception(commonenum.request_exception.getcode(), "商路通外呼请求:接口调用失败");

            }

        } finally {

            checklogin.remove();

            long elapsedtime = system.currenttimemillis() - starttime;

            if (responseentity == null) {

                log.error("商路通外呼请求详情:[请求地址:[{}],请求体:[{}],响应体:null]", request.get, requestparam);

            } else {

                log.info("商路通外呼请求详情:[请求地址:[{}],请求时间:{},请求体:[{}],响应体:[{}]]",

                        request.get, elapsedtime, requestparam, responseentity.getbody());

            }

        }

    }

 

    protected typereference instancereference() {

        return new typereference() {

        };

    }

 

    /**

     * 校验响应体是否合法

     *

     * @param response 响应体对象

     * @param request  请求体对象

     * @return 响应体对象

     * @throws exception 异常

     */

    t checkresponse(t response, requestdto request) throws exception {

        if (stringutils.equals(sltresponsecode.response_time_out_str, response.getreturncode()) &&

                stringutils.equals("超时", response.getreturnmessage())) {

            log.info("商路通外呼请求:接口请求响应码超时:[{}],调用登录接口", response.tostring());

            return login(request);

        } else {

            return response;

        }

    }

 

    /**

     * 商路通登陆操作

     *

     * @param request 请求参数对象

     * @return 响应体对象

     * @throws exception 异常

     */

    private t login(requestdto request) throws exception {

        checklogin();

        httpheaders headers = new httpheaders();

        headers.setcontenttype(mediatype.application_form_urlencoded);

        string loginparam = "userid=".concat(request.getloginuser()).concat("&md5pwd=").concat(request.getloginpwd());

        httpentity requestentity = new httpentity<>(loginparam, headers);

        responseentity loginresponseentity = resttemplateservice.post(request.getbase.concat(properties.getrequesturlmap().get(outbound_login)), requestentity);

        log.info("商路通外呼请求:登陆接口响应:[body:{}]", loginresponseentity.getbody());

        assert.hastext(loginresponseentity.getbody(), "接口调用异常:商路通登录接口响应体为空");

        loginresponsedto loginresponse = objectmapper.readvalue(loginresponseentity.getbody(), loginresponsedto.class);

        if (sltresponsecode.response_success.equals(loginresponse.getreturncode())) {

            return sendrequest(request);

        } else {

            throw new businessexception(commonenum.request_exception.getcode(),

                    "商路通外呼请求:登录接口调用失败:".concat(objects.requirenonnull(loginresponseentity.getbody())));

        }

    }

 

    /**

     * 校验请求而否二次登陆

     */

    private void checklogin() {

        if (checklogin.get() == null || !checklogin.get()) {

            checklogin.set(true);

        } else {

            throw new businessexception(commonenum.request_exception.getcode(), "操作失败: 商路通登录接口已经调用过");

        }

    }

 

}

 

public interface isendrequestinterface {

 

 

    /**

     * 发送请求

     *

     * @param request 请求实体

     * @return 响应实体

     * @throws exception  异常

     */

    k sendrequest(t request) throws exception;

 

}

 

public interface iencapsulationrequestentityinterface {

 

    /**

     * 初始化请求对象

     *

     * @param payload 消息队列传递请求相关参数信息

     * @return 请求参数

     */

     t generaterequest(q payload) throws exception;

 

}

 

@data

public class baserequestentity {

}

 

 

下面是ivr模块的示例,acv模块省略

配置队列:

spring.cloud.stream:

  bindings:

    agent-state-list-channel:

      destination: outbound.agent-state-list

      group: slt-ivr

      consumer:

        maxattempts: 1

        concurrency: 10

  rabbit.bindings:

    agent-state-list-channel.consumer.bindingroutingkey: slt-ivr

分享到:
评论

相关推荐

    rabbitmq使用手册,介绍了rabbitmq的几种应用场景以及开发指导

    springboot中rabbitmq使用demo,springboot中rabbitmq使用demo

    mq全称为message queue,即消息队列, rabbitmq是由erlang语言开发,基于amqp(advanced message queue 高级消息队列协议)协议实现的消息队列,它是一种应用程序之间的通信方法,消息队列在分布式系统开 发中应用...

    内容为自己总结的rabbitmq的经验,内容大致有:安装顺序、关键文件路径、常用命令、集群注意事项等,有用的请收入

    rabbitmq使用手册 rabbitmq安装。

    两种简单rabbitmq使用方案及其测试

    springboot整合rabbitmq使用死信队列

    rabbitmq下载安装配置使用指南官方手册

    rabbitmq使用参考-ys.pdf

    rabbitmq使用参考-ys

    rabbitmq使用环境【安装包、erlang环境】

    如题,c#的demo项目:rabbitmq封装和使用, 引用了rabbitmq.client 版本:3.6.9 rabbitmq .net客户端操作类库, 并简单展示了3种exchange的使用

    介绍rabbitmq使用的教程

    rabbitmq使用规范

    rabbitmq客户连接池的java实现。我们刚开始也是采用这种方式来实现的,但做压力测试时,发现这种每次新建connection和新建channel是非常耗时的,在大并发下,一般都要8毫秒左右,慢的话,好多都是几十毫秒。因此我们...

    关于rabbitmq的交换机使用方式!便于了解rabbitmq的使用!

    rabbitmq连接池 springboot实现。通过连接池实现将高效的管理rabbitmq的connection,并与springboot进行整合,实现消息发送,获取队列列表等功能。基于此可以进行更多功能的扩充。

    javascript连接消息(rabbitmq)

    1:rabbitmq的命名规范 2:rabbitmq生产者开发规范 3:rabbitmq消费者开发规范

global site tag (gtag.js) - google analytics
网站地图