import asyncio
from playwright.async_api import async_playwright
from playwright.sync_api import sync_playwright
# playwright codegen -o script.py -b webkit
import smtplib
from email.mime.text import MIMEText
from email.utils import formataddr
import requests
import requests_html
import json
import glob
import pandas as pd
import xlwt
import threading,time
import os
import time
import datetime
import random
import re
import easygui
import eventlet
import datetime
import threading
import sched
# eventlet.monkey_patch()
from multiprocessing import Pool
# import stopit
# @stopit.threading_timeoutable()

thisyr = '2022'
lastyr = '2021'
# Entry URL of the Quick Quotes Spot form used by the automation below.
qqsweb = "https://www.hapag-lloyd.cn/solutions/new-quote/#/quick-quotes-spot?language=en"

# A disabled easygui yes/no prompt used to choose this value interactively;
# it is now fixed to 1.
imajor = 1


async def main():
    """Open baidu.com in Chromium, Firefox and WebKit and screenshot each.

    Writes ``screenshot-<browser name>.png`` for every engine and prints the
    page title.  The browsers are not closed explicitly; they go away when
    the ``async_playwright`` context exits.
    """
    async with async_playwright() as p:
        for browser_type in [p.chromium, p.firefox, p.webkit]:
            browser = await browser_type.launch()
            page = await browser.new_page()
            await page.goto('https://www.baidu.com')
            await page.screenshot(path=f'screenshot-{browser_type.name}.png')
            print(await page.title())


# asyncio.run(main())


def on_response(response):
    """Response hook placeholder: matches Salesforce Event URLs, does nothing."""
    if '/lightning/r/Event' in response.url:
        pass


def run(playwright):
    """Demo: search baidu for "jingdong" and follow the JD.com result popup.

    Args:
        playwright: a started sync Playwright object.

    The browser is intentionally left open (headed demo run).
    """
    browser = playwright.chromium.launch(headless=False)
    # Playwright for Python exposes snake_case names; the camelCase
    # newContext()/newPage() used before do not exist on these objects.
    context = browser.new_context()
    page = context.new_page()
    page.goto("https://www.baidu.com/")
    page.click("input[name=\"wd\"]")
    page.fill("input[name=\"wd\"]", "jingdong")
    page.click("text=\"京东\"")
    # The result link opens a popup while the current page navigates.
    with page.expect_navigation():
        with page.expect_popup() as popup_info:
            page.click("//a[normalize-space(.)='京东JD.COM官网 多快好省 只为品质生活']")
        page1 = popup_info.value
    return page1


# with sync_playwright() as playwright:
#     run(playwright)
def run1(p):
    """Demo: same baidu -> JD.com popup flow as ``run`` but on WebKit.

    Args:
        p: a started sync Playwright object.

    Saves ``screenshot-webkit.png`` of the popup and prints its title.
    """
    for browser_type in [p.webkit]:
        browser = browser_type.launch(headless=False)
        page = browser.new_page()
        page.goto('https://www.baidu.com')
        page.click("input[name=\"wd\"]")
        page.fill("input[name=\"wd\"]", "jingdong")
        page.click("text=\"京东\"")
        with page.expect_navigation():
            with page.expect_popup() as popup_info:
                page.click("//a[normalize-space(.)='京东JD.COM官网 多快好省 只为品质生活']")
            page1 = popup_info.value
        page1.screenshot(path=f'screenshot-{browser_type.name}.png')
        print(page1.title())


# with sync_playwright() as p:
#     run1(p)

url = "https://www.hapag-lloyd.cn/solutions/quick-quotes/#/?language=en"


def icookie(page):
    """Extend cookie lifetimes in ``page.context`` and return the session id.

    The expiry of the ``cf_clearance`` cookie plus 1,000,000 seconds is used
    as the new expiry for ``JSESSIONID``, ``_cfuvid``, ``auth_prod`` and for
    every other non-session cookie (``expires != -1``) that would expire
    earlier.  ``cf_clearance`` itself is left untouched.  The modified list
    is written back with ``context.add_cookies``.

    Args:
        page: object exposing ``context.cookies()`` returning a list of
            cookie dicts, and ``context.add_cookies(list)``.

    Returns:
        The value of the ``JSESSIONID`` cookie, or ``""`` if there is none.
    """
    sessionid = ""
    # context.cookies() returns a plain list of dicts; the previous call to
    # cookies.clear_expired_cookies() (an http.cookiejar method) raised
    # AttributeError on it and has been removed.
    cookies = page.context.cookies()

    iexpire = -1
    for ck in cookies:
        try:
            if ck['name'] == "cf_clearance" and ck["expires"] > iexpire:
                iexpire = ck["expires"] + 1000000
        except (KeyError, TypeError):
            pass

    pinned = ("JSESSIONID", "_cfuvid", "auth_prod")
    for ck in cookies:
        try:
            if ck['name'] == "JSESSIONID":
                sessionid = ck["value"]
            if ck['name'] in pinned and iexpire != -1:
                ck['expires'] = iexpire
            # Any other persistent cookie expiring sooner is stretched too.
            if (ck['expires'] < iexpire and ck['expires'] != -1
                    and ck['name'] != "cf_clearance"):
                ck['expires'] = iexpire
        except (KeyError, TypeError):
            pass

    page.context.add_cookies(cookies)
    return sessionid


def cfmall_key(page=None):
    """Click the "Select All" button if it is present (best effort).

    Args:
        page: Playwright page to act on.  When omitted, the module-level
            name ``page`` is used if it exists, which is what the previous
            implementation relied on implicitly; if there is none the call
            is a no-op.

    Any error raised while looking up or clicking the button is ignored.
    """
    target = page if page is not None else globals().get("page")
    if target is None:
        return
    try:
        button = target.get_by_role("button", name="Select All")
        if button.count() > 0:
            button.click()
    except Exception:
        # Deliberately best-effort: the button is optional on every page.
        pass
def HPL_QQ_pass():
    """Open the quick-quotes site for a manual login and save the session.

    Launches a headed Firefox, waits 80 seconds for the operator to log in,
    then asks (easygui) whether to keep the environment.  On "yes" the
    browser storage state is written to ``hlagqq_.json`` and ``hlagqq.json``.

    ``login`` selects the flow: 0 = reuse the stored state and log in
    automatically, 1 = log in manually to refresh the cookie, 2 = already
    logged in.  It is fixed to 1 here (use 0 unless the cookie has expired).
    """
    # playwright codegen --load-storage=hlagqq.json www.hapag-lloyd.cn
    login = 1
    savelogin = 1
    url = "https://www.hapag-lloyd.cn/solutions/quick-quotes/#/?language=en"

    with sync_playwright() as p:
        browser = p.firefox.launch(headless=False)
        context = browser.new_context()
        if login == 1 or login == 0:
            if savelogin == 1:
                # Create the page up front: the non-manual branch used
                # `page` before it was ever assigned.
                page = context.new_page()
                if login == 1:
                    page.goto(url)
                else:
                    page.goto("https://www.baidu.com")
                    page.wait_for_load_state('networkidle')
            if login == 0:
                context = browser.new_context(storage_state='hlagqq.json')
                page = context.new_page()
                try:
                    page.goto(url)
                except Exception:
                    pass

        # Time window for the operator to complete the login by hand.
        time.sleep(80)
        logid = easygui.ynbox("是否保留环境","",('是','否'))
        if logid:
            context.storage_state(path='hlagqq_.json')
            context.storage_state(path='hlagqq.json')


# Module-level prompt: optionally refresh the stored login before anything
# else runs.
logit = easygui.ynbox("是否需要修改MS环境","",('是','否'))
if logit == 1:
    HPL_QQ_pass()  # record the logged-in environment


def amappic():
    """Screenshot amap.com as an iPhone 12 Pro Max located in Beijing.

    Writes ``location-iphone.png``.
    """
    with sync_playwright() as p:
        iphone_12_pro_max = p.devices['iPhone 12 Pro Max']
        browser = p.webkit.launch(headless=False)
        try:
            context = browser.new_context(
                **iphone_12_pro_max,
                locale='zh-CN',
                geolocation={'longitude': 116.39014, 'latitude': 39.913904},
                permissions=['geolocation'],
            )
            page = context.new_page()
            page.goto('https://amap.com')
            page.wait_for_load_state(state='networkidle')
            page.screenshot(path='location-iphone.png')
        finally:
            browser.close()


# amappic()
def movielist():
    """Print href and text of every ``a.name`` link on spa6.scrape.center."""
    with sync_playwright() as p:
        browser = p.chromium.launch(headless=True)
        try:
            page = browser.new_page()
            page.goto('https://spa6.scrape.center/')
            page.wait_for_load_state('networkidle')
            for element in page.query_selector_all('a.name'):
                print(element.get_attribute('href'))
                print(element.text_content())
        finally:
            # Close the browser even when navigation or scraping fails.
            browser.close()


# movielist()


def on_response(response):
    """Response hook: log every response, repeating gaokao.cn / eol.cn hits.

    NOTE(review): this redefines an earlier ``on_response`` in the file; the
    nesting of the trailing blank-line print was lost in the source and is
    assumed to belong to the eol.cn branch.
    """
    print(f'Statue {response.status}: {response.url}')
    if 'gaokao.cn/' in response.url and response.status == 200:
        print(f'Statue {response.status}: {response.url}')
    if 'eol.cn' in response.url and response.status == 200:
        print(f'Statue {response.status}: {response.url}')
        print('\n')


