import time import re import platform import os,sys, stat import ast import json
try: import requests import argparse import tldextract import redis from termcolor import * except Exception as e: print("请先安装 requests argparse tldextract redis")
# Release number of this tool. It is printed by the --version option and is
# the starting point from which the updater derives the next release to fetch.
VERSION = "1.3"
# Replace the process-wide stderr stream with None.
# NOTE(review): presumably meant to hide warnings and tracebacks from end
# users -- confirm. It also means unexpected failures produce no diagnostics.
sys.stderr = None
# Dotted-quad IPv4 address: four octets, each in the range 0-255.
# Raw string so ``\d`` is a regex escape rather than an (invalid) string
# escape; re.ASCII restricts ``\d`` to 0-9 so non-Latin digits are rejected.
_IPV4_PATTERN = re.compile(
    r'((25[0-5]|2[0-4]\d|[01]?\d\d?)\.){3}(25[0-5]|2[0-4]\d|[01]?\d\d?)',
    re.ASCII,
)


def isIP(str):
    """Return True when ``str`` is a dotted-quad IPv4 address, else False.

    The whole string must match: ``fullmatch`` is used instead of a ``$``
    anchor because ``$`` also matches just before a trailing newline, which
    would let ``"1.2.3.4\\n"`` through.

    Args:
        str: Text to validate. The parameter name shadows the builtin; it is
            kept unchanged so existing keyword callers keep working.

    Returns:
        bool: True if the text is a valid IPv4 address.
    """
    return _IPV4_PATTERN.fullmatch(str) is not None
def query_ip(ip):
    """Look up the location of ``ip`` on freeapi.ipip.net and print it.

    The service is expected to answer with a JSON array of at least five
    items: country, province, city, district and carrier. Each one is printed
    on its own labelled line. On any request or parsing problem a short
    message is printed and the function returns without raising.

    Args:
        ip: IPv4 address text, appended to the API URL as-is.

    Returns:
        None. All results are written to stdout.
    """
    api = "http://freeapi.ipip.net/" + ip
    headers = {
        "User-Agent": 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/60.0.3112.90 Safari/537.36',
        'cache-control': 'no-cache',
    }
    try:
        # A timeout keeps the tool from hanging forever on a dead server.
        r = requests.get(api, headers=headers, timeout=10)
    except requests.HTTPError:
        # Must precede RequestException, of which HTTPError is a subclass.
        print('请求异常,HTTP错误')
        return
    except (requests.RequestException, IndexError, UnicodeEncodeError, TimeoutError):
        print("请求异常,无法连接服务器")
        return

    # Only reached when the request succeeded, so ``r`` is always bound here
    # (the previous ``finally`` block touched ``r`` even after a failure).
    r.encoding = 'UTF-8'
    try:
        # SECURITY: the body comes from a remote server. It used to be passed
        # to eval(), which would execute arbitrary code; parse it as JSON.
        fields = json.loads(r.text)
    except ValueError:
        print("服务器异常")
        return

    labels = ("所在国家:", "所在省份:", "所在市区:", "所在县区:", "所属运营商:")
    if not isinstance(fields, list) or len(fields) < len(labels):
        # Error payloads and truncated answers do not carry all five fields.
        print("服务器异常")
        return
    for label, value in zip(labels, fields):
        print(label + str(value))
# How long a filing result stays in the Redis cache (7 days).
_CACHE_TTL_SECONDS = 604800
# Only definitive answers are cached; transient states such as WAIT_PROCESS
# and error replies must be re-queried on the next run.
_CACHEABLE_STATUSES = ('NOT_BEIAN', 'ALREADY_BEIAN')


def _read_cached_beian(redis_connect, domain):
    """Return the cached API response for ``domain`` as a dict, or None."""
    raw = redis_connect.get(domain)
    if raw is None:
        return None
    try:
        # Entries are stored as the repr() of the response dict, so they are
        # read back with literal_eval (kept for compatibility with entries
        # written by earlier versions).
        info = ast.literal_eval(raw.decode('utf-8'))
    except (ValueError, SyntaxError):
        # Corrupt entry: treat it as a cache miss.
        return None
    return info if isinstance(info, dict) else None


