# Spaces: Running
import time ,json | |
from datetime import datetime,timedelta | |
import requests ,re | |
import gradio as gr | |
def request_first(authorization, timeout=10):
    """Look up the current parking record for the hard-coded plate number.

    Sends a GET request to the parking-info endpoint and parses the JSON
    reply.

    Args:
        authorization: Value sent verbatim in the ``Authorization`` header.
        timeout: Seconds to wait for the server. Without a timeout,
            ``requests`` may block indefinitely on an unresponsive host.

    Returns:
        A tuple ``(third_no, fee, begin, duration, user_id)`` when the
        reply's ``code`` field equals 0, otherwise ``None``.

    Raises:
        requests.exceptions.RequestException: On connection errors or when
            the timeout expires.
        KeyError: If the reply lacks one of the expected fields.
    """
    headers = {
        "Host": "park.biz.baoneng.com",
        "Connection": "keep-alive",
        "Accept": "application/json",
        "GmToken": "9",
        "xweb_xhr": "1",
        "Authorization": authorization,
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/116.0.0.0 Safari/537.36 MicroMessenger/7.0.20.1781(0x6700143B) NetType/WIFI MiniProgramEnv/Windows WindowsWechat/WMPF WindowsWechat(0x6309092b) XWEB/9129",
        "Content-Type": "application/json",
        "Sec-Fetch-Site": "cross-site",
        "Sec-Fetch-Mode": "cors",
        "Sec-Fetch-Dest": "empty",
        "Referer": "https://servicewechat.com/wxda6d35e0951cf46d/64/page-frame.html",
        "Accept-Encoding": "gzip, deflate, br",
        "Accept-Language": "zh-CN,zh;q=0.9",
    }
    # The plate number is URL-encoded directly in the query string.
    url = "https://park.biz.baoneng.com/shop/v1/parking/info?car_no=%E7%B2%A4BBQ9667&type=car&gm_id=9"
    response = requests.get(url, headers=headers, timeout=timeout)
    data = response.json()
    if data['code'] != 0:
        return None

    result = data['result']
    user_id = result['card']['user_id']
    order = result['orders']
    begin = order['begin']
    fee = order['fee']
    duration = order['stay_duration']
    third_no = order['third_no']  # needed as a parameter of the second request
    return third_no, fee, begin, duration, user_id
def request_second(authorization, third_no, fee, begin, timeout=10):
    """Create a payment order for the parking fee and return its order number.

    Posts the payment request to the parking service, then extracts the
    ``orderNo`` query parameter from the ``payUrl`` in the reply.

    Args:
        authorization: Value sent verbatim in the ``Authorization`` header.
        third_no: Order identifier obtained from the parking-info reply.
        fee: Fee to pay; also sent as ``discount_fee`` and ``discount_total``.
        begin: Entry time, sent as ``entry_time``.
        timeout: Seconds to wait for the server before giving up.

    Returns:
        The order number captured from ``payUrl``.

    Raises:
        requests.exceptions.RequestException: On connection errors or when
            the timeout expires.
        KeyError: If the reply has no ``result.data.payUrl`` field.
        ValueError: If ``payUrl`` contains no ``orderNo=`` parameter.
    """
    url = "https://park.biz.baoneng.com/shop/v1/parking/pay"
    # Content-Length is intentionally not set here: requests derives it
    # from the encoded JSON body.
    headers = {
        "Host": "park.biz.baoneng.com",
        "Connection": "keep-alive",
        "Accept": "application/json",
        "GmToken": "9",
        "xweb_xhr": "1",
        "Authorization": authorization,
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/116.0.0.0 Safari/537.36 MicroMessenger/7.0.20.1781(0x6700143B) NetType/WIFI MiniProgramEnv/Windows WindowsWechat/WMPF WindowsWechat(0x63090a1b) XWEB/9129",
        "Content-Type": "application/json",
        "Sec-Fetch-Site": "cross-site",
        "Sec-Fetch-Mode": "cors",
        "Sec-Fetch-Dest": "empty",
        "Referer": "https://servicewechat.com/wxda6d35e0951cf46d/64/page-frame.html",
        "Accept-Encoding": "gzip, deflate, br",
        "Accept-Language": "zh-CN,zh;q=0.9",
    }
    # NOTE(review): "code" is a fixed captured value; presumably a one-time
    # login/payment code that may need refreshing - confirm with the owner.
    payload = {
        "code": "0c3Cgx0w30UKV23Hr10w3HLZER3Cgx0M",
        "car_no": "粤BBQ9667",
        "card_no": "粤BBQ9667",
        "entry_time": begin,
        "third_pay": True,
        "third_no": third_no,
        "fee": fee,
        "discount_fee": fee,
        "coupon_code": "",
        "coupon_name": "",
        "coupon_fee": "",
        "consume_point_fee": 0,
        "discount_total": fee,
        "card_id": 22,
    }
    response = requests.post(url, headers=headers, json=payload, timeout=timeout)
    res = response.json()
    pay_url = res['result']['data']['payUrl']
    match = re.search(r"orderNo=(\w+)", pay_url)
    if match is None:
        # Fail with a descriptive error instead of an AttributeError on None.
        raise ValueError(f"no orderNo found in payUrl: {pay_url!r}")
    return match.group(1)