zs2021plan = []
lastyearscore = []
zs_list = []
dx2021score = []
school_info = []
blacks = [199,574,3243,2811,1270,2619,2811,3568,1234,2810,1272,2809,2602,
          2626,3007,3008,3244,3243,3242,124,97,63,36,934,37,44,504,831,142,
          119,332,121,334,62,139,108,935,67,96,86,45,100,417,129,58,419,137,
          1218,74,283,367,291,866,304,3281,3152,591,301,297,364,298,32,110,
          875,576,832,1006,120,41,320]
pro_list = []
jpage = []  # number of pages

# Customer account labels and the matching short ids; the two lists are
# parallel (same order, same length).
clients = ["LUCKY STRIKE/FAK ACNSCGGZ09","GUANGZHOU BONRI/FAK ACNSCGGZ09","CHOICE SUPPLY / FAK ACNSCGGZ09",
           "HIGH GOAL LOGISTICS ACNSCGGZ09","GUANGZHOU SUNDA/BCO ACNSCGGZ09","TOP IDEAL /GUANGZHOU ACNSCGGZ09",
           "GUANGZHOU GOODLINKS ACNSCGGZ09","SANTA FE /FAK ACNSCGGZ09","EVER BEYOND, FOSHAN ACNSCGGZ09",
           "GUANGZHOU GREAT ACNSCGGZ09","HUA YUAN LOGISTICS L ACNSCGGZ09","FOSHAN PENGJIA BUILD ACNSCGGZ09"]
clientids = ["LUCKY STRIKE/FAK","GUANGZHOU BONRI/FAK","CHOICE SUPPLY / FAK",
             "HIGH GOAL LOGISTICS","GUANGZHOU SUNDA/BCO","TOP IDEAL /GUANGZHOU",
             "GUANGZHOU GOODLINKS","SANTA FE /FAK","EVER BEYOND, FOSHAN",
             "GUANGZHOU GREAT","HUA YUAN LOGISTICS L","FOSHAN PENGJIA BUILD"]
# "Account LUCKY STRIKE/FAK ACNSCGGZ09" / "Account GUANGZHOU BONRI/FAK ACNSCGGZ09" /
# "Account CHOICE SUPPLY / FAK ACNSCGGZ09" / "Account HIGH GOAL LOGISTICS ACNSCGGZ09"
isecure = 0  # look up the records that failed
secureid = [1275,289,73,105,51]
ssid = []
schoolids = []
schoolpage = []  # per-school page counts (admission, enrolment, historical scores)


def startday(a, dd):
    """Return the date ``dd`` days after ``a``.

    Args:
        a: date string formatted ``YYYY-MM-DD``.
        dd: number of days to add (may be negative).

    Returns:
        The shifted date formatted ``MM/DD/YYYY``.

    Raises:
        ValueError: if ``a`` does not match ``%Y-%m-%d``.
    """
    base = datetime.datetime.strptime(a, "%Y-%m-%d")
    return (base + datetime.timedelta(days=dd)).strftime("%m/%d/%Y")


def _send_qq_mail(sender, auth_code, recipient, subject, body):
    """Send one plain-text mail through smtp.qq.com over SSL."""
    msg = MIMEText(body, 'plain', 'utf-8')
    msg['From'] = formataddr(['HL Sales', sender])
    msg['To'] = formataddr(['Dear Customer', recipient])
    msg['Subject'] = subject
    # The context manager closes the connection even when login/send fails;
    # previously a failure left the socket open.
    with smtplib.SMTP_SSL("smtp.qq.com", 465) as server:
        server.login(sender, auth_code)
        server.sendmail(sender, [recipient], msg.as_string())


def qq_mail(mail_data, key=""):
    """Mail the quote report to the internal address and to the customers.

    Nothing is sent unless ``mail_data`` contains the marker ``"@@"`` (added
    by the report builder when at least one discounted offer was found).

    Args:
        mail_data: report text used as the mail body.
        key: prefix for the subject of the internal mail.  Defaults to ``""``
            so callers that pass only the body keep working.

    Returns:
        ``0`` when the marker is missing and nothing was sent, else ``None``.
        Delivery failures are reported on stdout, never raised.
    """
    # NOTE(review): account and SMTP authorisation code are hard-coded;
    # consider loading them from the environment or a secrets store.
    my_sender = '2035055632@qq.com'
    my_pass = "irivlrhicgcqdiii"
    my_user = "patrick.wu@hlag.cn"
    my_user2 = ["ivan@choicexp.com.cn","ann@choicexp.com",'haiyun.xu@topideal.com.cn','johnhe@ylt56.com']

    if "@@" not in mail_data:
        return 0

    ret = True
    try:
        _send_qq_mail(my_sender, my_pass, my_user,
                      key +"QQS Daily Express", mail_data)
    except Exception:
        ret = False
    if ret:
        print(' 邮件发送Success',datetime.datetime.now())
    else:
        print(' 邮件发送失败',datetime.datetime.now())

    for user in my_user2:
        try:
            _send_qq_mail(my_sender, my_pass, user,
                          "HL QQS Daily Express请勿外传"+mail_data[0:5], mail_data)
        except Exception:
            # Best effort per recipient; one failure must not stop the rest.
            ret = False


def _revise_wait_for_offers(page, iqqs):
    """Poll until the offers page shows up; return the number of polls.

    The result is ``100`` when a "Dismiss" notification had to be closed
    (the site reported it has no data) and ``> 6`` when polling gave up.
    """
    cc = 0
    done = False
    while not done:
        try:
            if page.get_by_role("link", name="Edit request").count() > 0:
                done = True
                time.sleep(2)
            else:
                time.sleep(5)
        except Exception:
            time.sleep(5)

        if not done:
            # Still on the form: press "Request Offers" again if it is there.
            try:
                request_btn = page.get_by_role("button", name="Request Offers")
                if request_btn.count() > 0:
                    request_btn.click()
                time.sleep(10)
            except Exception:
                pass

        cc = cc + 1
        if cc == 3:
            # Third attempt: re-enter the whole form from scratch.
            qqs_polpodin(page, iqqs)
        print(" ",iqqs['pod'],"查询价格次数",cc,datetime.datetime.now())

        if cc > 1:
            try:
                if page.locator("div.q-notifications:nth-child(1) > div:nth-child(5)").count() > 0:
                    page.get_by_role("button", name="Dismiss").click()
                    done = True
                    cc = 100
            except Exception:
                pass
            try:
                if page.get_by_role("button", name="Dismiss").count() > 1:
                    page.get_by_role("button", name="Dismiss").first.click()
                    done = True
                    cc = 100
            except Exception:
                pass
            try:
                if page.get_by_role("button", name="Dismiss").count() > 0:
                    page.get_by_role("button", name="Dismiss").click()
                    done = True
                    cc = 100
            except Exception:
                pass
        if cc > 6:
            done = True
    return cc


def QQS_item_revise(page, iqqs, pol_c):
    """Edit the already-open quote form for the next query and request offers.

    Only the fields that differ from the previous query are re-entered: the
    start location is changed when ``iqqs['pol'] != pol_c``; the end location
    is always changed; the container type is changed when the displayed one
    does not match ``iqqs['size']``.

    Args:
        page: Playwright page showing the quote form.
        iqqs: query dict with keys ``pol``, ``polfull``, ``pod``,
            ``podfull`` and ``size``.
        pol_c: port-of-loading code of the previous query.

    Returns:
        ``1`` when offers are shown, ``30`` when the site dismissed the
        request (no data), ``0`` on any failure or time-out.
    """
    cfmall_key()
    size_20 = re.compile(r"^20' General Purpose$")
    size_40 = re.compile(r"^40' General Purpose High Cube$")
    try:
        if iqqs['pol'] != pol_c:
            clear_start = page.locator("label").filter(has_text="Start Location").locator("svg").nth(1)
            try:
                clear_start.click()
            except Exception:
                time.sleep(10)
                clear_start.click()
            try:
                page.get_by_placeholder("Start Location").fill(iqqs['pol'])
            except Exception:
                time.sleep(10)
                page.get_by_placeholder("Start Location").fill(iqqs['pol'])
            time.sleep(2)
            page.get_by_label("Start Location").press("ArrowDown")
            try:
                page.get_by_text(iqqs['polfull']).click()
            except Exception:
                time.sleep(10)
                page.get_by_label("Start Location").press("ArrowDown")
                page.get_by_text(iqqs['polfull']).click()

        clear_end = page.locator("label").filter(has_text="End Location").locator("svg").nth(1)
        try:
            clear_end.click()
            time.sleep(1)
        except Exception:
            time.sleep(10)
            clear_end.click()
            time.sleep(5)
        try:
            page.get_by_placeholder("End Location").fill(iqqs['pod'])
        except Exception:
            time.sleep(10)
            page.get_by_placeholder("End Location").fill(iqqs['pod'])
            time.sleep(3)
        time.sleep(2)
        page.get_by_label("End Location").press("ArrowDown")
        try:
            page.get_by_text(iqqs['podfull']).click()
        except Exception:
            time.sleep(10)
            page.get_by_label("End Location").press("ArrowDown")
            page.get_by_text(iqqs['podfull']).click()

        # Work out which container type the form currently displays.
        try:
            shown = 20 if page.locator("div").filter(has_text=size_20).count() > 0 else 40
        except Exception:
            shown = 40 if page.locator("div").filter(has_text=size_40).count() > 0 else 20
        wanted = 20 if "20" in iqqs['size'] else 40

        if wanted != shown:
            current = page.locator("div").filter(has_text=size_20 if shown == 20 else size_40)
            try:
                current.click()
            except Exception:
                time.sleep(10)
                current.click()
            time.sleep(2)
            option = page.get_by_role("option", name=iqqs['size']).locator("div").nth(2)
            try:
                option.click()
            except Exception:
                time.sleep(10)
                option.click()
            time.sleep(2)

        try:
            page.get_by_role("button", name="Request Offers").click()
            time.sleep(5)
        except Exception:
            pass

        cc = _revise_wait_for_offers(page, iqqs)
        if cc > 30:
            return 30
        if cc > 6:
            return 0
        return 1
    except Exception:
        return 0
