`
ssydxa219
  • 浏览: 601223 次
  • 性别:
  • 来自: 杭州
博主相关
  • 博客
  • 微博
  • 相册
  • 收藏
  • 文章分类
    社区版块
    • ( 0)
    • ( 0)
    • ( 0)
    存档分类
    最新评论

    ES 7.16.2 data output

      博客分类:
    • es
     
    package com.wugui.eswr;

    import cn.hutool.core.io.FileUtil;
    import com.alibaba.fastjson.JSONObject;
    import com.google.gson.Gson;
    import org.apache.http.HttpHost;
    import org.apache.http.auth.AuthScope;
    import org.apache.http.auth.UsernamePasswordCredentials;
    import org.apache.http.client.CredentialsProvider;
    import org.apache.http.impl.client.BasicCredentialsProvider;
    import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
    import org.apache.lucene.search.TotalHits;
    import org.elasticsearch.action.get.MultiGetRequest;
    import org.elasticsearch.action.search.*;
    import org.elasticsearch.client.*;
    import org.elasticsearch.client.indices.CreateIndexRequest;
    import org.elasticsearch.client.indices.CreateIndexResponse;
    import org.elasticsearch.client.indices.GetIndexRequest;
    import org.elasticsearch.client.indices.GetIndexResponse;
    import org.elasticsearch.client.transport.TransportClient;
    import org.elasticsearch.common.settings.Settings;
    import org.elasticsearch.common.unit.Fuzziness;
    import org.elasticsearch.common.unit.TimeValue;
    import org.elasticsearch.index.query.*;
    import org.elasticsearch.search.Scroll;
    import org.elasticsearch.search.SearchHit;
    import org.elasticsearch.search.SearchHits;
    import org.elasticsearch.search.aggregations.AggregationBuilder;
    import org.elasticsearch.search.aggregations.AggregationBuilders;
    import org.elasticsearch.search.builder.SearchSourceBuilder;
    import org.elasticsearch.search.fetch.subphase.highlight.HighlightBuilder;
    import org.elasticsearch.search.sort.SortOrder;
    import org.jetbrains.annotations.NotNull;

    import java.io.File;
    import java.io.IOException;
    import java.net.InetAddress;
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;

    public class main {

        private static string scheme = "http";
        private static string host = "202.107.24.16";
        private static int port = 9201;
        private static string username = "elas918273";
        private static string password = "elastic#@!123";

        private static string auth = "authorization type : basic auth";

        private static string _index = ".security-7";

        private static string _type = "_doc";

        public static resthighlevelclient esclient = null;

        private static requestoptions common_options = null;


        public static resthighlevelclient esclientinit() {
            //不需要用户名和密码的认证
            //esclient = new resthighlevelclient(restclient.builder(new httphost(hostname, port, scheme)));
            //需要用户名和密码的认证
            final credentialsprovider credentialsprovider = new basiccredentialsprovider();
            credentialsprovider.setcredentials(authscope.any, new usernamepasswordcredentials(username, password));
            restclientbuilder restclientbuilder = restclient.builder(new httphost(host, port, scheme))
                    .sethttpclientconfigcallback(new restclientbuilder.httpclientconfigcallback() {
                        @override
                        public httpasyncclientbuilder customizehttpclient(httpasyncclientbuilder httpasyncclientbuilder) {
                            return httpasyncclientbuilder.setdefaultcredentialsprovider(credentialsprovider);
                        }
                    });
            esclient = new resthighlevelclient(restclientbuilder);
            requestoptions.builder builder = requestoptions.default.tobuilder();
            builder.sethttpasyncresponseconsumerfactory(
                    // 设置查询内容大小限制,默认100 * 1024 * 1024
                    new httpasyncresponseconsumerfactory.heapbufferedresponseconsumerfactory(200 * 1024 * 1024)
            );
            common_options = builder.build();

            return esclient;
        }

        public static void main(string[] args) throws exception {

            //queryallbyhits(esclientinit());

            queryallbyscolldata(esclientinit());
        }

        /**
         * 创建索引
         *
         * @param esclient
         * @throws exception
         */
        public static void indexcreate(resthighlevelclient esclient) throws exception {

            //创建索引
            createindexrequest request = new createindexrequest(".securityxxx-7");
            createindexresponse createindexresponse = esclient.indices().create(request, requestoptions.default);

            //拿到响应状态
            boolean acknowledged = createindexresponse.isacknowledged();
            system.out.println("索引操作: " acknowledged);

            //关闭客户端连接
            esclient.close();
        }

        /**
         * 获取索引信息
         *
         * @param esclient
         * @throws exception
         */
        public static void indexquery(resthighlevelclient esclient) throws exception {
            //创建索引连接
            getindexrequest request = new getindexrequest(".security-7");
            getindexresponse getindexresponse = esclient.indices().get(request, requestoptions.default);

            system.out.println("索引查询: " getindexresponse.getaliases());
            system.out.println("索引查询: " getindexresponse.getmappings());
            system.out.println("索引查询: " getindexresponse.getsettings());

            //关闭客户端连接
            esclient.close();
        }

        /**
         * @param esclient
         * @throws exception
         */
        private static void queryall(resthighlevelclient esclient) throws exception {
            //查询索引中的全部数据
            searchrequest request = new searchrequest();
            request.indices(".security-7");

            //查询全部 matchall
            request.source(new searchsourcebuilder().query(querybuilders.matchallquery()));

            searchresponse response = esclient.search(request, requestoptions.default);
            searchhits hits = response.gethits();

            system.out.println(hits.gettotalhits());
            system.out.println(response.gettook());

            for (searchhit hit : hits.gethits()) {
                system.out.println(hit.getsourceasstring());
            }
        }

        /**
         * 条件查询
         *
         * @param esclient
         * @throws exception
         */
        private static void querycondition(resthighlevelclient esclient) throws exception {
            //查询索引中的全部数据
            searchrequest request = new searchrequest();
            request.indices("user");

            //分词中 同时包含李强才可以,因为底层默认使用英文分词器,所以李强会被分为 李,强没办法符合条件所以查询不到
            //request.source(new searchsourcebuilder().query(querybuilders.termquery("name","李强")));
            //这样是李强字段不可再分,查询name字段中包含此字段的数据
            request.source(new searchsourcebuilder().query(querybuilders.matchphrasequery("name", "李强")));


            searchresponse response = esclient.search(request, requestoptions.default);
            searchhits hits = response.gethits();

            system.out.println(hits.gettotalhits());
            system.out.println(response.gettook());

            for (searchhit hit : hits.gethits()) {
                system.out.println(hit.getsourceasstring());
            }
        }

        /**
         * 分页查询、部分字段显示、排序
         *
         * @param esclient
         * @throws exception
         */
        private static void querypage(resthighlevelclient esclient) throws exception {
            searchrequest request = new searchrequest();
            request.indices(".security-7");

            //构造查询条件 "match_all", "{}"
            searchsourcebuilder query = new searchsourcebuilder().query(querybuilders.matchallquery());

            //分页
            query.from(0); //从第几条数据开始截取 (页码-1)* 每页数量
            query.size(2); //每页的数量

            string[] excludes = {"name"}; //排除字段
            string[] includes = {"tel"}; //展示字段

            query.fetchsource(includes, excludes);

            //排序
            query.sort("create_time", sortorder.asc);

            request.source(query);

            searchresponse response = esclient.search(request, requestoptions.default);
            searchhits hits = response.gethits();

            for (searchhit hit : hits.gethits()) {
                system.out.println(hit.getsourceasstring());
            }
        }

        /**
         * 组合条件查询
         *
         * @param esclient
         * @throws exception
         */
        private static void queryconditions(resthighlevelclient esclient) throws exception {
            searchrequest request = new searchrequest();
            request.indices("user");

            //构造查询条件
            searchsourcebuilder builder = new searchsourcebuilder();
            boolquerybuilder boolquerybuilder = querybuilders.boolquery();

            //组合条件查询
            //与
    //        boolquerybuilder.must(querybuilders.matchquery("name","小"));
    //        boolquerybuilder.must(querybuilders.matchquery("sex","性别1"));
    //        boolquerybuilder.mustnot(querybuilders.matchquery("sex","性别1"));

            //或
            boolquerybuilder.should(querybuilders.matchquery("name", "华"));
            boolquerybuilder.should(querybuilders.matchquery("sex", "男"));

            //排序
            builder.sort("tel", sortorder.asc);

            builder.query(boolquerybuilder);
            request.source(builder);

            searchresponse response = esclient.search(request, requestoptions.default);
            searchhits hits = response.gethits();

            system.out.println(hits.gettotalhits());
            system.out.println(response.gettook());

            for (searchhit hit : hits.gethits()) {
                system.out.println(hit.getsourceasstring());
            }
        }

        /**
         * 范围查询
         *
         * @param esclient
         * @throws exception
         */
        private static void querybetween(resthighlevelclient esclient) throws exception {
            searchrequest request = new searchrequest();
            request.indices("user");

            //构造查询条件
            searchsourcebuilder builder = new searchsourcebuilder();
            rangequerybuilder rangequerybuilder = querybuilders.rangequery("age");
            rangequerybuilder.gte(30);
            rangequerybuilder.lte(32);

            //排序
            builder.sort("tel", sortorder.asc);

            builder.query(rangequerybuilder);
            request.source(builder);

            searchresponse response = esclient.search(request, requestoptions.default);
            searchhits hits = response.gethits();

            system.out.println(hits.gettotalhits());
            system.out.println(response.gettook());

            for (searchhit hit : hits.gethits()) {
                system.out.println(hit.getsourceasstring());
            }
        }

        /**
         * 模糊查询(不针对中文)
         *
         * @param esclient
         * @throws exception
         */
        private static void querylike(resthighlevelclient esclient) throws exception {
            searchrequest request = new searchrequest();
            request.indices("user");

            //构造查询条件
            searchsourcebuilder builder = new searchsourcebuilder();
            builder.query(querybuilders.fuzzyquery("name", "zhangsan").fuzziness(fuzziness.one));

            request.source(builder);

            searchresponse response = esclient.search(request, requestoptions.default);
            searchhits hits = response.gethits();

            system.out.println(hits.gettotalhits());
            system.out.println(response.gettook());

            for (searchhit hit : hits.gethits()) {
                system.out.println(hit.getsourceasstring());
            }
        }

        /**
         * 高亮查询
         *
         * @param esclient
         * @throws exception
         */
        private static void queryhighlighter(resthighlevelclient esclient) throws exception {
            searchrequest request = new searchrequest();
            request.indices("user");

            //构造查询条件
            searchsourcebuilder builder = new searchsourcebuilder();
            termquerybuilder termquerybuilder = querybuilders.termquery("name", "zhang");

            highlightbuilder highlightbuilder = new highlightbuilder();
            highlightbuilder.pretags("

    es7.16.2 dataoutput -凯发k8国际

    ");
            highlightbuilder.field("name");

            builder.highlighter(highlightbuilder);
            builder.query(termquerybuilder);

            request.source(builder);

            searchresponse response = esclient.search(request, requestoptions.default);
            searchhits hits = response.gethits();

            system.out.println(hits.gettotalhits());
            system.out.println(response.gettook());

            for (searchhit hit : hits.gethits()) {
                system.out.println(hit.getsourceasstring());
            }
        }

        /**
         * 聚合查询(最大值)
         *
         * @param esclient
         * @throws exception
         */
        private static void queryaggregation(resthighlevelclient esclient) throws exception {
            searchrequest request = new searchrequest();
            request.indices("user");

            //构造查询条件
            searchsourcebuilder builder = new searchsourcebuilder();
    //分组名称,聚合字段
            aggregationbuilder aggregationbuilder = aggregationbuilders.max("maxage").field("age");

            builder.aggregation(aggregationbuilder);
            request.source(builder);

            searchresponse response = esclient.search(request, requestoptions.default);
            system.out.println(response);
            searchhits hits = response.gethits();

            system.out.println(hits.gettotalhits());
            system.out.println(response.gettook());

            for (searchhit hit : hits.gethits()) {
                system.out.println(hit.getsourceasstring());
            }
        }

        /**
         * 聚合查询(group分组)
         *
         * @param esclient
         * @throws exception
         */
        private static void querygroup(resthighlevelclient esclient) throws exception {
            searchrequest request = new searchrequest();
            request.indices("user");

            //构造查询条件
            searchsourcebuilder builder = new searchsourcebuilder();

            aggregationbuilder aggregationbuilder = aggregationbuilders.terms("agegroup").field("age");

            builder.aggregation(aggregationbuilder);
            request.source(builder);

            searchresponse response = esclient.search(request, requestoptions.default);
            system.out.println(response);
            searchhits hits = response.gethits();

            system.out.println(hits.gettotalhits());
            system.out.println(response.gettook());

            for (searchhit hit : hits.gethits()) {
                system.out.println(hit.getsourceasstring());
            }
        }

        /**
         * 查询全部
         */
        private static void queryallbyhits(resthighlevelclient esclient) throws ioexception {
            gson gson = new gson();
            // 创建搜索对象
            searchrequest searchrequest = new searchrequest();
            // 查询构建工具
            searchsourcebuilder searchsourcebuilder = new searchsourcebuilder();
            //排序
            //no mapping found for [create_time] in order to sort on
            //searchsourcebuilder.sort("create_time", sortorder.asc);
            // 添加查询条件,通过querybuilders获取各种查询
            searchsourcebuilder.query(querybuilders.matchallquery());//"match_all", "{}"
            searchrequest.source(searchsourcebuilder);
            // 搜索
            searchresponse search = esclient.search(searchrequest, requestoptions.default);

            // 解析
            searchhits hits = search.gethits();
            searchhit[] hits1 = hits.gethits();
            for (searchhit hit : hits1) {
                // 取出source数据
                string itemstring = hit.getsourceasstring();
                // 反序列化
                hashmap item = gson.fromjson(itemstring, hashmap.class);
                system.out.println(item);
            }
        }

        private static void queryallbyscoll1(resthighlevelclient esclient) throws ioexception {
            try {
                long starttime = system.currenttimemillis();
                /*创建客户端*/
                //client startup
                //设置集群名称
                settings settings = settings.builder()
                        .put("cluster.name", "elsearch")
                        .put("client.transport.sniff", true)
                        .build();
                //创建client
                resthighlevelclient client = esclient;

                list result = new arraylist<>();

                string scrollid = "";

                //第一次请求
                searchsourcebuilder sourcebuilder = new searchsourcebuilder();


                //todo: 设置查询条件
                rangequerybuilder rangequerybuilder = querybuilders
                        .rangequery("inputtime")
                        .from("2016-12-14 02:00:00").to("2016-12-14 07:59:59");
                sourcebuilder.query(querybuilders.boolquery()
                                .must(querybuilders
                                        .matchphrasequery("pointid", "w3.unit1.10hfc01ct013"))
                                .must(rangequerybuilder))
                        .size(100)//如果开启游标,则滚动获取
                        .sort("inputtime", sortorder.asc);
                //查询
                searchrequest request = requests.searchrequest("pointdata");
                request.scroll("2m");
                request.source(sourcebuilder);
                searchresponse response = client.search(request,null);//.actionget();
                //todo:处理数据
                searchhits hits = response.gethits();
                for (int i = 0; i < hits.gethits().length; i ) {
                    //system.out.println(hits.gethits()[i].getsourceasstring());
                    result.add(hits.gethits()[i].getsourceasstring());
                }
                //记录滚动id
                scrollid = response.getscrollid();


                while (true) {
                    //后续的请求
                    //scrollid = query.getscollid();
                    searchscrollrequestbuilder searchscrollrequestbuilder = null;//client.preparesearchscroll(scrollid);
                    // 重新设定滚动时间
                    //timevalue timevalue = new timevalue(30000);
                    searchscrollrequestbuilder.setscroll("2m");
                    // 请求
                    searchresponse response1 = searchscrollrequestbuilder.get();

                    //todo:处理数据
                    searchhits hits2 = response1.gethits();
                    if (hits2.gethits().length == 0) {
                        break;
                    }
                    for (int i = 0; i < hits2.gethits().length; i ) {
                        result.add(hits2.gethits()[i].getsourceasstring());
                    }
                    //下一批处理
                    scrollid = response1.getscrollid();
                }

                system.out.println(result.size());
                long endtime = system.currenttimemillis();
                system.out.println("java程序运行时间:" (endtime - starttime) "ms");
            } catch (exception e) {
                e.printstacktrace();
            }
        }

        private static void queryallbyscoll(resthighlevelclient esclient) throws ioexception {

            boolquerybuilder boolquery = querybuilders.boolquery();
            boolquery.should(querybuilders.termquery("match_all","{}"));
            // 存活时间,当索引数据量特别大时,出现超时可能性大,此值适当调大
            scroll scroll = new scroll(timevalue.timevalueminutes(10l));
            searchsourcebuilder searchsourcebuilder = new searchsourcebuilder();
            searchsourcebuilder.query(boolquery);
            searchsourcebuilder.size(5);
            searchrequest searchrequest = new searchrequest()
            // es7已经去掉type,查询时加type
                    .indices(_index)
                    .scroll(scroll)
                    .source(searchsourcebuilder);
            searchresponse searchresponse = null;
            try{
                searchresponse = esclient.search(searchrequest, requestoptions.default);
            } catch(
            ioexception e) {
                e.printstacktrace();
            }
            string scrollid = searchresponse.getscrollid();
            searchhit[] searchhits = searchresponse.gethits().gethits();
            for(searchhit searchhit :searchhits){
                    system.out.println(searchhit.getsourceasstring());
                }
            //遍历搜索命中的数据,直到没有数据
            while(searchhits !=null&&searchhits.length >0) {

                    searchscrollrequest scrollrequest = new searchscrollrequest(scrollid);

                    scrollrequest.scroll(scroll);

                    try {
                        searchresponse = esclient.scroll(scrollrequest, requestoptions.default);
                    } catch (ioexception e) {
                        e.printstacktrace();
                    }

                    scrollid = searchresponse.getscrollid();
                    searchhits = searchresponse.gethits().gethits();
                    if (searchhits != null && searchhits.length > 0) {
                        for (searchhit searchhit : searchhits) {
                            system.out.println(searchhit.getsourceasstring());
                        }

                    }

                }
            //clean scroll
                clearscrollrequest clearscrollrequest = new clearscrollrequest();
                clearscrollrequest.addscrollid(scrollid);
                clearscrollresponse clearscrollresponse = null;
            try{
                    clearscrollresponse = esclient.clearscroll(clearscrollrequest, requestoptions.default);
                } catch(
                ioexception e) {
                    //log.error("clear-scroll-error:{}", e);
                }
                boolean succeeded = clearscrollresponse.issucceeded();
            system.out.println(succeeded);
        }

        public static void queryallbyscolldata(resthighlevelclient esclient) throws ioexception {
            int i = 1,size = 2000,total = 43316300;

            for(i=909 ; i            searchrequest searchrequest = new searchrequest();
                //scroll scroll = new scroll(timevalue.timevalueminutes(5l));
                //searchrequest.scroll(scroll);
                searchsourcebuilder searchsourcebuilder = new searchsourcebuilder();

                matchallquerybuilder matchallquerybuilder = querybuilders.matchallquery();
                searchsourcebuilder.query(matchallquerybuilder);
                searchsourcebuilder.from(i*size);
                searchsourcebuilder.size(size);

                searchrequest.source(searchsourcebuilder);
                searchresponse response = esclient.search(searchrequest, common_options);
                //string scrollid = response.getscrollid();
                searchhit[] searchhits = response.gethits().gethits();

                //system.out.println(response.gethits().gettotalhits());
                for (searchhit searchhit : searchhits) {
                    //system.out.println(searchhit.getsourceasstring());
                    fileutil.appendstring(searchhit.getsourceasstring() "\n",new file("d:\\aa\\" i*size ".csv"),"utf-8");
                }
                /*while (searchhits != null && searchhits.length > 0) {
                    searchscrollrequest scrollrequest = new searchscrollrequest(scrollid);
                    scrollrequest.scroll(scroll);
                    response = esclient.scroll(scrollrequest, requestoptions.default);
                    scrollid = response.getscrollid();
                    searchhits = response.gethits().gethits();

                    for (searchhit searchhit : searchhits) {
                        i ;
                        system.out.println(searchhit.getsourceasstring());

                    }
                    if (i > 10) {
                        break;
                    }
                }*/
            }
        }



    }
    分享到:
    评论

    相关推荐

      Elasticsearch IK 分词器 7.16.2

      最新版 Windows elasticsearch-7.16.2-windows-x86_64.zip

      Elasticsearch（elasticsearch-7.16.2-linux-x86_64.tar.gz，适用于 Linux x86_64）是一个高度可扩展的开源全文本搜索和分析引擎。它使您可以快速、近乎实时地存储、搜索和分析大量数据。它通常用作支持具有复杂搜索...

      最新版 Windows elasticsearch-7.16.3-windows-x86_64.zip

      最新版 Linux elasticsearch-7.16.1-linux-x86_64.tar.gz

      最新版 Windows kibana-7.16.2-windows-x86_64.zip

      最新版 Linux elasticsearch-7.16.2-linux-x86_64.tar.gz

      最新版 Linux logstash-7.16.2-linux-x86_64.tar.gz

      最新版 Windows kibana-7.16.3-windows-x86_64.zip

      最新版 Linux elasticsearch-7.16.3-linux-x86_64.tar.gz

      最新版 Linux logstash-7.16.1-linux-x86_64.tar.gz

      最新版 Linux logstash-7.16.3-linux-x86_64.tar.gz

      Elasticsearch（elasticsearch-7.16.2-aarch64.rpm，适用于 Linux ARM aarch64）是一个高度可扩展的开源全文本搜索和分析引擎。它使您可以快速、近乎实时地存储、搜索和分析大量数据。它通常用作支持具有复杂搜索功能和...

      包含 elasticsearch-7.16.3-linux-x86_64.tar.gz、elasticsearch-head.zip、node-v12.18.1-linux-x64.tar.xz 三个安装包，head 包已经做了内部修改，可以解压直接用

      Elasticsearch（elasticsearch-7.16.2-x86_64.rpm，适用于 Linux x86_64）是一个高度可扩展的开源全文本搜索和分析引擎。它使您可以快速、近乎实时地存储、搜索和分析大量数据。它通常用作支持具有复杂搜索功能和要求的...

      最新版 Windows elasticsearch-7.16.1-windows-x86_64.zip

      Elasticsearch（elasticsearch-7.16.2-amd64.deb，适用于 Debian/Ubuntu x86_64）是一个高度可扩展的开源全文本搜索和分析引擎。它使您可以快速、近乎实时地存储、搜索和分析大量数据。它通常用作支持具有复杂搜索功能和要求的...

      Elasticsearch（elasticsearch-7.16.2-darwin-aarch64.tar.gz，适用于 macOS ARM 系统）是一个高度可扩展的开源全文本搜索和分析引擎。它使您可以快速、近乎实时地存储、搜索和分析大量数据。它通常用作支持具有复杂搜索...

      最新版 Windows elasticsearch-7.16.0-windows-x86_64.zip

      最新版 Linux elasticsearch-7.16.0-linux-x86_64.tar.gz

    Global site tag (gtag.js) - Google Analytics
    网站地图