Python
Example
# coding=utf-8
# !/usr/bin/env python
import json
import threading
import time
import requests as rq
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:85.0) Gecko/20100101 Firefox/85.0",
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,* /*;q=0.8",
"Accept-Language": "zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2",
"Accept-Encoding": "gzip, deflate, br"
}
testUrl = 'https://api.myip.la/en?json'
# The core business
def testPost(host, port):
proxies = {
'http': 'http://{}:{}'.format(host, port),
'https': 'http://{}:{}'.format(host, port),
}
res = ""
while True:
try:
res = rq.get(testUrl, proxies=proxies, timeout=5)
# print(res.status_code)
print(res.status_code, "***", res.text)
break
except Exception as e:
print(e)
break
return
class ThreadFactory(threading.Thread):
def __init__(self, host, port):
threading.Thread.__init__(self)
self.host = host
self.port = port
def run(self):
testPost(self.host, self.port)
# Extract the link to the proxy Return value of json type
tiqu = 'Extract the link'
while 1 == 1:
# Extract 10 at a time and put them into the thread
resp = rq.get(url=tiqu, timeout=5)
try:
if resp.status_code == 200:
dataBean = json.loads(resp.text)
else:
print("failure")
time.sleep(1)
continue
except ValueError:
print("failure")
time.sleep(1)
continue
else:
# Parsing json arrays
print("code=", dataBean["code"])
code = dataBean["code"]
if code == 0:
threads = []
for proxy in dataBean["data"]:
threads.append(ThreadFactory(proxy["ip"], proxy["port"]))
for t in threads: # Open the thread
t.start()
time.sleep(0.01)
for t in threads: # Blocking threads
t.join()
# break
time.sleep(1)
Last updated