def _choose_port(page, placeholder, code, full_name, *, click_first,
                 typing_pause, settle):
    """Type a port code into a location field and pick the full name."""
    field = page.get_by_placeholder(placeholder)
    if click_first:
        field.click()
    field.fill(code)
    time.sleep(typing_pause)
    field.press("ArrowDown")
    time.sleep(4)  # give the suggestion list time to load
    page.get_by_text(full_name).click()
    time.sleep(settle)


def _select_container_type(page, size):
    """Pick ``size`` if the container chooser is still unset; report if done."""
    chooser = page.locator("div").filter(has_text=re.compile(r"^Select container type$"))
    if chooser.count() > 0:
        chooser.click()
        time.sleep(1)
        page.get_by_text(size).click()
        time.sleep(1)
        return True
    return False


def qqs_polpodin(page, iqqs):
    """Load a fresh quote form and fill it in for one query.

    Fills start/end location, container type, quantity ("1") and weight per
    container ("10000"), then re-checks every field twice and re-enters the
    ones that did not stick.  All page interaction is best effort: errors
    are swallowed and the next step is attempted.

    Args:
        page: Playwright page.
        iqqs: query dict with keys ``pol``, ``polfull``, ``pod``,
            ``podfull`` and ``size``.
    """
    cfmall_key()
    try:
        page.goto(qqsweb, wait_until='domcontentloaded')
        time.sleep(10)
    except Exception:
        pass
    cfmall_key()

    # First pass: fill whatever is on the page; stop at the first failure
    # and let the verification passes below repair the rest.
    try:
        if page.get_by_placeholder("Start Location").count() > 0:
            _choose_port(page, "Start Location", iqqs["pol"], iqqs['polfull'],
                         click_first=True, typing_pause=2, settle=2)
        if page.get_by_placeholder("End Location").count() > 0:
            _choose_port(page, "End Location", iqqs['pod'], iqqs['podfull'],
                         click_first=True, typing_pause=1, settle=1)
        _select_container_type(page, iqqs['size'])
        if page.get_by_placeholder("Container Quantity").count() > 0:
            page.get_by_placeholder("Container Quantity").click()
            time.sleep(1)
            page.get_by_placeholder("Container Quantity").fill("1")
        weight = page.get_by_role("spinbutton", name="Weight per Container")
        if weight.count() > 0:
            weight.click()
            time.sleep(1)
            weight.fill("10000")
    except Exception:
        pass

    for _ in range(2):
        ipol = ""
        try:
            if page.get_by_label("Start Location").count() > 0:
                ipol = page.get_by_label("Start Location").input_value()
        except Exception:
            pass
        if ipol != iqqs['polfull'] or len(ipol) < 2:
            try:
                _choose_port(page, "Start Location", iqqs["pol"], iqqs['polfull'],
                             click_first=False, typing_pause=1, settle=2)
            except Exception:
                pass

        ipod = ""
        try:
            if page.get_by_label("End Location").count() > 0:
                ipod = page.get_by_label("End Location").input_value()
        except Exception:
            pass
        if ipod != iqqs['podfull'] or len(ipod) < 2:
            try:
                _choose_port(page, "End Location", iqqs['pod'], iqqs['podfull'],
                             click_first=True, typing_pause=1, settle=2)
            except Exception:
                pass

        isize = ""
        try:
            if _select_container_type(page, iqqs['size']):
                print(isize,"isize",isize,datetime.datetime.now())
        except Exception:
            pass

        try:
            iq = ""
            if page.get_by_label("Container Quantity").count() > 0:
                iq = page.get_by_label("Container Quantity").input_value()
            # input_value() returns a string; the old comparison with the
            # int 1 was always true and the branch then died on a misspelt
            # `age` variable, so the quantity was never actually repaired.
            if iq != "1":
                page.get_by_placeholder("Container Quantity").click()
                page.get_by_placeholder("Container Quantity").fill("1")
                page.get_by_placeholder("Container Quantity").press("Tab")
        except Exception:
            pass

        try:
            iw = ""
            weight = page.get_by_role("spinbutton", name="Weight per Container")
            if weight.count() > 0:
                iw = weight.input_value()
            # Same str-vs-int issue as above: compare with the text value.
            if iw != "10000":
                weight.fill("10000")
        except Exception:
            pass
def _new_wait_for_offers(page, iqqs):
    """Poll until the offers page shows up; return the number of polls.

    The result is ``100`` when a "Dismiss" notification had to be closed
    (the site reported it has no data) and ``> 6`` when polling gave up.
    """
    cc = 0
    done = False
    while not done:
        try:
            if page.get_by_role("link", name="Edit request").count() > 0:
                done = True
                time.sleep(2)
            else:
                time.sleep(5)
        except Exception:
            time.sleep(5)

        if not done:
            # Still on the form: press "Request Offers" again if it is there.
            try:
                request_btn = page.get_by_role("button", name="Request Offers")
                if request_btn.count() > 0:
                    request_btn.click()
                time.sleep(10)
            except Exception:
                pass

        cc = cc + 1  # number of times the price was requested
        if cc == 3:
            qqs_polpodin(page, iqqs)
        print(" ",iqqs['pod'],"查询价格次数",cc,datetime.datetime.now())

        if cc > 1:
            try:
                if page.locator("div.q-notifications:nth-child(1) > div:nth-child(5)").count() > 0:
                    page.get_by_role("button", name="Dismiss").click()
                    done = True
                    cc = 100
            except Exception:
                pass
            try:
                if page.get_by_role("button", name="Dismiss").count() > 1:
                    page.get_by_role("button", name="Dismiss").first.click()
                    done = True
                    cc = 100
            except Exception:
                pass
            try:
                if page.get_by_role("button", name="Dismiss").count() > 0:
                    page.get_by_role("button", name="Dismiss").click()
                    done = True
                    cc = 100
            except Exception:
                pass
        if cc > 6:
            done = True
            time.sleep(20)
        time.sleep(1)
    return cc


def QQS_item_new(page, iqqs, browser):
    """Log in if needed, fill a fresh quote form and request offers.

    Args:
        page: Playwright page.
        iqqs: query dict with keys ``pol``, ``polfull``, ``pod``,
            ``podfull`` and ``size``.
        browser: Playwright browser, forwarded to ``Webid_login``.

    Returns:
        ``1`` when offers are shown, ``30`` when the site dismissed the
        request (no data), ``0`` when polling timed out.
    """
    cfmall_key()
    # Webid_login may hand back a page object; keep using ours if it doesn't.
    page = Webid_login(page, browser) or page
    cfmall_key()

    misses = 0
    while True:
        try:
            found = page.get_by_label("Start Location").count() > 0
            time.sleep(3)
            if found:
                time.sleep(4)
                page.get_by_label("Start Location").click()
                time.sleep(4)
                break
        except Exception:
            pass
        # A missing field now counts as a miss as well; before, only an
        # exception did, so an absent field was polled forever without ever
        # reaching the re-login recovery below.
        misses = misses + 1
        print('cannot find StartLocation',misses)
        if misses > 4:
            re_loginout(page)
            page.goto(qqsweb, wait_until='domcontentloaded')
            time.sleep(8)
            cfmall_key()
            misses = 0
            Webid_login(page, browser)
            time.sleep(10)

    qqs_polpodin(page, iqqs)  # fill the query into the fresh form

    try:
        if page.get_by_role("button", name="Request Offers").count() > 0:
            page.get_by_role("button", name="Request Offers").click()
        else:
            time.sleep(15)
    except Exception:
        try:
            if page.get_by_role("button", name="Request Offers").count() > 0:
                page.get_by_role("button", name="Request Offers").click()
        except Exception:
            pass
    time.sleep(15)

    cc = _new_wait_for_offers(page, iqqs)
    if cc > 30:
        return 30
    if cc > 6:
        return 0
    return 1
# Short daily check: one port of loading, three destinations, 40' HC only.
# "polf"/"podf" are the display names matching "pol"/"pod" by position.
checkport = {
    "pol": ['CNNSA'],
    "polf": ['NANSHA (CNNSA)'],
    "podf": ["LUANDA (AOLAD)","MATADI (CDMAT)","TEMA (GHTEM)"],
    "pod": ["AOLAD","CDMAT","GHTEM"],
    'size': ["40' General Purpose High Cube"],
}