def _fetch_beian_info(domain):
    """Query the filing API for ``domain``; return the decoded JSON or None."""
    api = "https://api.awen.me"
    payload = {"app": "domain.beian", "domain": domain, "appkey": "", "sign": "", "format": "json"}
    headers = {
        "User-Agent": 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/60.0.3112.90 Safari/537.36',
        'cache-control': 'no-cache',
    }
    try:
        r = requests.get(api, headers=headers, params=payload, timeout=10)
    except requests.HTTPError:
        # Must precede RequestException, of which HTTPError is a subclass.
        print('请求异常,HTTP错误')
        return None
    except (requests.RequestException, IndexError, UnicodeEncodeError, TimeoutError):
        print("请求异常,无法连接服务器")
        return None
    if r.status_code != 200:
        print("服务器异常")
        return None
    try:
        return r.json()
    except ValueError:
        print("服务器异常")
        return None


def _print_beian_info(info):
    """Print the filing status (and details, if filed) from an API response."""
    if not isinstance(info, dict):
        print("服务器异常")
        return
    result = info.get("result")
    if not isinstance(result, dict):
        # Error replies may carry no "result" object at all.
        result = {}
    status = result.get("status")
    if status == 'NOT_BEIAN':
        print("该域名备案状态:" + colored("未备案", "red"))
    elif status == 'ALREADY_BEIAN':
        print("该域名备案状态:" + colored("已备案", "green"))
        details = (
            ("备案号:", "icpno"),
            ("备案类型:", "organizers_type"),
            ("备案单位:", "organizers"),
            ("备案时间:", "exadate"),
        )
        for label, key in details:
            print(label + str(result.get(key, "")))
    elif status == "WAIT_PROCESS":
        print("等待系统处理,预计10分钟内可完成")
    elif info.get("success") == "0":
        print("系统错误")
    else:
        print("服务器异常")


def req(domain):
    """Print the ICP filing (beian) status of ``domain``.

    A Redis cache is consulted first. On a miss the remote API is queried,
    and a definitive answer is cached for seven days. Request failures and
    malformed replies print a short message instead of raising.

    Args:
        domain: Registrable domain name, e.g. ``"awen.me"``.

    Returns:
        None. All results are written to stdout.
    """
    # NOTE(review): host and password are blank in this source, and the port
    # had no value at all (``port=`` -- a syntax error). The port argument is
    # omitted so redis-py falls back to its default; set the real connection
    # details before deploying.
    redis_connect = redis.Redis(host='', db=0, password='')

    info = _read_cached_beian(redis_connect, domain)
    if info is None:
        info = _fetch_beian_info(domain)
        if info is None:
            return
        result = info.get("result") if isinstance(info, dict) else None
        if isinstance(result, dict) and result.get("status") in _CACHEABLE_STATUSES:
            # ``ex=`` stores the value and its expiry in one atomic command.
            redis_connect.set(domain, str(info), ex=_CACHE_TTL_SECONDS)
    _print_beian_info(info)
def update(version):
    """Download and install the release that follows ``version``.

    The next release number is ``version + 0.1``. The download URL and the
    install location depend on the operating system reported by
    ``platform.system()``; unknown systems are silently ignored. The new
    binary is written to a temporary file beside the target and then moved
    into place, so a failed download or write never destroys the installed
    copy.

    Args:
        version: Current version as numeric text, e.g. ``"1.3"``.

    Returns:
        None. Progress and errors are written to stdout.
    """
    next_version = str(round((float(version) + 0.1), 2))
    mac_url = '' + next_version
    linux_url = '' + next_version
    win_url = '' + next_version + '.exe'
    headers = {
        "User-Agent": 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/60.0.3112.90 Safari/537.36',
        'cache-control': 'no-cache',
        'referer': 'awen.me',
    }
    # system name -> (download URL, install path, root required, file mode)
    targets = {
        "Darwin": (mac_url, '/usr/local/bin/ba', False, 0o755),
        "Linux": (linux_url, '/usr/local/bin/ba', True, 0o755),
        "Windows": (win_url, 'C:/Windows/ba.exe', False, None),
    }
    target = targets.get(platform.system())
    if target is None:
        return
    url, install_path, needs_root, mode = target

    # Check privileges before downloading anything.
    if needs_root and os.geteuid() != 0:
        print("请使用 Root 权限执行")
        return

    try:
        # NOTE(review): verify=False disables TLS certificate checks for an
        # executable download, which allows a man-in-the-middle to replace
        # the binary. Kept to preserve behaviour; enable verification once
        # the download host serves a valid certificate.
        r = requests.get(url, verify=False, headers=headers, timeout=30)
    except requests.HTTPError:
        # Must precede RequestException, of which HTTPError is a subclass.
        print('请求异常,HTTP错误')
        return
    except (requests.RequestException, IndexError, UnicodeEncodeError, TimeoutError):
        print("请求异常,无法连接服务器")
        return

    if r.status_code != 200:
        print("没有可用更新")
        return

    print(colored("正在下载更新", "yellow"))
    plan()
    data = r.content
    tmp_path = install_path + '.new'
    try:
        with open(tmp_path, 'wb') as f:
            f.write(data)
        print("正在设置更新")
        if mode is not None:
            os.chmod(tmp_path, mode)
        # Atomic swap: the old binary stays intact until the new one is ready.
        os.replace(tmp_path, install_path)
    except PermissionError:
        print("请使用 Root 权限执行")
        return
    else:
        print("更新完成!")
    finally:
        # Best-effort cleanup; after a successful replace the file is gone.
        if os.path.exists(tmp_path):
            try:
                os.remove(tmp_path)
            except OSError:
                pass
