1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110
| #后端代码:
#!/usr/bin/env python # -*- coding: utf-8 -*- # @Time : 2019/4/15 14:38 # @Author : Ropon # @File : ws_demo.py
from flask import Flask, request, render_template from geventwebsocket.handler import WebSocketHandler from gevent.pywsgi import WSGIServer from geventwebsocket.websocket import WebSocket import subprocess import json
app = Flask(__name__)
@app.route("/ipv6check") def index(): return render_template("ws_demo.html")
# ipv6 = "240e:d9:c200:101:7bb2::120"
user_socket_list = [] user_socket_dict = {}
@app.route("/ws/<ipv6>") def ws(ipv6): user_socket = request.environ.get("wsgi.websocket") # type:WebSocket # 做语法提示 if user_socket: user_socket_dict[ipv6] = user_socket msg = user_socket.receive() msg_dict = json.loads(msg) p = subprocess.Popen("ping -6 -c 4 " + ipv6, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) while True: line = p.stdout.readline().rstrip().decode('utf8') # print(line) user_socket_dict[ipv6].send(line) if not line: break if msg_dict.get("curl"): user_socket_dict[ipv6].send("-------------------------------------") p = subprocess.Popen("curl -6 -I " + ipv6, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) while True: line = p.stdout.readline().rstrip().decode('utf8') # print(line) user_socket_dict[ipv6].send(line) if not line: break
if msg_dict.get("curl"): user_socket_dict[ipv6].send("-------------------------------------") p = subprocess.Popen("tcping -v6 " + ipv6 + " 80", shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) while True: line = p.stdout.readline().rstrip().decode('utf8') # print(line) user_socket_dict[ipv6].send(line) if not line: break return "ok"
if __name__ == '__main__': http_serv = WSGIServer(("127.0.0.1", 9527), app, handler_class=WebSocketHandler) http_serv.serve_forever()
#前端代码
<!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <title>IPV6检测</title> </head> <body> <input type="text" name="ipv6" id="ipv6" placeholder="请输入ipv6地址或域名,ipv6地址不支持curl" style="width:300px;"> <input type="checkbox" name="opother" id="curl" value="1">curl <input type="checkbox" name="opother" id="tcping" value="2">tcping <button onclick="checkipv6()">检测</button> <div id="ping_list"></div> <script> function checkipv6() { document.getElementById("ping_list").innerHTML = ""; var domainflag = document.domain; var ipv6 = document.getElementById('ipv6').value; var curlflag = document.getElementById('curl').checked; var tcpingflag = document.getElementById('tcping').checked; var ws = new WebSocket("wss://django.ropon.top/ws/" + ipv6); var send_str = { "domain": domainflag, "curl": curlflag, "tcping": tcpingflag }; ws.onopen = function sendopen() { ws.send(JSON.stringify(send_str)); }; ws.onmessage = function (data) { var ptag = document.createElement('p'); ptag.innerText = data.data; document.getElementById('ping_list').appendChild(ptag) }; } </script> </body> </html>
|