# Long check: three ports of loading, eleven destinations, both sizes.
checkportL = {
    "pol": ['CNNSA','CNNGB','CNTAO'],
    "polf": ['NANSHA (CNNSA)','NINGBO (CNNGB)','QINGDAO (CNTAO)'],
    "podf": ["LUANDA (AOLAD)","MATADI (CDMAT)","DOUALA (CMDLA)","KRIBI (CMKBI)",
             "POINTE NOIRE (CGPNR)","DURBAN (ZADUR)","TEMA (GHTEM)","DAKAR (SNDKR)",
             "CONAKRY (GNCKY)","MOMBASA (KEMBA)","DAR-ES-SALAAM (TZDAR)"],
    "pod": ["AOLAD","CDMAT","CMDLA","CMKBI",'CGPNR',"ZADUR","GHTEM","SNDKR",
            "GNCKY","KEMBA","TZDAR"],
    'size': ["40' General Purpose High Cube","20' General Purpose"],
}


def pass_need(page1):
    """Drive the Microsoft sign-in dialog shown on ``page1``, if any.

    Returns immediately when ``page1`` already shows the quote form.
    Otherwise it enters the user id, the password, requests the SMS code and
    then waits (polling every 20 s, without limit) until the operator has
    completed MFA.

    NOTE(review): several steps use the names ``page`` and ``context``,
    which are not parameters and are not defined in the visible part of the
    file.  They are presumably module-level objects created by the main
    script; if they are missing, the affected step raises NameError and is
    skipped by its ``except``.  Confirm whether ``page1`` was intended.
    NOTE(review): user id and password are hard-coded below; consider
    loading them from the environment or a secrets store.

    Args:
        page1: Playwright page (main page or login popup).
    """
    cfmall_key()
    if page1.get_by_label("Start Location").count() > 0:
        return

    try:
        if page.get_by_role("link", name="Log in here").count() > 0:
            page.get_by_role("link", name="Log in here").click()
            time.sleep(8)
        else:
            cfmall_key()
    except Exception:
        pass
    cfmall_key()

    # Account step: enter the user id again after a refresh.
    try:
        if (page1.get_by_placeholder("Please enter with email as user-id (UPN)").count() > 0
                and page.get_by_role("link", name="Can’t access your account?").count() > 0):
            page1.get_by_placeholder("Please enter with email as user-id (UPN)").fill("taopatrick.wu@hlag.com")
            page.get_by_role("button", name="Next").click()
            time.sleep(8)
    except Exception:
        pass

    # Password step followed by MFA.
    try:
        if (page1.get_by_role("link", name="Forgot my password").count() > 0
                and page.get_by_role("button", name="Sign in").count() > 0):
            if page1.get_by_placeholder("Password").count() > 0:
                page1.get_by_placeholder("Password").click()
                page1.get_by_placeholder("Password").fill("MAMAma19@")
                time.sleep(4)
                page1.get_by_role("button", name="Sign in").click()
                time.sleep(2)
            if page1.get_by_role("button", name="Text +XX XXXXXXXXX65‎").count() > 0:
                page1.get_by_role("button", name="Text +XX XXXXXXXXX65‎").click()
                time.sleep(2)
            print("密码过期 等待确认代码MFA")
            imfa = 0
            if (page1.get_by_role("textbox", name="Enter code").count() > 0
                    and page1.get_by_role("button", name="Verify").count() > 0):
                while imfa == 0:
                    time.sleep(20)
                    try:
                        # After MFA: tick "don't show again" and confirm.
                        if page1.get_by_text("Don't show this again").count() > 0:
                            imfa = 1
                            page1.get_by_text("Don't show this again").click()
                            time.sleep(2)
                            page1.get_by_role("button", name="Yes").click()
                            time.sleep(2)
                    except Exception:
                        pass
                    try:
                        if page.get_by_label("Start Location").count() > 0:
                            imfa = 1
                            page.goto(qqsweb, wait_until='domcontentloaded')
                            time.sleep(10)
                            cfmall_key()
                            try:
                                context.storage_state(path='hlagqq.json')
                            except Exception:
                                pass
                            return
                    except Exception:
                        pass
    except Exception:
        pass
    cfmall_key()
def webid_pass(page):
    """Open the quote site and walk through the web-id login if required.

    Waits until either the "Log in here" link or the quote form is visible,
    follows the link, and hands any sign-in dialog (same page or popup) to
    ``pass_need``.  Returns as soon as the quote form is available.  Every
    page interaction is best effort.

    Args:
        page: Playwright page.
    """
    page.goto(qqsweb, wait_until='domcontentloaded')
    time.sleep(5)
    cfmall_key()

    ready = False
    while not ready:
        cfmall_key()
        time.sleep(8)
        try:
            page.get_by_role("link", name="Log in here").click()
            time.sleep(5)
            cfmall_key()
        except Exception:
            pass
        if (page.get_by_role("link", name="Log in here").count() > 0
                or page.get_by_label("Start Location").count() > 0):
            ready = True
        else:
            cfmall_key()
            time.sleep(5)

    try:
        if page.get_by_role("link", name="Log in here").count() > 0:
            page.get_by_role("link", name="Log in here").click()
            time.sleep(10)
        cfmall_key()
    except Exception:
        pass
    cfmall_key()

    try:
        time.sleep(5)
        if page.get_by_label("Start Location").count() > 0:
            cfmall_key()
            return
        cfmall_key()
    except Exception:
        pass

    try:
        pass_need(page)
    except Exception:
        pass

    # Older login variant: the sign-in form opens in a popup.  If no popup
    # appears, expect_popup() times out and the whole step is skipped.
    try:
        with page.expect_popup() as page1_info:
            cfmall_key()
            try:
                frame_link = page.frame_locator("#azureLoginForm").get_by_role("link", name="Log in here")
                if frame_link.count() > 0:
                    frame_link.click()
                    time.sleep(5)
            except Exception:
                pass
        page1 = None
        try:
            page1 = page1_info.value
            pass_need(page1)
        except Exception:
            pass
        time.sleep(8)
        try:
            page1.close()
        except Exception:
            pass
        page.goto(qqsweb, wait_until='domcontentloaded')
        time.sleep(10)
        cfmall_key()
    except Exception:
        pass
    cfmall_key()
def re_loginout(page):
    """Log out of the site and log in again; return once the form is back.

    Polls without limit until the "Start Location" field is present, running
    ``webid_pass`` between checks.

    Args:
        page: Playwright page.
    """
    def _open_menu():
        page.get_by_test_id("toggle-dropdown-menu-icon").locator("svg").click()
        time.sleep(4)

    cfmall_key()
    try:
        _open_menu()
    except Exception:
        pass
    try:
        page.get_by_text("Logout").click()
    except Exception:
        # Menu probably was not open yet: open it and try once more.
        try:
            _open_menu()
        except Exception:
            pass
        try:
            page.get_by_text("Logout").click()
        except Exception:
            pass
    time.sleep(5)
    cfmall_key()

    try:
        page.goto(qqsweb, wait_until='domcontentloaded')
        time.sleep(5)
        cfmall_key()
    except Exception:
        page.reload()
        time.sleep(5)
        cfmall_key()

    while True:
        cfmall_key()
        time.sleep(10)
        try:
            cfmall_key()
            if page.get_by_label("Start Location").count() > 0:
                return
            cfmall_key()
        except Exception:
            pass
        try:
            time.sleep(5)
            webid_pass(page)
            cfmall_key()
            time.sleep(5)
        except Exception:
            pass


def reload_web(browser, page):
    """Replace ``page`` with a new page restored from ``hlagqq.json``.

    Closes ``page``, creates a new browser context from the stored session,
    opens the quote site in it and runs the login flow.

    Args:
        browser: Playwright browser used to create the new context.
        page: the page to discard.

    Returns:
        The newly created page.  (Previously nothing was returned, so the
        caller had no way to reach the replacement; callers that ignore the
        return value behave as before.)
    """
    print("looks need cookie reload by kill cookie" )
    page.close()
    print("re_loadit" )
    # Only one context is created now; the extra empty context that was
    # created first and immediately overwritten was never closed.
    context = browser.new_context(storage_state='hlagqq.json')
    page = context.new_page()
    page.goto("https://www.baidu.com")
    page.goto(qqsweb, wait_until='domcontentloaded')
    cfmall_key()
    webid_pass(page)
    cfmall_key()
    time.sleep(40)
    return page
def login_web(browser, page):
    """Keep running the login flow until the quote form is available.

    Loads the quote site and then loops: check for the "Start Location"
    field, otherwise run ``webid_pass`` and reload; after more than three
    rounds a full ``re_loginout`` is performed.  The loop has no upper
    bound.

    The statements that used to follow the loop were unreachable (the loop
    can only be left through ``return``) and have been removed.

    Args:
        browser: unused; kept for interface compatibility.
        page: Playwright page.

    Returns:
        ``None`` if the form was already present on entry or the initial
        navigation failed, ``1`` once the form appeared during polling.
    """
    cfmall_key()
    try:
        if page.get_by_label("Start Location").count() > 0:
            return
    except Exception:
        pass

    try:
        cfmall_key()
        page.goto(qqsweb, wait_until='domcontentloaded')
        time.sleep(25)
        cfmall_key()
        rounds = 0
        while True:
            cfmall_key()
            try:
                cfmall_key()
                try:
                    if page.get_by_label("Start Location").count() > 0:
                        return 1
                except Exception:
                    pass
                try:
                    webid_pass(page)
                    cfmall_key()
                except Exception:
                    pass
                cfmall_key()
                rounds = rounds + 1
                page.goto(qqsweb, wait_until='domcontentloaded')
                time.sleep(15)
                cfmall_key()
                print("waitlogin page",rounds)
                if rounds > 3:
                    try:
                        re_loginout(page)
                        cfmall_key()
                        rounds = 0
                    except Exception:
                        pass
            except Exception:
                cfmall_key()
                time.sleep(10)
                try:
                    if page.get_by_label("Start Location").count() > 0:
                        return 1
                except Exception:
                    pass
    except Exception:
        pass

    # Reached only when the initial navigation failed.  Page.reload() takes
    # no URL; the former positional `qqsweb` argument made this call raise
    # every time.
    try:
        page.reload(wait_until='domcontentloaded')
    except Exception:
        pass
    cfmall_key()