def request_third(order_number, timeout=10):
    """Submit the prepay request for an order and return the gateway result code.

    Two POSTs are sent to the payment gateway: first ``queryPayType`` (its
    reply is not used), then ``prepay``. The parsed prepay reply is printed
    for diagnostics.

    Args:
        order_number: Order number to pay.
        timeout: Seconds to wait for each request before giving up.

    Returns:
        The ``resultCode`` field of the prepay reply.

    Raises:
        requests.exceptions.RequestException: On connection errors or when
            the timeout expires.
        KeyError: If the prepay reply has no ``resultCode`` field.
    """
    # No fixed Content-Length header: the two requests carry bodies of
    # different sizes, and requests computes the correct value for each.
    headers = {
        "Host": "sytgate.jslife.com.cn",
        "Connection": "keep-alive",
        "xweb_xhr": "1",
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/116.0.0.0 Safari/537.36 MicroMessenger/7.0.20.1781(0x6700143B) NetType/WIFI MiniProgramEnv/Windows WindowsWechat/WMPF WindowsWechat(0x63090a1b) XWEB/9129",
        "Content-Type": "application/json",
        "Accept": "*/*",
        "Sec-Fetch-Site": "cross-site",
        "Sec-Fetch-Mode": "cors",
        "Sec-Fetch-Dest": "empty",
        "Referer": "https://servicewechat.com/wx24b70f0ad2a9a89a/206/page-frame.html",
        "Accept-Encoding": "gzip, deflate, br",
        "Accept-Language": "zh-CN,zh;q=0.9",
    }
    prepay_payload = {
        "userId": "",
        "orderNo": order_number,
        "appType": "MINI_JSCARLIFE",
        "channelId": "LKL_WX",
        "subChannelId": "WeChat",
        "couponList": [],
        "callbackUrl": "MINI_JSCARLIFE",
        "payType": "MINI_PROGRAM",
        "reqSource": "WX_JTC",
        "openId": "ofjjT5LRjJb_dmRhPO0D3YqcT598",
        "appSource": "WX",
        "subAppSource": "WX_XCX_JTC",
        "unionId": "oRWxE56FsqijsUCbuZMUrKnXQKQ4",
        "version": "2.0",
    }
    query_payload = {"orderNo": order_number, "appType": "SERVICE"}

    # NOTE(review): the reply to queryPayType is discarded; presumably the
    # gateway expects this call before prepay - confirm before removing it.
    requests.post(
        "https://sytgate.jslife.com.cn/core-gateway/payop/queryPayType",
        headers=headers,
        json=query_payload,
        timeout=timeout,
    )
    prepay_reply = requests.post(
        "https://sytgate.jslife.com.cn/core-gateway/order/pay/prepay",
        headers=headers,
        json=prepay_payload,
        timeout=timeout,
    ).json()
    print('request_third 支付信息:', prepay_reply)
    return prepay_reply['resultCode']