def plan():
    """Animate a 100-step text progress bar on stdout.

    Each step rewrites the current line (carriage return, no newline) with a
    run of ``>`` marks, space padding and a ``[N%]`` counter, pausing 2 ms
    between steps. A blank line is printed once the bar is complete.
    """
    total_steps = 100
    for done in range(1, total_steps + 1):
        filled = '>' * ((done - 1) // 2)
        padding = ' ' * ((total_steps - done) // 2)
        sys.stdout.write('\r{}{}[{}%]'.format(filled, padding, done))
        sys.stdout.flush()
        time.sleep(0.002)
    print("\n")
def menu():
    """Parse the command line and run the requested actions.

    Options:
        -d/--domain   look up the ICP filing status of a domain
        -v/--version  print the current version
        -i/--ip       look up the location of an IPv4 address
        -a/--auth     print author information
        -u/--update   download and install the next release

    With no arguments at all, ``--help`` is assumed. Several options may be
    combined; they run in the order domain, version, auth, ip, update.
    """
    if len(sys.argv) == 1:
        sys.argv.append('--help')
    # NOTE(review): the source's indentation was lost, so it is unclear
    # whether the banner was printed only in the no-argument case; it is
    # treated as unconditional here -- confirm.
    APP_DESC = """ 这是一个提升工作效率的小工具 """
    print(APP_DESC)

    parser = argparse.ArgumentParser()
    parser.add_argument('-d', '--domain', dest="domain", help="指定需要查询的域名,例如: ba -d awen.me")
    parser.add_argument('-v', '--version', dest="version", help="显示版本信息", action="store_true")
    parser.add_argument('-i', '--ip', dest="ip", help="查询 IP 归属地,例如 ba -i 123.234.1.1")
    parser.add_argument('-a', '--auth', dest="auth", help="显示作者信息", action="store_true")
    parser.add_argument('-u', '--update', dest="update", help="升级程序", action="store_true")
    args = parser.parse_args()

    if args.domain:
        try:
            url = tldextract.extract(args.domain)
        except Exception:
            # Previously swallowed with ``pass``, which left ``url`` unbound
            # and crashed on the next line. Report the problem instead.
            url = None
        if url is None or not url.domain or not url.suffix:
            # No registrable domain (e.g. a bare host name or an IP address).
            print(colored("您输入的不是一个正确的域名!!", "red"))
        else:
            domain = url.domain + "." + url.suffix
            print("正在查询域名:" + colored(domain, "yellow"))
            plan()
            print("===================================")
            req(domain)
            print("===================================")

    if args.version:
        print("当前版本:" + VERSION)

    if args.auth:
        print("Auth:awen E-mail:hi@awen.me")
        print("E-mail:hi@awen.me")
        print("Blog:https://awen.me")

    if args.ip:
        if isIP(args.ip):
            print("正在查询 IP 归属:" + colored(args.ip, "yellow"))
            plan()
            print("===================================")
            query_ip(args.ip)
            print("===================================")
        else:
            print(colored("您输入的不是一个正确的 IP 地址!!", "red"))

    if args.update:
        # The updater derives the next release from the current version.
        update(VERSION)
# Start the command-line interface only when this file is executed as a
# script, not when it is imported as a module.
if __name__=="__main__":
    menu()
|