def Webid_login(page, browser):
    """Make sure ``page`` shows the quote form, logging in when necessary.

    Loops without limit until the "Start Location" field is present.  After
    more than six failed checks the session is treated as expired: the page
    is rebuilt with ``reload_web`` and ``login_web`` is run again.

    Args:
        page: Playwright page.
        browser: Playwright browser, used when the page must be rebuilt.

    Returns:
        The page showing the form.  (One early exit used to return ``None``;
        it now returns the page like every other path.)
    """
    try:
        page.goto(qqsweb, wait_until='domcontentloaded')
        time.sleep(10)
        cfmall_key()
    except Exception:
        pass
    try:
        if page.get_by_label("Start Location").count() > 0:
            return page
    except Exception:
        pass

    sessionid = ""
    try:
        page.goto(qqsweb, wait_until='domcontentloaded')
        time.sleep(10)
        cfmall_key()
    except Exception:
        pass
    try:
        webid_pass(page)
        cfmall_key()
    except Exception:
        pass

    found = False
    try:
        cfmall_key()
        found = page.get_by_label("Start Location").count() > 0
        time.sleep(3)
        if found:
            cfmall_key()
            return page
        login_web(browser, page)
        cfmall_key()
        time.sleep(30)
    except Exception:
        found = False
        try:
            page.reload()
            cfmall_key()
        except Exception:
            pass
        print('cannot find 0',sessionid )
        time.sleep(5)

    iwait = 0
    while not found:
        iwait = iwait + 1
        if iwait > 6:
            # Cookie expired: rebuild the page and authenticate again.
            cfmall_key()
            replacement = reload_web(browser, page)
            if replacement is not None:
                # Keep working on the rebuilt page when reload_web returns it.
                page = replacement
            cfmall_key()
            login_web(browser, page)
            cfmall_key()
            time.sleep(30)
            iwait = 0
        try:
            cfmall_key()
            found = page.get_by_label("Start Location").count() > 0
            time.sleep(1)
            if not found:
                print("web lack of Start Location 1",iwait,sessionid )
                page.reload()
                time.sleep(10)
                cfmall_key()
        except Exception:
            found = False
            try:
                page.reload()
                cfmall_key()
            except Exception:
                pass
            print('cannot find',iwait,sessionid )
            time.sleep(10)

    page.goto(qqsweb, wait_until='domcontentloaded')
    cfmall_key()
    return page


ts = " X没位"
ts = " 贵"


def _build_quote_queries(checkport):
    """Expand a port table into one query dict per size/origin/destination."""
    queries = [
        {'pol': ipol, 'polfull': ipolfull, 'pod': ipod, 'podfull': ipodfull,
         'size': esize}
        for esize in checkport['size']
        for ipol, ipolfull in zip(checkport['pol'], checkport['polf'])
        for ipod, ipodfull in zip(checkport['pod'], checkport['podf'])
    ]
    if len(checkport['size']) < 2:
        # The short check always adds Nansha -> Dar-es-Salaam.
        queries.append({'pol':'CNNSA', 'polfull':'NANSHA (CNNSA)', 'pod': "TZDAR",
                        'podfull':"DAR-ES-SALAAM (TZDAR)",
                        'size':"40' General Purpose High Cube"})
    return queries


def _usd(text):
    """Compact a price string: drop spaces and show USD as ``$``."""
    return text.replace(" ","").replace("USD","$")


def _plain_amount(text):
    """Return the integer amount of a price string (USD or EUR)."""
    return int(text.replace(" ","").replace("USD","").replace("EUR",""))


def _append_offer_lines(page, iqqs, mails, key):
    """Read every offer row of the result page and append report lines.

    Adds one "FAK/QQ" line with the regular quote of the first row and one
    "@@" line for every sailing whose spot price is below that quote.
    Rows that cannot be read are skipped.  Returns the extended report.
    """
    row_count = page.locator("div.offers-item").count()
    qq = ""
    for i in range(2, row_count + 2):
        try:
            row = "div.offers-item:nth-child("+str(i)+") > div:nth-child(1) > div:nth-child(1)"
            prices = row + " > div:nth-child(2)"
            if page.locator(prices).count() <= 0:
                continue
            webtxt = page.locator(prices).inner_text()

            if i == 2 and qq == "":
                # Regular (FAK) quote; taken from the third row if the
                # second one has no such button.
                try:
                    qq = page.locator(prices + "> button:nth-child(2) > div:nth-child(2)").inner_text()
                except Exception:
                    qq = page.locator("div.offers-item:nth-child(3) > div:nth-child(1) > div:nth-child(1) > div:nth-child(2)> button:nth-child(2) > div:nth-child(2)").inner_text()
                qq = qq.replace("\u202f","")
            if i == 2:
                mails = mails+"FAK/QQ "+_usd(qq)+ "+MFR+Others \r\n"

            if "No more space" in webtxt:
                qqs = "No Space"
            else:
                qqs = page.locator(prices + "> button:nth-child(1) > div:nth-child(2)").inner_text()
                qqs = qqs.replace("\u202f","")

            voyage = row + " > div:nth-child(1) > div:nth-child(1)"
            vsl = page.locator(voyage + " > div:nth-child(1)").inner_text()
            etd = page.locator(voyage + " > div:nth-child(2)").inner_text()
            etd = etd.replace('\n','-').replace('\r','').replace("transsipment","T/S").replace("DAYS","天").replace("Direct","直航").replace(" ","").replace("via","经")
            etd = " " + etd.replace('transshipment','中转').replace("2023","").replace("2024","").replace("2025","")+" "
            qqs = qqs.replace('\n','=>').replace('\r','')
            if "available" in qqs:
                qqs = "No Space"
            vsl = vsl.replace('\n','-').replace('\r','')
            vsl = vsl.replace("Earliest Arrival","-")
            qq = qq.replace('\n','-').replace('\r','')

            discount = 0
            try:
                discount = max(_plain_amount(qq) - _plain_amount(qqs), 0)
            except ValueError:
                # "No Space" and similar texts are not numbers.
                pass

            if discount != 0:
                snapshot = mails
                try:
                    mails = mails +"@@低" +str(discount)+" "+ iqqs['pod'] + " " + _usd(qqs) +" "+" | vs QQ "+ _usd(qq) +'-(' +etd.replace('transshipment','中转') + ')-'+ vsl.replace('t CO2e','碳')+"\r\n"
                except Exception:
                    # Report grew too long: send what we have and start over.
                    # (`key` was missing from this call before.)
                    qq_mail(snapshot, key)
                    mails = "QQS 查询开始\r\n" + iqqs['pol']+ '_'+ iqqs['pod'] + '-'+iqqs['size']+"\r\n" + iqqs['pod'] +" "+ _usd(qqs) +" | vs QQ "+ _usd(qq) + etd.replace('transshipment','中转') + vsl.replace('t CO2e','碳') +"\r\n"
        except Exception:
            pass
    return mails


def _reopen_request_form(page):
    """Go back from the offers page to the editable form.

    Returns True once the "Request Offers" button is visible again, False
    when the "Edit request" link could not be clicked at all.
    """
    while True:
        clicked = True
        try:
            page.get_by_role("link", name="Edit request").click()
            time.sleep(3)
        except Exception:
            time.sleep(10)
            try:
                page.get_by_role("link", name="Edit request").click()
                time.sleep(3)
            except Exception:
                clicked = False
        try:
            if page.get_by_role("button", name="Request Offers").count() > 0:
                return True
            if page.get_by_role("link", name="Edit request").count() > 0:
                page.get_by_role("link", name="Edit request").click()
                time.sleep(3)
                clicked = True
        except Exception:
            pass
        if not clicked:
            return False