def pay(auth_id):
    """Run the full query -> order -> prepay sequence and describe the outcome.

    Looks up the authorization value for ``auth_id`` in the module-level
    ``authorization_list``, then chains ``request_first``,
    ``request_second`` and ``request_third``.

    Args:
        auth_id: Key into ``authorization_list``.

    Returns:
        A user-facing status string. Result code 2323 is reported as a
        successful payment, 3138 as a rejection for too-frequent requests,
        0 as an already-used payment code, and anything else as a failed
        payment. A missing parking record and any raised exception each
        have their own message.
    """
    try:
        token = authorization_list[auth_id]
        parking = request_first(token)
        if parking is None:
            return "🚷无法查询到车辆入场信息"

        third_no, fee, begin, duration, user_id = parking
        print("third_no:", third_no, "入场时间:", begin, "用户ID:", user_id, "停车时长:", duration, "费用:", fee)
        order_no = request_second(token, third_no, fee, begin)
        code = request_third(order_no)

        if code == 2323:
            return f"✅缴费成功 {fee}元"
        if code == 3138:
            return f"❌请求频繁系统拒绝 {fee}元"
        if code == 0:
            return f"❌支付码已经使用 {fee}元"
        return f"❌支付失败 {fee}元"
    except Exception as err:
        # Top-level boundary for the UI: log the cause, show a generic message.
        print("异常>>>>>>>>>>>问题原因:", err)
        return "🚫请求异常"
# Load authorization.json. The loaded object is indexed by the auth ID
# selected in the UI, and each entry is sent as an Authorization header value.
# JSON is UTF-8 text, so decode explicitly instead of using the locale default.
with open("authorization.json", encoding="utf-8") as f:
    authorization_list = json.load(f)

# Payment history rows shown in the UI: [payment time, result, auth ID used].
# This list lives at module level, so it is shared by every session.
history = []
with gr.Blocks() as demo:
    # Input component: radio buttons built from authorization_list.
    auth_id = gr.Radio(authorization_list, label="选择授权ID")
    query_output = gr.Dataframe(headers=["费用", "入场时间", "时长"], label="查询结果")
    pay_output = gr.Dataframe(headers=["支付时间", "支付结果", "使用ID"], label="支付历史")

    def query_click(auth_id):
        """Query the parking record and return rows for both tables.

        Falls back to auth ID "0" when nothing is selected. Returns a
        one-row list ``[(fee, begin, duration)]`` for the query table and
        the current payment history for the history table.
        """
        if auth_id is None:
            auth_id = "0"
        authorization = authorization_list[auth_id]
        first_results = request_first(authorization)
        if first_results is None:
            # No parking record: show a readable error instead of failing
            # on tuple-unpacking of None.
            raise gr.Error("🚷无法查询到车辆入场信息")
        third_no, fee, begin, duration, user_id = first_results
        return [(fee, begin, duration)], history

    def pay_click(auth_id):
        """Attempt a payment, record the outcome, and return the history."""
        pay_result = pay(auth_id)
        # NOTE(review): the fixed +6h offset presumably shifts the host
        # clock to the operator's local time - confirm; a timezone-aware
        # datetime would be more robust.
        paid_at = (datetime.now() + timedelta(hours=6)).strftime("%Y-%m-%d %H:%M:%S")
        history.append([paid_at, str(pay_result), str(auth_id)])
        return history

    # -------------------------------------------------------------------------
    with gr.Row():
        with gr.Column(scale=2):
            query_button = gr.Button("查询")
            # The handler returns two values: query table rows and history.
            query_button.click(
                query_click,
                inputs=auth_id,
                outputs=[query_output, pay_output],
            )
        # ---------------------------------------------------------------------
        with gr.Column(scale=3):
            # Pay button component.
            pay_button = gr.Button("支付")
            pay_button.click(pay_click, inputs=auth_id, outputs=pay_output)

# NOTE(security): login credentials are hard-coded in source; consider
# loading them from the environment or a secret store instead.
demo.launch(auth=("admin", "eyJ0eXAiOiJKV1QiLCJhbGciOiJUzI1NiIsI"))