快速查询备案的命令行小工具

你是否遇到过这样的问题,登录工信部网站查询备案,网站一直提示验证码错误,明明验证码是对的,却一直还提示错误。

看看网友的吐槽吧!

至少我很少能在 Chrome、火狐这样的浏览器查询成功,验证码一直错, IE ?IE 对于我来说是用来下载其他浏览器的工具。

此外,查询时需要打开浏览器—找到工信部站点—-输入域名—输入验证码-查询。是不是步骤有点多?在加上个验证码一直出错。我已经恨透了这个网站了。

但是这样的需求又很频繁,因为工作需要,会经常对客户的域名进行检查,并且我习惯于在命令行下工作,命令行对于我来说可以快速提升效率。于是自己写个脚本去快速实时的获取备案信息。

最开始的时候我在网上找了带缓存的 API 接口,查询结果不是实时的,不太准确,有的备案信息已经下掉了还是能查到,有的已经备案了却又查不到,于是,付费买了个接口,如果你也有类似需求,可以去类似于聚合数据、NowAPI 这样的专门做数据的网站去购买接口实现,比如聚合数据是99块钱2000次,NowAPI 是0.015一次请求。查询次数不是很多或者你恰好也这块需求的不妨试试。

下面是我写的代码,基于 Python3实现。

更新内容

2017-11-1

1.加了redis 做了一层缓存,缓存7天,因为是付费接口,这样可以省点钱。

2017-10-31

1.增加了查询 IP 地址归属的功能。
2.增加了更新程序的功能,使用 -u 参数可查找是否有更新。
3.支持查询 IP 地址归属地。

2017-10-30

1.支持带 url 的链接自动解析主域名。
2.支持 Windows 、Linux 、mac系统。
3.基于实时的备案数据库,与工信部查询结果完全一致。

下载地址

使用方法

1.将下载的文件重命名未 ba,Windows 命名为 ba.exe

2.mac 和 Linux 将文件拷贝到 /usr/local/bin 下,并赋予其可执行权限

chmod +x /usr/local/bin/ba

Windows 系统将文件拷贝到 C:\Windows 目录下

使用

1.查询域名

 ➜  ~ ba -d www.youku.com                  
正在查询域名:youku.com
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>[100%]

===================================
该域名备案状态:已备案
备案号:京ICP备06050721号-1
备案类型:企业
备案单位:合一信息技术(北京)有限公司
备案时间:2017-08-29 00:00:00
===================================

2.查询带 URL的域名

 ➜  ~ ba -d https://awen.me/post/48427.html
正在查询域名:awen.me
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>[100%]

===================================
该域名备案状态:已备案
备案号:浙ICP备15018780号-3
备案类型:个人
备案单位:方文俊
备案时间:2016-12-14 00:00:00
===================================

3.查询 IP 地址

 ➜   ~ ba

        这是一个提升工作效率的小工具

usage: ba [-h] [-d DOMAIN] [-v] [-i IP] [-a] [-u]

optional arguments:
  -h, --help            show this help message and exit
  -d DOMAIN, --domain DOMAIN
                        指定需要查询的域名,例如: ba -d awen.me
  -v, --version         显示版本信息
  -i IP, --ip IP        查询 IP 归属地,例如 ba -i 123.234.1.1
  -a, --auth            显示作者信息
  -u, --update          升级程序
?  ~ ba -i 121.42.148.64
正在查询 IP 归属:121.42.148.64
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>[100%]

===================================
所在国家:中国
所在省份:山东
所在市区:青岛
所在县区:
所属运营商:阿里云/电信/联通/移动/铁通/教育网
===================================

4.更新程序

➜  ~ sudo ba -u
没有可用更新

代码

  #!/usr/bin/python3
# -*- coding: UTF-8 -*-
import time
import re
import platform
import os,sys, stat
import ast
import json

try:
    import requests
    import argparse
    import tldextract
    import redis
    from termcolor import *
except Exception as e:
    print("请先安装 requests argparse tldextract redis")

# 程序版本

VERSION = "1.3"

# debug 开关

sys.stderr = None