def QQS_short_check(page, checkport, browser):
    """Query spot quotes for every route in ``checkport`` and mail a report.

    For each size / port of loading / destination combination the quote
    form is filled (fresh for the first query and after any failure, edited
    in place otherwise), the offers are read, and a text report is built.
    The report is handed to ``qq_mail``, which only sends it when at least
    one discounted offer was found.

    The former trailing ``mail`` / ``qqs_rate`` summary was computed but
    never used and has been dropped.

    Args:
        page: Playwright page.
        checkport: dict with parallel lists ``pol``/``polf`` and
            ``pod``/``podf`` plus ``size`` (see ``checkport``/``checkportL``).
        browser: Playwright browser, forwarded to the login helpers.

    Returns:
        The page that was used for the queries.
    """
    cfmall_key()
    page = Webid_login(page, browser) or page  # logged in and ready to search
    cfmall_key()
    print(' CNNSA_开始进入QQS查询')
    mails = '开始进入QQS查询\r\n'

    key = "L " if len(checkport['pod']) > 5 else ""
    pol_c = checkport['pol'][0]
    first_query = True
    istatus = 1

    for iqqs in _build_quote_queries(checkport):
        if first_query or istatus == 0:
            istatus = QQS_item_new(page, iqqs, browser)
            first_query = False
        else:
            istatus = QQS_item_revise(page, iqqs, pol_c)
            if istatus == 0:
                istatus = QQS_item_new(page, iqqs, browser)

        if istatus == 30:
            # The site has no data for this route: note it and start fresh.
            page.reload()
            first_query = True
            mails = mails + "\r\n "+ iqqs['pol']+"=>"+ iqqs['pod']+":"+iqqs['size'] +" miss data \r\n"
            istatus = 0
            continue

        pol_c = iqqs['pol']
        mails = mails + "\r\n"
        mails = mails + iqqs['polfull']+ '_'+ iqqs['podfull'] + '-'+iqqs['size']+"\r\n"
        mails = _append_offer_lines(page, iqqs, mails, key)

        if not _reopen_request_form(page):
            # Could not get back to the form: force a fresh form next time.
            # (The old code assigned a misspelt `istatu` and then kept
            # retrying forever.)
            istatus = 0

    qq_mail(mails, key)
    return page
def QQS_search(page):
    """Placeholder for the full quote search; it currently does nothing."""
    return None


def xmovielist():
    """Print the ``href`` and text of every ``a.name`` link on the demo site.

    Launches a headless Chromium, loads https://spa6.scrape.center/, waits for
    the network to go idle, prints two lines per matching link and closes the
    browser.
    """
    with sync_playwright() as playwright:
        chromium = playwright.chromium.launch(headless=True)
        tab = chromium.new_page()
        tab.goto('https://spa6.scrape.center/')
        tab.wait_for_load_state('networkidle')
        for link in tab.query_selector_all('a.name'):
            print(link.get_attribute('href'))
            print(link.text_content())
        chromium.close()
savelogin = 0 with sync_playwright() as p: browser = p.chromium.launch(headless=False) #browser = p.webkit.launch(headless=False) #browser = p.firefox.launch(headless=False) #browser = p.webkit.launch() #browser = p.chromium.launch() #browser = p.firefox.launch() context = browser.new_context() #page = browser.new_page() if login ==1 or login == 0: #page = context.new_page() #url ="https://hlag.lightning.force.com/lightning/o/Event/home" url = 'https://solutions.hapag-lloyd.com/quick-quotes/#/' url ='https://www.hapag-lloyd.cn/en/login.html' url = "https://www.hapag-lloyd.cn/solutions/quick-quotes/#/?language=en" if savelogin == 1: if login == 1: page = context.new_page() page.goto(url) else: page.goto("https://www.baidu.com") page.wait_for_load_state('networkidle') if login == 0: context = browser.new_context(storage_state = 'hlagqq.json') page = context.new_page() #page.on('response', on_response) try: page.goto(url) except: pass #page.wait_for_load_state('networkidle') #logid =1 #logid = easygui.ynbox("是否不用登陆","",('是','否')) #自动登陆等待 #page.pause() if savelogin == 1: #保存登陆状况cookie context.storage_state(path ='hlagqq.json') savelogin = 0 if savelogin == 1: savelogin = 0 context.storage_state(path ='hlagqq.json') print(context.storage_state) if logid == 2: #如果没进入重新进入系统 context = browser.new_context(storage_state = 'hlagsf.json') page = context.new_page() page.goto(url) time.sleep(30) time.sleep(80) logid = easygui.ynbox("是否保留环境","",('是','否')) #自动登陆等待 if logid == 1: context.storage_state(path ='hlagqqnew.json') savelogin = 0 try: page.goto(url) time.sleep(10) kk=0 while kk ==0: try: page.get_by_role("link", name="Log in here").click() except: pass try: #kk =page.frame_locator("#azureLoginForm").get_by_role("link", name="Log in here").count() kk = page.frame_locator("#azureLoginForm").get_by_role("link", name="Log in here").count() if kk >0: kk =1 time.sleep(1) else: kk = 0 time.sleep(5) except: kk = 0 time.sleep(10) try: page.get_by_role("link", name="Log in 
def SFLOAD_QQS_1port(page):
    """Run ``QQS_short_check`` on ``page`` with the module-level ``checkport``.

    NOTE(review): ``QQS_short_check`` and ``checkport`` are defined outside this
    chunk. Other call sites in this file pass a third ``browser`` argument --
    confirm the two-argument form is still accepted.
    """
    QQS_short_check(page,checkport)

def SFLOAD_QQS(page):
    """Delegate to ``QQS_search`` for ``page``."""
    QQS_search(page)
