Python 3: Querying More Than 10,000 Records from Elasticsearch and Emailing the Results

Python3   2023-01-19 17:17

Scenario:

The operations team needs order data for the last 15 days and the last 30 days, queried and sent to their mailbox on a schedule. The data lives in Kibana, which means it comes from Elasticsearch 5, so I started poking at it with Python 3. The 15-day pull worked fine; the 30-day pull did not. There were more than 10,000 orders, the query returned an error, and off to Baidu I went.

Every answer on Baidu was a copy-paste of every other answer. What a swamp...

Eventually I worked out that Elasticsearch 5 returns at most 10,000 hits per query by default; to get more than 10,000 you have to change a setting.

For example:

# Lift the index's default cap of 10,000 returned documents; here it is raised to 3,000,000.
PUT grade/_settings
{
  "index.max_result_window": 3000000
}
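The same change can be made from a shell over the plain REST API; a quick sketch with curl, assuming ES listens on localhost:9200 (host and index name are placeholders):

curl -XPUT 'http://localhost:9200/grade/_settings' -H 'Content-Type: application/json' -d '{"index.max_result_window": 3000000}'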

Another example:

# Or set it when creating the index
{
  "settings": {
    "index": {
      "max_result_window": 10000000000
    }
  }
}
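The setting can also be applied from Python through the same elasticsearch5 client the script below uses; a minimal sketch, assuming an existing connection object named es and an index named grade:

# hypothetical: raise max_result_window on the "grade" index via the client
es.indices.put_settings(index="grade", body={"index.max_result_window": 3000000})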

But neither approach suited me. I don't really know Elasticsearch (junior ops here), I didn't want to change cluster parameters, setting it at index-creation time didn't fit my case either, and the other exotic parameters people suggested didn't work. So I asked a senior engineer at my company.

The senior engineer said:

The documents in Elasticsearch have an auto-increment id as well as a creation time. First run a paged query for 500 records within the time range, sorted by id descending, and take the first id returned: that is the end ID.

PS: check the result count of that query; if it is fewer than 500 records, everything fit in a single page and you can return right away.

Then run the same time-range query for 500 records sorted by id ascending and take the first id returned: that is the start ID.

With those two ids you can switch to id-range queries: after each query, move the start id forward past the 500 records you just fetched, and keep querying until a result comes back with fewer than 500 records. (A minimal sketch of the idea follows.)
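Here is a minimal sketch of that approach, assuming an elasticsearch5 client named es, a numeric auto-increment id field that grows with creation time, and a query body that already contains the time-range clause (all names here are illustrative, not the final script):

def fetch_all(es, index, body, page=500):
    # pass 1: id descending; the first hit's id is the end ID
    body["size"] = page
    body["sort"] = [{"id": {"order": "desc"}}]
    hits = es.search(index=index, body=body)["hits"]["hits"]
    if len(hits) < page:                        # everything fit on one page
        return hits
    end_id = int(hits[0]["sort"][0])            # end ID
    # pass 2: id ascending; walk the window forward page by page
    body["sort"] = [{"id": {"order": "asc"}}]
    out = es.search(index=index, body=body)["hits"]["hits"]
    while True:
        start_id = int(out[-1]["sort"][0])      # last id fetched so far
        # switch to an id-range query; end_id already bounds the time window
        body["query"] = {"range": {"id": {"gte": start_id + 1, "lte": end_id}}}
        batch = es.search(index=index, body=body)["hits"]["hits"]
        out += batch
        if len(batch) < page:                   # a short page means we are done
            return out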

Showtime.

The data has been sanitized, so parts of it may look a little odd.

main.py

# coding: utf-8
import csv,sys,copy,os,time
from elasticsearch5 import Elasticsearch
from pub import sendEmail,mail_HTML,shell_ini,time_ini,es_sql,es_ini


base_dir = sys.path[0] + "/data/"
es = Elasticsearch(         # ES connection settings
    ['********'],
    http_auth=('********', '********'),
    port=9200,
    use_ssl=False
)


def main(index=None,keys=None,filename=None,day=1,title=None,wheres={},other=False):    # main entry point: ties all the steps together
    if not index:return("Please input index!")
    if not keys:return("Please input keys!")
    if not filename:return("Please input filename!")
    print(filename)
    days = time_ini(date=day)                                               # get the date range, used to name the output directory
    dir = base_dir + days["start_day"] + "_" + days["stop_day"] + "/"       # absolute path embedding the date range
    os.makedirs(dir, mode=0o755, exist_ok=True)                             # create the directory; mode is octal 755
    time_data = time_ini(day=day)                                           # get millisecond timestamps
    sql_data = es_sql(keys=keys,time=time_data,wheres=wheres)               # build the ES query body
    resp = es_ini(type="search",index=index,connect=es,data=sql_data)       # run the query and collect all hits
    new_title = copy.deepcopy(keys)                                 # deep-copy keys to serve as the CSV header
    if title:                                                       # extra header columns were passed in
        for i in title:                                             # append each extra column
            new_title.append(i)
    csv_filename = dir + filename                                   # absolute path of the CSV; also used to check whether it already has data
    file_csv = open(csv_filename, 'a', encoding='utf_8_sig')        # open the file for appending
    csv_writer = csv.writer(file_csv)                               # wrap it in a CSV writer
    if not os.path.getsize(csv_filename):                           # only write the header while the file is still empty
        csv_writer.writerow(new_title)                              # write the CSV header
    for i in resp:                                                  # loop over the hits
        tmp_list = []                                               # writerow takes a single list
        for k in keys:                                              # loop over the requested keys
            v = i["_source"].get(k)                                 # pull one field out of the document
            tmp_list.append(v)                                      # collect it into the row
        if index == "****":                          # special case: append an extra column for this index
            where_name = wheres["match_phrase"]["tablename"]["query"]
            if  where_name == "****":
                tmp_list.append(2)
            elif where_name == "****":
                tmp_list.append(1)
        # print(index,tmp_list[0])
        csv_writer.writerow(tmp_list)                               # after the keys loop the row is complete; writerow accepts exactly one list
    print("==========")
    file_csv.close()                                                # close the CSV; writing through one handle across several runs in this script sometimes produced empty files, probably a file-handle issue



def ****(day=1):            # job for a single table
    keys = [
        'id',
        '****',
        '****',
    ]

    main(index="****",keys=keys,filename="****.csv",day=day)



def ****(day=1):            # job for a single table
    keys = [
        'id',
        '****',
        '****',
    ]
    main(index="****",keys=keys,filename="****.csv",day=day)



def ****(day=1):            # job for a single table
    keys = [
        'id',
        '****',
        '****',
    ]
    wheres = {  # phrase match
        "match_phrase": {
            "tablename": {
                "query": "****"
            }
        }           
    }

    main(index="****",keys=keys,filename="****.csv",day=day,wheres=wheres,title=["src"])



def ****(day=1):            # job for a single table
    keys = [
        'id',
        '****',
        '****',
    ]
    wheres = {  # phrase match
        "match_phrase": {
            "tablename": {
                "query": "****"
            }
        }           
    }
    main(index="****",keys=keys,filename="****.csv",day=day,wheres=wheres,title=["src"])



def ****(day=1):            # job for a single table
    keys = [
        'id',
        '****',
        '****',
    ]
    main(index="****",keys=keys,filename="****.csv",day=day)



def zip_ini(day=1):                                                                     # zip up the day's output directory
    days = time_ini(date=day)
    day_dir = days["start_day"] + "_" + days["stop_day"]
    yum_command = "yum -y install zip unzip"                                            # make sure zip is present
    if not shell_ini(yum_command).get(0): print("ZIP Package Installation Failed.")     # install the zip package
    zip_command = "cd {} && zip -Dr {}.zip ./{}".format(base_dir,day_dir,day_dir)
    zip_com = shell_ini(zip_command).get(0)                                             # get(0) is truthy when the return-code key 0 exists, otherwise None
    if zip_com:
        print("zip succeeded.")
        zip_dir = base_dir + day_dir
        ls_command = "ls {}".format(zip_dir)
        ls_com = shell_ini(ls_command).get(0)
        if ls_com:
            resp_data = {}
            for i in ls_com:
                file_dir = zip_dir + "/" + i
                with open(file_dir,'r') as f:
                    file_count = len(f.readlines()) - 1                                 # line count minus 1 for the header row
                resp_data[i] = file_count
            return({base_dir + day_dir + ".zip":resp_data})                             # returns {absolute zip path: {csv name: row count}}
        else:
            return("error: could not list the output directory.")
    else:
        print("zip failed.")
        tmp_str = ""
        for i in shell_ini(zip_command).values():
            for ii in i:tmp_str += ii
        print(tmp_str)
        return(None)



def sendmail(day=1):
    ****(day=day)
    ****(day=day)
    ****(day=day)
    ****(day=day)
    ****(day=day)
    ret = zip_ini(day)
    if not isinstance(ret, dict):               # zip_ini returns None or an error string on failure
        print("zip step failed, mail not sent.")
        return
    user_mail_list = ["ops-mailbox@xxxx.com"]   # the operations team's mailbox
    for k,v in ret.items():
        sendEmail(user_mail_list,"title",htmlText=mail_HTML(data=v),appenfile=k)



if __name__ == '__main__':
    start_time = int(time.time())
    try:
        if len(sys.argv) == 2:
            day = sys.argv[1]
            print("day=",day)
            sendmail(day=int(day))
            print("Mail Sent.")
        else:
            print("Unknown Operation.")
    except Exception as e:          # avoid a bare except that swallows the traceback
        print("Unknown Error:", e)
    end_time = int(time.time())
    print("Time: ",end_time - start_time)

pub.py

# encoding:utf-8
import time
import datetime
import subprocess
import os
from datetime import timedelta



def time_ini(day=None,date=None):     # returns a dict: millisecond timestamps (day=) or date strings (date=) for the start and end of the range
    if day:
        fn = day
    elif date:
        fn = date
    else:
        return("Please input day or date.")     # guard: without it, fn below would be undefined
    new_str_date = datetime.date.today()
    old_str_date = new_str_date - timedelta(days=fn)
    if date:return({"start_day":str(old_str_date),"stop_day":str(new_str_date)})
    new_unix_date = round(time.mktime(new_str_date.timetuple()) * 1000)
    old_unix_date = round(time.mktime(old_str_date.timetuple()) * 1000)
    return({"start_time":old_unix_date,"stop_time":new_unix_date})


def es_ini(type=None,index=None,connect=None,data=None):    # ES search wrapper; returns the raw hit list
    """
    type = string("search")
    data = {
        "query":{                           # the query clause
            "range":{                       # range query
                "created_at":{              # target field
                    "gte":1672545600000,    # gte: greater than or equal; gt: greater than
                    "lte":1672549999999,    # lte: less than or equal; lt: less than
                    "format":"epoch_millis" # epoch_millis: millisecond timestamp; epoch_second: second timestamp
                }
            }
        }
    }
    index = index
    connect = elasticsearch instantiation
    """
    if not connect:return("Please input elasticsearch instantiation!")
    if type == "search":
        ret_list = []                                       # accumulator for the hits to return
        data.update({"sort":[{"id":{"order":"desc"}}]})     # sort descending to find the end id
        result = connect.search(index=index,body=data)      # first page: 500 hits, id descending
        result_list = result["hits"]["hits"]                # the list of hits
        if not result_list:return(result_list)              # nothing matched at all
        end_id = int(result_list[0]["sort"][0])             # the end id of the window
        if len(result_list) < 500:return(result_list)       # fewer than 500 hits: one page holds everything
        data.update({"sort":[{"id":{"order":"asc"}}]})      # flip to ascending to walk the window forward
        result = connect.search(index=index,body=data)      # first ascending page: 500 hits
        ret_list += result["hits"]["hits"]                  # collect them
        for i in range(1,40):                               # at most about 20,000 records (40 pages of 500)
            print("fetched so far",i*500,"+")
            start_id = int(ret_list[-1]["sort"][0])         # ret_list is ascending, so the last hit's id is the new start id
            es_range = {                                    # swap the time-range clause for an id-range clause
                "range":{                                   # range query
                    "id":{                                  # target field
                        "gte":start_id + 1,                 # gte, so +1 to avoid re-fetching the boundary id
                        "lte":end_id                        # "format" omitted here: it only applies to date fields
                    }
                }
            }
            data["query"]["bool"]["must"][-1] = es_range    # replace the last must clause (the range)
            result = connect.search(index=index,body=data)
            result_list = result["hits"]["hits"]
            ret_list += result_list
            if len(result_list) < 500:
                print("total records",len(ret_list))
                return(ret_list)                            # a short page means the window is exhausted
        return(None)                                        # loop cap exceeded: treat as an error
    else:
        return("Unknown operation!")



def shell_ini(command=None):        # run a shell command; returns {return code: stdout split into a list}
    if not command:
        return("Please input command.")
    res = subprocess.run(command, 
                        shell=True, 
                        stdout=subprocess.PIPE, 
                        stderr=subprocess.PIPE, 
                        encoding='utf-8',
                        timeout=300,
                        executable='/bin/bash')
    tmp_list = []
    for i in res.stdout.replace("\n",",").strip(",").split(","):
        tmp_list.append(i.strip("\t"))
    return({res.returncode:tmp_list})
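A quick illustration of the return shape, with illustrative output:

>>> shell_ini("echo hello; echo world")
{0: ['hello', 'world']}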



def sendEmail(user_list=["default mailbox"],subject=None,htmlText=None,plainText=None,appenfile=None):
    import smtplib
    from email.mime.text import MIMEText
    from email.mime.multipart import MIMEMultipart
    strTo = ','.join(user_list)
    server = "smtp.exmail.qq.com"  # SMTP server
    user = "****"    # username
    passwd = "****"   # password
    strFrom = '****'
    # build the root message
    msgRoot = MIMEMultipart('related')
    msgRoot['Subject'] = subject
    msgRoot['From'] = strFrom
    msgRoot['To'] = strTo
    msgAlternative = MIMEMultipart('alternative')
    msgRoot.attach(msgAlternative)
    # optional plain-text body
    if plainText:
        msgText = MIMEText(plainText, 'plain', 'utf-8')
        msgAlternative.attach(msgText)
    # optional HTML body
    if htmlText:
        msgText = MIMEText(htmlText, 'html', 'utf-8')
        msgAlternative.attach(msgText)
    # optional attachment
    if appenfile:
        filename = os.path.split(appenfile)
        with open(appenfile, 'rb') as f:
            att1 = MIMEText(f.read(), 'base64', 'utf-8')
        att1["Content-Type"] = 'application/octet-stream'
        # the filename set here can be anything; it is what the mail client displays
        att1["Content-Disposition"] = 'attachment; filename="{}"'.format(filename[-1])
        msgRoot.attach(att1)        # attach to the root, not the alternative part, so clients show it as an attachment
    # send the mail
    smtp = smtplib.SMTP()
    smtp.connect(server)
    smtp.login(user, passwd)
    smtp.sendmail(strFrom,user_list,msgRoot.as_string())
    smtp.quit()
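One note on the transport: smtplib.SMTP() plus connect() speaks plain SMTP on port 25. If the provider requires an encrypted connection instead (Tencent exmail, for one, also accepts implicit SSL on port 465), the drop-in change would look something like this:

smtp = smtplib.SMTP_SSL(server, 465)    # assumes the provider accepts implicit SSL on 465
smtp.login(user, passwd)
smtp.sendmail(strFrom, user_list, msgRoot.as_string())
smtp.quit()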



def mail_HTML(data):
    html = """
    <table border="1">
        <tr>
            <th>Table</th>
            <th>Count</th>
            <th>Description</th>
        </tr>
    """
    for k,v in data.items():
        msg = """
            <tr>
                <td>{tablename}</td>
                <td>{count}</td>
                <td>{desc}</td>
            </tr>
        """
        c = ""
        if k == "****.csv":
            c = "****"
        elif k == "****.csv":
            c = "****"
        elif k == "****.csv":
            c = "****"
        elif k == "****.csv":
            c = "****"
        msg = msg.format(
            tablename = k,
            count = v,
            desc = c,
        )
        html += msg
    html += "</table>"
    return html



def es_sql(keys=None,time=None,wheres={}):  # build the query body from the requested keys and millisecond timestamps
    if not keys:
        return("Please input keys.")
    elif not time:
        return("Please input time.")
    else:
        data = {
            "version":True,                 # include the document version
            "from":0,                       # pagination offset
            "size":500,                     # page size
            "sort":[                        # sort order
                {
                    "id":{                  # desc: descending (default); asc: ascending
                        "order":"asc"
                    }
                }
            ],
            "query":{                       # the query clause
                "bool":{                    # boolean query
                    "must":[                # documents must satisfy every must clause, and the clauses contribute to the score
                        {
                            "match_all":{   # matches every document with _score 1.0; the score can be changed via the boost parameter

                            }
                        },
                        wheres,
                        {
                            "range":{                         # range query
                                "created_at":{                # target field
                                    "gte":time["start_time"], # gte: greater than or equal; gt: greater than
                                    "lte":time["stop_time"],  # lte: less than or equal; lt: less than
                                    "format":"epoch_millis"   # epoch_millis: millisecond timestamp; epoch_second: second timestamp
                                }
                            }
                        }
                    ],
                    "must_not":[            # clauses documents must not match; not scored, and cached

                    ]
                }
            },
            "_source":{                     # includes/excludes control which _source fields come back when you don't need the whole document
                "includes":keys,
                "excludes":[
                ]
            },
            # "aggs":{                                # aggregation query
            #     "uid":{                             # aggregation name (arbitrary)
            #         "terms":{                       # bucketing method
            #             "field":"id",               # bucketing field
            #             "min_doc_count":1,          # drop buckets with fewer than 1 document
            #         },
            #         "aggs":{
            #             "last_opentime":{
            #                 "max":{
            #                     "field": "last_opentime"
            #                 }
            #             }
            #         }
            #     }
            # },
            "stored_fields":[               # fields stored without any analysis, so queries can return the raw values
                "*"
            ],
            "script_fields":{               # computed fields derived from other fields

            },
            # "docvalue_fields":[           # return values from doc values, the on-disk columnar store for non-text fields
            #     "created_at",
            #     "last_opentime",
            #     "updated_at"
            # ],
        }
        return(data)
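As a quick sanity check, the body for a plain 15-day pull of two fields can be built and inspected like this (field names taken from the code above):

body = es_sql(keys=["id", "created_at"], time=time_ini(day=15))
print(body["size"])                         # 500, the page size es_ini pages through
print(body["query"]["bool"]["must"][-1])    # the created_at range that es_ini later swaps for an id range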

Summary

Elasticsearch is genuinely fun, and I genuinely don't know how to use it. It's hard.

This took me a week, but here we are: the 15-day and 30-day queries run and the results get mailed to the ops mailbox. Time to hand it in.