# 判断 IP 地址
def isIP(str):
    p = re.compile('^((25[0-5]|2[0-4]\d|[01]?\d\d?)\.){3}(25[0-5]|2[0-4]\d|[01]?\d\d?)$')
    if p.match(str):
        return True
    else:
        return False

# 查询 IP 地址
def query_ip(ip):
    api = "http://freeapi.ipip.net/"+ip
    try:
        headers = {"User-Agent":'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/60.0.3112.90 Safari/537.36','cache-control':'no-cache'}
        r = requests.get(api,headers=headers)
    except (requests.ConnectionError, IndexError, UnicodeEncodeError, TimeoutError):
        print("请求异常,无法连接服务器")
    except requests.HTTPError as f:
        print('请求异常,HTTP错误')
    finally:
        r.encoding = 'UTF-8'
        r = eval(r.text)
        print("所在国家:" + r[0])
        print("所在省份:" + r[1])
        print("所在市区:" + r[2])
        print("所在县区:" + r[3])
        print("所属运营商:" + r[4])

# 查询域名备案,先查缓存,如果没有,则去线上查,然后返回
def req(domain):

    redis_connect = redis.Redis(host='', port=, db=0, password='')
    get_value = redis_connect.get(domain)
    if get_value == None:
        api = "https://api.awen.me"
        payload = {"app":"domain.beian","domain":domain,"appkey":"","sign":"","format":"json"}
        headers = {"User-Agent":'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/60.0.3112.90 Safari/537.36','cache-control':'no-cache'}
        try:
            r = requests.get(api,headers=headers,params=payload)
        except (requests.ConnectionError, IndexError, UnicodeEncodeError, TimeoutError):
            print("请求异常,无法连接服务器")
        except requests.HTTPError as f:
            print('请求异常,HTTP错误')
        finally:
            info = r.json()
            info = str(info)
            if r.status_code == 200:
                redis_connect.set(domain,info)
                redis_connect.expire(domain,604800)
                get_value = redis_connect.get(domain)
                get_value = get_value.decode('utf-8')
                info = ast.literal_eval(get_value)
                if info["result"]["status"] == 'NOT_BEIAN':
                    print("该域名备案状态:"+colored("未备案","red"))
                elif info["result"]["status"] == 'ALREADY_BEIAN':
                    print("该域名备案状态:"+colored("已备案","green"))
                    print("备案号:"+info["result"]["icpno"])
                    print("备案类型:" + info["result"]["organizers_type"])
                    print("备案单位:" + info["result"]["organizers"])
                    print("备案时间:" + info["result"]["exadate"])
                elif info["result"]["status"] == "WAIT_PROCESS":
                    print("等待系统处理,预计10分钟内可完成")
                elif info["success"] == "0":
                    print("系统错误")
            else:
                print("服务器异常")
    else:
        get_value = redis_connect.get(domain)
        get_value = redis_connect.get(domain)
        get_value = get_value.decode('utf-8')
        info = ast.literal_eval(get_value)
        if info["result"]["status"] == 'NOT_BEIAN':
            print("该域名备案状态:"+colored("未备案","red"))
        elif info["result"]["status"] == 'ALREADY_BEIAN':
            print("该域名备案状态:"+colored("已备案","green"))
            print("备案号:"+info["result"]["icpno"])
            print("备案类型:" + info["result"]["organizers_type"])
            print("备案单位:" + info["result"]["organizers"])
            print("备案时间:" + info["result"]["exadate"])
        elif info["result"]["status"] == "WAIT_PROCESS":
            print("等待系统处理,预计10分钟内可完成")
        elif info["success"] == "0":
            print("系统错误")