savelogin = 0 with sync_playwright() as p: #browser = p.chromium.launch(headless=False) #browser = p.webkit.launch(headless=False) browser = p.firefox.launch(headless=False) #browser = p.webkit.launch() #browser = p.chromium.launch() #browser = p.firefox.launch() context = browser.new_context() #page = browser.new_page() if login ==1 or login == 0: url = 'https://solutions.hapag-lloyd.com/quick-quotes/#/' url ='https://www.hapag-lloyd.cn/en/login.html' url = "https://www.hapag-lloyd.cn/solutions/quick-quotes/#/?language=en" if savelogin == 1: if login == 1: page = context.new_page() page.goto(url) else: page.goto("https://www.baidu.com") page.wait_for_load_state('networkidle') if login == 0: context = browser.new_context(storage_state = 'hlagqq.json') page = context.new_page() #page.on('response', on_response) try: page.goto(url) except: pass if savelogin == 1: #保存登陆状况cookie context.storage_state(path ='hlagqq.json') savelogin = 0 if savelogin == 1: savelogin = 0 context.storage_state(path ='hlagqq.json') print(context.storage_state) try: page.goto(url) time.sleep(10) kk=0 while kk ==0: try: kk =page.frame_locator("#azureLoginForm").get_by_role("link", name="Log in here").count() #kk =page.frame_locator("iframe[name=\"b2cAuthFrame\"]").get_by_role("link", name="Log in here").click() if kk >0: kk =1 time.sleep(1) else: kk = 0 time.sleep(5) except: kk = 0 time.sleep(10) page.frame_locator("#azureLoginForm").get_by_role("link", name="Log in here").click() #page.frame_locator("iframe[name=\"b2cAuthFrame\"]").get_by_role("link", name="Log in here").click() time.sleep(20) bb = 0 iwait = 0 while bb ==0: iwait = iwait+1 if iwait >20: iwait = 0 page.reload() page.goto(url) time.sleep(20) try: page.frame_locator("#azureLoginForm").get_by_role("link", name="Log in here").count()>0 page.frame_locator("#azureLoginForm").get_by_role("link", name="Log in here").click() time.sleep(15) except: pass try: page.goto("https://www.hapag-lloyd.cn/solutions/quick-quotes/#/?language=en") 
time.sleep(5) bb= page.get_by_placeholder("Start Location").count() time.sleep(2) if bb >0: kk = 1 else: kk = 0 bb = 0 time.sleep(5) except: pass try: if page.frame_locator("#azureLoginForm").get_by_role("link", name="Log in here").count()>0: page.frame_locator("#azureLoginForm").get_by_role("link", name="Log in here").click() time.sleep(10) except: pass except: pass try: page.goto("https://www.hapag-lloyd.cn/solutions/quick-quotes/#/?language=en") time.sleep(5) kk=0 while kk == 0: try: kk = page.get_by_placeholder("Start Location").count() #print("ok") time.sleep(2) except: time.sleep(5) #print("bad") kk =0 except: pass #print("good") #QQS_search(page) #time.sleep(100000) QQS_short_check(page) def SFLOAD(): #playwright codegen --load-storage=hlagqq.json www.hapag-lloyd.cn login = 0 # 0不登录会自动登陆 1 要登陆修改cookie 2 已登陆 一般选0 除非cookie失效那1. savelogin = 0 with sync_playwright() as p: #browser = p.chromium.launch(headless=False) #browser = p.webkit.launch(headless=False) browser = p.firefox.launch(headless=False) #browser = p.webkit.launch() #browser = p.chromium.launch() #browser = p.firefox.launch() context = browser.new_context() #page = browser.new_page() if login ==1 or login == 0: #page = context.new_page() #url ="https://hlag.lightning.force.com/lightning/o/Event/home" url = 'https://solutions.hapag-lloyd.com/quick-quotes/#/' url ='https://www.hapag-lloyd.cn/en/login.html' url = "https://www.hapag-lloyd.cn/solutions/quick-quotes/#/?language=en" if savelogin == 1: if login == 1: page = context.new_page() page.goto(url) else: page.goto("https://www.baidu.com") page.wait_for_load_state('networkidle') if login == 0: context = browser.new_context(storage_state = 'hlagqq.json') page = context.new_page() #page.on('response', on_response) try: page.goto(url) except: pass #page.wait_for_load_state('networkidle') #logid =1 #logid = easygui.ynbox("是否不用登陆","",('是','否')) #自动登陆等待 #page.pause() if savelogin == 1: #保存登陆状况cookie context.storage_state(path ='hlagqq.json') savelogin = 0 
if savelogin == 1: savelogin = 0 context.storage_state(path ='hlagqq.json') print(context.storage_state) """ if logid == 2: #如果没进入重新进入系统 context = browser.new_context(storage_state = 'hlagsf.json') page = context.new_page() page.goto(url) time.sleep(30) logid = easygui.ynbox("是否保留环境","",('是','否')) #自动登陆等待 if logid == 1: context.storage_state(path ='hlagqqnew.json') savelogin = 0 page.get_by_role("button", name="Log in").click() page.frame_locator("#azureLoginForm").get_by_placeholder("E-mail Address").click() page.frame_locator("#azureLoginForm").get_by_placeholder("E-mail Address").fill("2035055632@qq.com") page.frame_locator("#azureLoginForm").get_by_placeholder("E-mail Address").press("Tab") page.frame_locator("#azureLoginForm").get_by_placeholder("Password").fill("Wujue000wt@") page.frame_locator("#azureLoginForm").get_by_role("button", name="Log in").click() """ try: page.goto(url) time.sleep(10) kk=0 while kk ==0: try: if page.get_by_role("button", name="Select All").count()>0: page.get_by_role("button", name="Select All").click() else: pass except: pass try: page.get_by_role("link", name="Log in here").click() except: pass try: kk =page.frame_locator("#azureLoginForm").get_by_role("link", name="Log in here").count() if kk >0: kk =1 time.sleep(1) else: kk = 0 time.sleep(5) except: kk = 0 time.sleep(10) page.frame_locator("#azureLoginForm").get_by_role("link", name="Log in here").click() time.sleep(20) bb = 0 while bb ==0: bb= page.get_by_placeholder("Start Location").count() time.sleep(2) if bb >0: kk = 1 else: kk = 0 bb = 0 time.sleep(5) try: if page.frame_locator("#azureLoginForm").get_by_role("link", name="Log in here").count()>0: page.frame_locator("#azureLoginForm").get_by_role("link", name="Log in here").click() time.sleep(10) except: pass except: pass try: page.goto("https://www.hapag-lloyd.cn/solutions/quick-quotes/#/?language=en") time.sleep(5) kk=0 while kk == 0: try: kk = page.get_by_placeholder("Start Location").count() time.sleep(2) except: 
time.sleep(5) kk =0 except: pass QQS_search(page) logid = easygui.enterbox("是否输入起始日期,日期按日.月.年格式 比如24.05.2023") pol =['CNNGB','CNNSA','CNTAO'] polfull = ['NINGBO (CNNGB)','NANSHA (CNNSA)','QINGDAO (CNTAO)'] k=0 for recpol in pol: try: page.goto("https://www.hapag-lloyd.cn/solutions/quick-quotes/#/?language=en") time.sleep(3) kk=0 while kk == 0: try: kk = page.get_by_placeholder("Start Location").count() time.sleep(1) except: time.sleep(5) kk =0 except: pass #page.pause() time.sleep(1) podfull =["LUANDA (AOLAD)","MATADI (CDMAT)","DOUALA (CMDLA)","KRIBI (CMKBI)","DURBAN (ZADUR)","TEMA (GHTEM)","DAKAR (SNDKR)","CONAKRY (GNCKY)","MOMBASA (KEMBA)","DAR-ES-SALAAM (TZDAR)"] pod = ["AOLAD","CDMAT","CMDLA","CMKBI","ZADUR","GHTEM","SNDKR","GNCKY","KEMBA","TZDAR"] j=0 for pd in pod: page.get_by_placeholder("Start Location").click() page.get_by_placeholder("Start Location").fill(recpol) page.get_by_placeholder("Start Location").press("ArrowDown") time.sleep(3) page.get_by_placeholder("Start Location").press("ArrowDown") page.get_by_placeholder("Start Location").press("ArrowDown") try: page.get_by_text(polfull[k]).click() except: time.sleep(20) page.get_by_placeholder("Start Location").press("ArrowDown") page.get_by_text(polfull[k]).click() page.get_by_placeholder("Destination Location").click() page.get_by_placeholder("Destination Location").fill(pd) page.get_by_placeholder("Destination Location").press("ArrowDown") time.sleep(3) page.get_by_placeholder("Destination Location").press("ArrowDown") page.get_by_placeholder("Destination Location").press("ArrowDown") try: page.get_by_text(podfull[j]).click() except: time.sleep(20) page.get_by_placeholder("Destination Location").press("ArrowDown") page.get_by_text(podfull[j]).click() #if j>0: # break j=j+1 if j < len(pod): page.get_by_role("button", name="Additional Routing").click() if len(logid) > 4: page.get_by_placeholder("Select validity date").fill(logid) page.locator("svg.q-checkbox__svg").click() try: 
page.get_by_role("button", name="Request Quote").click() time.sleep(60) print(page.locator('.quotation-header__number > div:nth-child(1) > div:nth-child(2)').inner_text()) print(page.get_by_label("Email Address").count(),'email address amount') try: page.get_by_label("Email Address").fill("patrickwu008@qq.com") time.sleep(3) except: pass page.locator('div.q-notifications:nth-child(1) > div:nth-child(5)') page.get_by_role("button", name="Request Quote").click() #page.get_by_role("button", name="Request Quote").click() except: pass if page.locator('.quotation-header__number > div:nth-child(1) > div:nth-child(1)').count() == 0: try: page.get_by_role("button", name="Request Quote").click() time.sleep(60) except: pass #print(page.locator('.quotation-header__number > div:nth-child(1) > div:nth-child(2)').content()) try: page.locator("button.bg-orange:nth-child(1)").first.click() except: pass k=k+1 try: #page.locator(".quotation-header__buttons > button:nth-child(1)").first.click() page.locator("button.bg-orange:nth-child(1)").first.click() pass except: pass easygui.textbox(msg='完成') time.sleep(100000) try: page.click('#idp_hint') page.wait_for_load_state('networkidle') except: pass #page.locator("text=登录方式 HLAG Azure").click() #page.wait_for_load_state('networkidle') #page.goto("https://hlag.lightning.force.com/one/one.app") #kk = page.get_attribute('a.subject-link','class') #time.sleep(20) logid = easygui.ynbox("是否日历导入正常","",('是','否')) #自动登陆等待 try: page.locator("span:has-text(\"Calendar\")").first.click() #page.goto("https://hlag.lightning.force.com/lightning/o/Event/home") page.wait_for_load_state('networkidle') except: pass time.sleep(5) #page.on('response', on_response) logid = easygui.choicebox("请选择功能", "功能选择",["做会议准备","做销售规划","单个手动刷会议准备","等待"]) if logid == "做会议准备": salecall_detail_data(page) elif logid == "做销售规划": salescall_plan(page) #做销售规划 elif logid == "单个手动刷会议准备": for i in range(0,20): x= easygui.ynbox("等待进入salecall detail","",('是','否')) #自动登陆等待 
def webload():
    """Open the scrape demo site in a visible Chromium and print its links.

    Registers ``on_response`` as the response listener, prints the ``href``
    and text of every ``a.name`` element, then sleeps for 10000 seconds so
    the window stays open.
    """
    with sync_playwright() as p:
        browser = p.chromium.launch(headless=False)
        context = browser.new_context()
        page = context.new_page()
        page.on('response', on_response)
        page.goto('https://spa6.scrape.center/')
        # The result is unused, but the call is kept: it raises if no 'a.name'
        # element can be found, before the links are enumerated.
        # NOTE(review): presumably also serves as a wait for the first link.
        page.get_attribute('a.name', 'href')
        for element in page.query_selector_all('a.name'):
            print(element.get_attribute('href'))
            print(element.text_content())
        time.sleep(10000)


# Reference instants for *today* (days=+0), captured once at import time.
now_time = datetime.datetime.now()
next_time = now_time + datetime.timedelta(days=+0)
next_year = next_time.date().year
next_month = next_time.date().month
next_day = next_time.date().day
# Today at 08:00, 12:00 and 16:00.
next_time = datetime.datetime(next_year, next_month, next_day, 8, 0, 0)
next_time2 = datetime.datetime(next_year, next_month, next_day, 12, 0, 0)
next_time3 = datetime.datetime(next_year, next_month, next_day, 16, 0, 0)
# Seconds from import time until each of those instants (negative if passed).
timer_start_time = (next_time - now_time).total_seconds()
timer_start_time2 = (next_time2 - now_time).total_seconds()
timer_start_time3 = (next_time3 - now_time).total_seconds()

# playwright codegen -b firefox --load-storage=D:\hlagqq.json https://www.hapag-lloyd.cn/solutions/quick-quotes-spot/#/
scheduler = sched.scheduler(time.time, time.sleep)

# Time-of-day slots; each keeps its leading space because it is appended
# directly to a "YYYY-M-D" date string by the scheduling code.
shorttime = [
    " 01:00:00", " 03:00:00", " 06:00:00", " 07:00:00", " 08:00:00",
    " 08:05:00", " 10:00:00", " 11:00:00", " 12:00:00", " 14:00:00",
    " 15:00:00", " 16:00:00", " 18:00:00", " 19:00:00", " 20:00:00",
    " 20:05:00", " 22:00:00", " 23:00:00",
]
# Subset of ``shorttime``; the driver tags these slots with key "L".
longtime = [" 01:00:00", " 03:00:00", " 08:05:00", " 12:00:00", " 16:00:00", " 20:05:00"]

