#!/usr/bin/python3 # -*-coding:utf-8-*- import hashlib import hmac import base64 import datetime import requests import json import urllib.parse
def b64(s):
    """Return the Base64 representation of the string ``s`` as text.

    ``s`` is encoded to bytes with the default codec (UTF-8) before being
    Base64-encoded; the result is decoded back to ``str``.
    """
    raw = s.encode()
    encoded = base64.b64encode(raw)
    return encoded.decode()
def httpdate_rfc1123(dt=None):
    """Format a datetime as an RFC 1123 HTTP date string.

    Args:
        dt: The moment to format. A naive datetime is taken to already be
            in UTC; a timezone-aware datetime is converted to UTC first.
            When omitted, the current UTC time is used.

    Returns:
        A string such as ``'Sun, 01 Mar 2015 12:00:00 GMT'``.
    """
    if dt is None:
        dt = datetime.datetime.now(datetime.timezone.utc)
    elif dt.tzinfo is not None:
        # The literal "GMT" suffix below is only correct for UTC values.
        dt = dt.astimezone(datetime.timezone.utc)
    # Format the requested moment; the previous code ignored ``dt`` and
    # always formatted the current time.
    return dt.strftime('%a, %d %b %Y %H:%M:%S GMT')
def sign(username, password, method, uri, date):
    """Build the ``Authorization`` header value for a request.

    The string ``method&uri&date`` is signed with HMAC-SHA1, using
    ``password`` as the key, and the digest is Base64-encoded.

    Args:
        username: Name placed before the signature in the header value.
        password: HMAC key (callers in this file pass the MD5 hex digest
            of the operator password).
        method: HTTP method, e.g. ``'POST'``.
        uri: Request path, e.g. ``'/pretreatment/'``.
        date: The same date string that is sent in the ``Date`` header.

    Returns:
        A string of the form ``'UPYUN <username>:<signature>'``.
    """
    payload = '&'.join((method, uri, date))
    mac = hmac.new(password.encode(), payload.encode(), digestmod=hashlib.sha1)
    signature = base64.b64encode(mac.digest()).decode()
    return 'UPYUN %s:%s' % (username, signature)
# Account settings used by the request below.
# NOTE(review): these look like placeholder demo credentials - confirm that
# real secrets are never committed here.
bucket = "file201503"    # sent as the 'service' form field
operator = "admin"       # name used in the Authorization header
password = "123456"      # plain-text operator password

# The signing key is the hex MD5 digest of the password, not the password itself.
password_md5 = hashlib.new("md5", password.encode()).hexdigest()
# Request headers, built once at import time. The Date value is created
# first because the Authorization signature is computed over that exact
# string.
headers = {'Date': httpdate_rfc1123()}
headers.update({
    'Authorization': sign(operator, password_md5, 'POST', '/pretreatment/',
                          headers['Date']),
    'Content-Type': 'application/x-www-form-urlencoded',
})
def main():
    """Submit one video task to the UPYUN pretreatment endpoint and print the reply.

    Builds a single task description, serialises it as Base64-encoded JSON,
    posts it as a URL-encoded form using the module-level ``headers`` and
    ``bucket``, and prints the raw response body.

    Raises:
        requests.exceptions.RequestException: If the request fails or does
            not complete within the timeout.
    """
    tasks = {
        "type": "video",
        "avopts": "/s/240p(4:3)/as/1/r/30",
        "return_info": True,
        "save_as": "/a/b.mp4",
    }

    data = {
        'source': '/BBB/test.mp4',
        'service': bucket,
        'notify_url': 'http://lacewing.cc:8090/echo',
        'accept': 'json',
        # The task list is sent as Base64 text wrapping a JSON array.
        'tasks': base64.b64encode(json.dumps([tasks]).encode()).decode(),
    }

    # The resulting form body looks like:
    #   source=%2FBBB%2Ftest.mp4&service=file201503
    #   &notify_url=http%3A%2F%2Flacewing.cc%3A8090%2Fecho&accept=json&tasks=W3si...
    body = urllib.parse.urlencode(data)

    # Without a timeout, requests waits indefinitely on a stalled server.
    response = requests.request(
        "POST",
        "http://p0.api.upyun.com/pretreatment/",
        data=body,
        headers=headers,
        timeout=30,
    )
    print(response.text)
# Call main() only when this file is executed directly as a script.
if __name__ == '__main__': main()
|