def update(version):
    oldver = version
    newver = round((float(oldver)+0.1),2)
    osinfo = platform.system()
    version = str(newver)
    mac_url = ''+version
    linux_url = ''+version
    win_url = ''+version+'.exe'
    headers = {"User-Agent":'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/60.0.3112.90 Safari/537.36','cache-control':'no-cache','referer':'awen.me'}
    if osinfo == "Darwin":
        try:
            r = requests.get(mac_url,verify=False,stream=True,headers=headers)
        except (requests.ConnectionError, IndexError, UnicodeEncodeError, TimeoutError):
            print("请求异常,无法连接服务器")
        except requests.HTTPError as f:
            print('请求异常,HTTP错误')
        finally:
            if r.status_code == 200:
                os.remove('/usr/local/bin/ba')
                print(colored("正在下载更新","yellow"))
                plan()
                data = r.content
                with open('/usr/local/bin/ba', 'wb') as f:
                    f.write(data)
                print("正在设置更新")
                os.popen('chmod 755 /usr/local/bin/ba')
                print("更新完成!")
            else:
                print("没有可用更新")
    if osinfo == "Linux":
        try:
            r = requests.get(linux_url,verify=False,headers=headers)
        except (requests.ConnectionError, IndexError, UnicodeEncodeError, TimeoutError):
            print("请求异常,无法连接服务器")
        except requests.HTTPError as f:
            print('请求异常,HTTP错误')
        finally:
            if os.geteuid() == 0:
                if r.status_code == 200:

                    os.remove('/usr/local/bin/ba')
                    print(colored("正在下载更新","yellow"))
                    plan()
                    data = r.content
                    with open('/usr/local/bin/ba', 'wb') as f:
                        f.write(data)
                    print("正在设置更新")
                    os.system('chmod 755 /usr/local/bin/ba')
                    print("更新完成!")
                else:
                    print("没有可用更新")
            else:
                print("请使用 Root 权限执行")

    if osinfo == "Windows":
        try:
            r = requests.get(win_url,verify=False,stream=True,headers=headers)
        except (requests.ConnectionError, IndexError, UnicodeEncodeError, TimeoutError):
            print("请求异常,无法连接服务器")
        except requests.HTTPError as f:
            print('请求异常,HTTP错误')
        finally:
            if r.status_code == 200:
                print(colored("正在下载更新","yellow"))
                plan()
                data = r.content
                with open('C:/Windows/ba.exe', 'wb') as f:
                    f.write(data)
                print("正在设置更新")
                print("更新完成!")
            else:
                print("没有可用更新")


# 进度条
def plan():
    for i in range(100):
        k = i + 1
        str = '>'*(i//2)+' '*((100-k)//2)
        sys.stdout.write('\r'+str+'[%s%%]'%(i+1))
        sys.stdout.flush()
        time.sleep(0.002)
    print("\n")

# 菜单
def menu():

    if len(sys.argv) == 1:
        sys.argv.append('--help')
        APP_DESC = """
        这是一个提升工作效率的小工具
        """
        print(APP_DESC)
    parser = argparse.ArgumentParser()
    parser.add_argument('-d','--domain',dest="domain",help="指定需要查询的域名,例如: ba -d awen.me")
    parser.add_argument('-v','--version',dest="version",help="显示版本信息",action="store_true")
    parser.add_argument('-i','--ip',dest="ip",help="查询 IP 归属地,例如 ba -i 123.234.1.1")
    parser.add_argument('-a','--auth',dest="auth",help="显示作者信息",action="store_true")
    parser.add_argument('-u','--update',dest="update",help="升级程序",action="store_true")

    args = parser.parse_args()

    if args.domain:
        try:
            url =  tldextract.extract(args.domain)
        except Exception as e:
            pass
        domain = url.domain+"."+url.suffix
        print("正在查询域名:"+colored(domain,"yellow"))
        plan()
        print("===================================")
        req(domain)
        print("===================================")
    if args.version:
        print("当前版本:"+VERSION)

    if args.auth:
        print("Auth:awen E-mail:[email protected]")
        print("E-mail:[email protected]")
        print("Blog:https://awen.me")
    if args.ip:
        if isIP(args.ip):
            print("正在查询 IP 归属:"+colored(args.ip,"yellow"))
            plan()
            print("===================================")
            query_ip(args.ip)
            print("===================================")
        else:
            print(colored("您输入的不是一个正确的 IP 地址!!","red"))
    if args.update:
        args.update = VERSION
        update(args.update)

if __name__=="__main__":
    menu()