# Minute marks within every hour at which ``pfr_pr`` is scheduled.
ptime = [":15:00", ":30:00", ":45:00", ":56:00"]
# " HH:MM:SS" slots for every hour of the day, e.g. " 00:15:00" ... " 23:56:00".
pfr = [f" {hour:02d}{mark}" for hour in range(24) for mark in ptime]


def pfr_pr(page, browser, xtime):
    """Periodic job: re-arm itself two days ahead, then run one fixed query.

    Args:
        page: Playwright page to operate on.
        browser: Browser handle, forwarded to ``QQS_item_new``.
        xtime: Epoch seconds this run was planned for. If it is already in
            the past the job only re-arms itself and returns.

    Returns:
        The ``page`` that was passed in.

    NOTE(review): ``re_loginout``, ``QQS_item_new``, ``cfmall_key`` and the
    global ``context`` are defined outside this block.
    """
    now = datetime.datetime.now()
    atime = int(time.mktime(now.timetuple()))
    itm = int(time.mktime((now + datetime.timedelta(days=2)).timetuple()))
    # Always enqueue the run two days from now, even when this run is skipped.
    scheduler.enterabs(itm, 1, pfr_pr, argument=(page, browser, itm,))
    if xtime < atime:
        return page

    try:
        # A missing "Start Location" field triggers ``re_loginout``.
        if page.get_by_placeholder("Start Location").count() == 0:
            re_loginout(page)
    except Exception:
        pass

    sessionid = ""
    iqqs = {'pol':'CNNSA', 'polfull':'NANSHA (CNNSA)', 'pod': "AOLAD",'podfull':"LUANDA (AOLAD)",'size':"40' General Purpose High Cube"}
    QQS_item_new(page, iqqs, browser)

    edit_link = page.get_by_role("link", name="Edit request")
    try:
        if edit_link.count() > 0:
            edit_link.click()
    except Exception:
        # First attempt failed: wait, then retry the click once.
        time.sleep(10)
        if edit_link.count() > 0:
            try:
                edit_link.click()
            except Exception:
                pass

    # Result unused; call kept from the original.
    page.context.cookies()
    print("~",datetime.datetime.now(),sessionid)
    try:
        page.reload()
        cfmall_key()
    except Exception:
        print("---error",datetime.datetime.now(),sessionid)

    try:
        # Persist the session so the stored login state stays fresh.
        context.storage_state(path ='hlagqq.json')
    except Exception:
        pass
    return page
# Created on 2018-4-20
# Example: run a function at a fixed time every day.


def runshort(page, itime, key, slot, context, browser, xtime):
    """Scheduler job: re-arm for the same slot two days ahead, then run a check.

    Args:
        page: Playwright page to operate on.
        itime: Ignored on entry; recomputed from ``slot``.
        key: "L" selects ``checkportL``; anything else selects ``checkport``.
        slot: Time-of-day string understood by ``timeW``.
        context: Browser context whose storage state is saved afterwards.
        browser: Browser handle, forwarded to ``QQS_short_check``.
        xtime: Epoch seconds this run was planned for. Runs that start more
            than 300 seconds late only re-arm and return.

    Returns:
        The page: the incoming one when skipped, otherwise the one returned
        by ``QQS_short_check``.

    NOTE(review): the re-armed event captures the *incoming* ``page``. If
    ``QQS_short_check`` returns a different page, later runs still get the
    old one -- confirm this is intended.
    """
    atime = int(time.mktime(datetime.datetime.now().timetuple()))
    itime = timeW(slot, 2)
    ntime = int(time.mktime(time.strptime(str(itime), "%Y-%m-%d %H:%M:%S")))
    scheduler.enterabs(ntime, 1, runshort, argument=(page, itime, key, slot, context, browser, ntime,))
    if xtime + 300 < atime:
        return page

    sessionid = ""
    itime = str(itime)
    print("快捷",key,datetime.datetime.now(),itime)

    ports = checkportL if key == "L" else checkport
    page = QQS_short_check(page, ports, browser)
    # Result unused; call kept from the original.
    page.context.cookies()
    try:
        # Persist the session so the stored login state stays fresh.
        context.storage_state(path ='hlagqq.json')
    except Exception:
        pass

    print("快捷OK",datetime.datetime.now(),sessionid)
    try:
        page.reload()
        cfmall_key()
    except Exception:
        pass
    return page


def func_short(page, itime, key):
    """Run the short check when ``itime`` is in window, then re-arm in 24 hours.

    The check runs only when ``0 <= itime <= 20000``. While ``itime`` is below
    386400 a ``threading.Timer`` is started that calls this function again
    after 86400 seconds with ``itime + 86400``.

    The original passed ``func_short(page, itime, key)`` -- the *result* of an
    immediate call -- as the timer callback, so the recursion ran at once and
    the timers ended up with ``None`` as their function. The callable and its
    arguments are now passed separately.

    NOTE(review): the source was collapsed onto one line. Placing the
    "快捷OK" print at function level and the timer inside the
    ``itime < 386400`` branch is a reconstruction -- confirm.

    Returns:
        The started ``threading.Timer`` so the caller can cancel it, or
        ``None`` when no timer was armed.
    """
    if 0 <= itime <= 20000:
        ports = checkportL if key == "L" else checkport
        QQS_short_check(page, ports)
    print("快捷OK",datetime.datetime.now())
    if itime < 386400:
        itime = itime + 86400
        timer = threading.Timer(86400, func_short, args=(page, itime, key))
        timer.start()
        return timer
    return None
def func(page, itime):
    """Log a work cycle and arm a timer that repeats it in 24 hours.

    The original passed ``func(page, itime)`` -- the result of an immediate
    call -- as the timer callback, which recursed without bound. The callable
    and its arguments are now handed to ``threading.Timer`` separately.

    NOTE(review): the original computed ``itime + 86400`` into an unused
    local; it is assumed to be the value meant for the next run.

    Returns:
        The started ``threading.Timer`` so the caller can cancel it.
    """
    print("干活",datetime.datetime.now())
    print("干活OK",datetime.datetime.now())
    timer = threading.Timer(86400, func, args=(page, itime + 86400))
    timer.start()
    return timer


def timeW(slot, n):
    """Return the naive local datetime for time-of-day ``slot``, ``n`` days ahead.

    Args:
        slot: "HH:MM:SS", with or without surrounding whitespace (the slot
            tables in this file use a leading space).
        n: Day offset from today; 0 is today.

    Raises:
        ValueError: if ``slot`` is not a valid "HH:MM:SS" time.
    """
    target_day = (datetime.datetime.now() + datetime.timedelta(days=n)).date()
    clock = datetime.datetime.strptime(slot.strip(), "%H:%M:%S").time()
    return datetime.datetime.combine(target_day, clock)


# Script entry: open the browser session and run the scheduler forever.
# Statements stay at module scope (not inside a function) because other
# functions in this file read globals such as ``context``.
if __name__ == "__main__":
    # playwright codegen --load-storage=hlagqq.json www.hapag-lloyd.cn
    # 0 = reuse the saved session (normal), 1 = log in and refresh the saved
    # cookie (use when the cookie has expired), 2 = already logged in.
    login = 0
    savelogin = 0
    with sync_playwright() as p:
        browser = p.firefox.launch(headless=False)
        context = browser.new_context()
        if login == 1 or login == 0:
            url = qqsweb
            if savelogin == 1:
                if login == 1:
                    page = context.new_page()
                    page.goto(url)
                else:
                    # NOTE(review): ``page`` is not bound yet on this path.
                    page.goto("https://www.baidu.com")
                    page.wait_for_load_state('networkidle')
            if login == 0:
                context = browser.new_context(storage_state = 'hlagqq.json')
                page = context.new_page()
                try:
                    page.goto(url)
                except Exception:
                    pass
                if savelogin == 1:
                    # Save the logged-in session cookie.
                    context.storage_state(path ='hlagqq.json')
                    savelogin = 0
        if savelogin == 1:
            savelogin = 0
            context.storage_state(path ='hlagqq.json')
            print(context.storage_state)

        try:
            Webid_login(page, browser)
            re_loginout(page)
            print("loginok")
        except Exception as exc:
            print("error login", exc)
        cookies = context.cookies()

        # ``xtime`` is far in the past, so this call only enqueues the first
        # self-rearming ``pfr_pr`` event.
        pfr_pr(page, browser, 1719039063)
        print(0,scheduler.queue)

        atime = datetime.datetime.now()
        for slot in shorttime:
            ikey = "L" if slot in longtime else "S"
            itime = timeW(slot, 0)
            timediff = int((itime - datetime.datetime.now()).total_seconds())
            if timediff > 0:
                print(ikey,itime,"later",timediff)
            else:
                print(ikey,timeW(slot, 1),"tomorrow",timediff)
            # Enqueue today and tomorrow; each run re-arms itself two days
            # ahead, so together the two chains cover every day.
            for j in range(0, 2):
                itime = timeW(slot, j)
                xtime = int(time.mktime(itime.timetuple()))
                scheduler.enterabs(xtime, 1, runshort, argument=(page, itime, ikey, slot, context, browser, xtime,))

        for slot in pfr:
            for j in range(0, 2):
                itime = timeW(slot, j)
                xtime = int(time.mktime(itime.timetuple()))
                scheduler.enterabs(xtime, 1, pfr_pr, argument=(page, browser, xtime,))

        while 1:
            print(len(scheduler.queue))
            scheduler.run()

    # Not reached while the loop above has no exit; kept from the original.
    print("finish")
    time.sleep(100000)