ProxyPool/docs/developdoc.md
2026-01-31 22:53:12 +08:00

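This is the **complete development document (PgSQL version)** for a **proxy pool + testing + dispatch (round-robin) service**. It covers the overall architecture, the database schema (SQL), the HTTP API request formats and contracts, the reserved Go interfaces (contracts) with key pseudocode, the key workflows, and security recommendations.
> Note: this system is a general-purpose network infrastructure capability. Use it only in lawful, compliant scenarios (your own services, authorized testing, compliant crawling, outbound routing), and put authentication, auditing, and SSRF protection in place.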
---
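## 1. Goals and scope
### 1.1 Target capabilities
1. **Import proxies**
* Supported inputs: pasted text, txt upload, csv upload
* Automatically parse multiple formats and deduplicate on insert (PgSQL unique constraint)
2. **Test proxy availability**
* Specify the target site (URL) and the test rules (timeout, expected status codes, optional keyword)
* Support a concurrent worker pool
* Test results can be written back to the database (status / latency / counters / score)
3. **Dispatch service (round-robin interface reserved)**
* Callers fetch the "next available proxy" over HTTP (RoundRobin / Random / Weighted, extensible)
* Support a lease mechanism (recommended), which simplifies result reporting and statistics
4. **Result reporting (optional but strongly recommended)**
* After using a proxy, the caller reports success/fail, which drives score adjustment and the circuit-breaking policy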
---
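## 2. Overall architecture and modules
### 2.1 Modules
* **API Layer**: HTTP routing and authentication, parameter validation, JSON responses
* **Importer**: parses Text / CSV into a list of Proxy values and reports the reason for each invalid line
* **Store (PgSQL)**: upsert, queries, health updates, RR cursor, lease management
* **Tester**: runs an HTTP request test through a proxy and returns a TestResult
* **Selector**: picks one proxy from the available set according to a policy (RoundRobin by default)
* (Optional) **Cleaner Job**: periodically purges expired leases and optionally writes test logs
### 2.2 Recommended package layout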
```
cmd/server/main.go
internal/api/handlers.go
internal/model/types.go
internal/importer/importer.go
internal/tester/http_tester.go
internal/selector/selector.go
internal/store/pg_store.go
internal/security/validate_url.go
```
---
## 3. Data model (Go)
```go
// internal/model/types.go
type ProxyProtocol string

const (
	ProtoHTTP   ProxyProtocol = "http"
	ProtoHTTPS  ProxyProtocol = "https"
	ProtoSOCKS5 ProxyProtocol = "socks5"
)

type ProxyStatus string

const (
	StatusUnknown ProxyStatus = "unknown"
	StatusAlive   ProxyStatus = "alive"
	StatusDead    ProxyStatus = "dead"
)

type Proxy struct {
	ID           string // uuid
	Protocol     ProxyProtocol
	Host         string
	Port         int
	Username     string
	Password     string
	Group        string
	Tags         []string
	Status       ProxyStatus
	Score        int
	LatencyMs    int64
	LastCheckAt  int64 // unix ms or time.Time, depending on your serialization preference
	FailCount    int
	SuccessCount int
	Disabled     bool
	CreatedAt    int64
	UpdatedAt    int64
}

// HealthPatch describes a partial health update; nil pointer fields are left unchanged.
type HealthPatch struct {
	Status      *ProxyStatus
	ScoreDelta  int
	LatencyMs   *int64
	CheckedAtMs *int64
	FailInc     int
	SuccessInc  int
}
```
---
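## 4. PostgreSQL schema (SQL)
> Design notes: **deduplicate through a unique constraint**; store tags as `text[]` with a GIN index; keep the RR cursor in its own table and increment it atomically; keep leases in a table so they can be traced.
### 4.1 Enum types and the main table `proxies`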
```sql
CREATE TYPE proxy_protocol AS ENUM ('http', 'https', 'socks5');
CREATE TYPE proxy_status AS ENUM ('unknown', 'alive', 'dead');
CREATE TABLE proxies (
  id            uuid PRIMARY KEY,
  protocol      proxy_protocol NOT NULL,
  host          text NOT NULL,
  port          int NOT NULL CHECK (port > 0 AND port < 65536),
  username      text NOT NULL DEFAULT '',
  password      text NOT NULL DEFAULT '',
  "group"       text NOT NULL DEFAULT 'default',
  tags          text[] NOT NULL DEFAULT ARRAY[]::text[],
  status        proxy_status NOT NULL DEFAULT 'unknown',
  score         int NOT NULL DEFAULT 0,
  latency_ms    bigint NOT NULL DEFAULT 0,
  last_check_at timestamptz,
  fail_count    int NOT NULL DEFAULT 0,
  success_count int NOT NULL DEFAULT 0,
  disabled      boolean NOT NULL DEFAULT false,
  created_at    timestamptz NOT NULL DEFAULT now(),
  updated_at    timestamptz NOT NULL DEFAULT now(),
  CONSTRAINT uq_proxy UNIQUE (protocol, host, port, username)
);

-- Common index
CREATE INDEX idx_proxies_group_status_disabled
  ON proxies ("group", status, disabled);

-- Tag queries: tags && ARRAY['a','b']
CREATE INDEX idx_proxies_tags_gin
  ON proxies USING gin (tags);

-- Hot-path query for available proxies (optional)
CREATE INDEX idx_proxies_alive_fast
  ON proxies ("group", disabled, score DESC, last_check_at DESC)
  WHERE status = 'alive';

-- Maintain updated_at automatically (optional, via trigger)
CREATE OR REPLACE FUNCTION touch_updated_at()
RETURNS trigger AS $$
BEGIN
  NEW.updated_at = now();
  RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER trg_touch_updated_at
  BEFORE UPDATE ON proxies
  FOR EACH ROW
  EXECUTE FUNCTION touch_updated_at();
```
### 4.2 RR cursor table `rr_cursors`
```sql
CREATE TABLE rr_cursors (
k text PRIMARY KEY,
v bigint NOT NULL DEFAULT 0,
updated_at timestamptz NOT NULL DEFAULT now()
);
```
### 4.3 Lease table `proxy_leases` (recommended)
```sql
CREATE TABLE proxy_leases (
lease_id text PRIMARY KEY,
proxy_id uuid NOT NULL REFERENCES proxies(id),
expire_at timestamptz NOT NULL,
site text NOT NULL DEFAULT '',
"group" text NOT NULL DEFAULT 'default',
created_at timestamptz NOT NULL DEFAULT now()
);
CREATE INDEX idx_leases_expire ON proxy_leases(expire_at);
```
### 4.4 (Optional) Test log table `proxy_test_logs`
If you need auditing or troubleshooting (for example, which target site caused a failure), consider adding:
```sql
CREATE TABLE proxy_test_logs (
id bigserial PRIMARY KEY,
proxy_id uuid NOT NULL REFERENCES proxies(id),
site text NOT NULL,
ok boolean NOT NULL,
latency_ms bigint NOT NULL,
error_text text NOT NULL DEFAULT '',
checked_at timestamptz NOT NULL DEFAULT now()
);
CREATE INDEX idx_test_logs_proxy_time ON proxy_test_logs(proxy_id, checked_at DESC);
```
---
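## 5. HTTP API design (request formats + contracts)
### General conventions
* BasePath: `/v1`
* Content-Type: `application/json` (except for file uploads)
* Authentication: `Authorization: Bearer <token>` or `X-API-Key: <key>` is recommended
* Errors are returned in one unified format: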
```json
{"error":"bad_request","message":"..."}
```
---
### 5.1 Import (pasted text)
`POST /v1/proxies/import/text`
**Request**
```json
{
"group": "default",
"tags": ["batch-2026-01"],
"protocol_hint": "auto",
"text": "http://user:pass@1.2.3.4:8080\nsocks5://5.6.7.8:1080\n9.9.9.9:3128"
}
```
**Response**
```json
{
"imported": 120,
"duplicated": 30,
"invalid": 5,
"invalid_items": [
{"raw":"bad_line","reason":"parse_failed"}
]
}
```
Common line formats that the parser should support (recommended):
* `host:port`
* `user:pass@host:port`
* `http://user:pass@host:port`
* `socks5://host:port`
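One way to handle all four formats is to add a default scheme when none is present and then let `net/url` do the parsing. This is a minimal sketch under stated assumptions: `parsedProxy`, `parseProxyLine`, and the choice of `http` as the fallback protocol for `"auto"` are illustrative, and credentials containing characters such as `/` or `#` would need escaping that this sketch does not attempt.

```go
package main

import (
	"errors"
	"fmt"
	"net/url"
	"strconv"
	"strings"
)

// parsedProxy is a hypothetical, trimmed-down stand-in for the Proxy model.
type parsedProxy struct {
	Protocol string
	Host     string
	Port     int
	Username string
	Password string
}

var errParse = errors.New("parse_failed")

// parseProxyLine accepts host:port, user:pass@host:port, and the same forms
// prefixed with http://, https://, or socks5://. Lines without a scheme get
// defaultProto ("http" when the hint is empty or "auto", an assumption).
func parseProxyLine(raw, defaultProto string) (parsedProxy, error) {
	raw = strings.TrimSpace(raw)
	if raw == "" {
		return parsedProxy{}, errParse
	}
	if !strings.Contains(raw, "://") {
		if defaultProto == "" || defaultProto == "auto" {
			defaultProto = "http"
		}
		raw = defaultProto + "://" + raw
	}
	u, err := url.Parse(raw)
	if err != nil || u.Hostname() == "" {
		return parsedProxy{}, errParse
	}
	switch u.Scheme {
	case "http", "https", "socks5":
	default:
		return parsedProxy{}, errParse
	}
	port, err := strconv.Atoi(u.Port())
	if err != nil || port < 1 || port > 65535 {
		return parsedProxy{}, errParse
	}
	p := parsedProxy{Protocol: u.Scheme, Host: u.Hostname(), Port: port}
	if u.User != nil {
		p.Username = u.User.Username()
		p.Password, _ = u.User.Password()
	}
	return p, nil
}

func main() {
	for _, line := range []string{
		"http://user:pass@1.2.3.4:8080",
		"socks5://5.6.7.8:1080",
		"9.9.9.9:3128",
		"bad_line",
	} {
		p, err := parseProxyLine(line, "auto")
		fmt.Printf("%q -> %+v, err=%v\n", line, p, err)
	}
}
```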
---
### 5.2 Import (txt/csv file upload)
`POST /v1/proxies/import/file` (multipart/form-data)
**Form fields**
* `group`: string
* `tags`: comma-separated (or repeat the `tags` field)
* `type`: `auto|txt|csv`
* `file`: the uploaded file
**Response** Same as 5.1.
Recommended CSV column names:
* `protocol,host,port,username,password,group,tags`
* Separate tags with `;` (e.g. `a;b;c`)
---
### 5.3 Test proxies
`POST /v1/proxies/test`
**Request**
```json
{
"group": "default",
"filter": {
"status": ["unknown", "alive"],
"tags_any": ["batch-2026-01"],
"limit": 200
},
"test_spec": {
"url": "https://example.com/",
"method": "GET",
"timeout_ms": 5000,
"expect_status": [200, 301, 302],
"expect_contains": ""
},
"concurrency": 50,
"update_store": true,
"write_log": false
}
```
**Response**
```json
{
"summary": {"tested":200,"alive":120,"dead":80},
"results": [
{"proxy_id":"...","ok":true,"latency_ms":340,"error":""},
{"proxy_id":"...","ok":false,"latency_ms":5000,"error":"timeout"}
]
}
```
---
### 5.4 Get the next available proxy (round-robin)
`GET /v1/proxies/next?group=default&site=https%3A%2F%2Fexample.com&policy=round_robin&tags_any=batch-2026-01`
**Response**
```json
{
"proxy": {
"id": "uuid",
"protocol": "http",
"host": "1.2.3.4",
"port": 8080,
"username": "user",
"password": "pass"
},
"lease_id": "lease_01H...",
"ttl_ms": 60000
}
```
> Whether to return the password should be controlled by a configuration switch, for example `RETURN_SECRET=true/false`.
---
### 5.5 Report usage result
`POST /v1/proxies/report`
**Request**
```json
{
"lease_id": "lease_01H...",
"proxy_id": "uuid",
"success": false,
"error": "403",
"latency_ms": 1200
}
```
**Response**
```json
{"ok": true}
```
---
## 6. Reserved Go interfaces (names and contracts)
### 6.1 Store (PgSQL implementation)
```go
type ProxyQuery struct {
	Group       string
	TagsAny     []string
	StatusIn    []ProxyStatus
	OnlyEnabled bool
	Limit       int
}

type InvalidLine struct {
	Raw    string
	Reason string
}

type ProxyStore interface {
	// Import: batch upsert plus dedup statistics.
	UpsertMany(ctx context.Context, proxies []Proxy) (imported, duplicated int, invalid []InvalidLine, err error)

	// Queries.
	List(ctx context.Context, q ProxyQuery) ([]Proxy, error)
	GetByID(ctx context.Context, id string) (*Proxy, error)

	// Write back health data (from tests and reports).
	UpdateHealth(ctx context.Context, proxyID string, patch HealthPatch) error

	// Atomic round-robin cursor: returns an index in [0, modulo).
	NextIndex(ctx context.Context, key string, modulo int) (int, error)

	// Leases (recommended). Lease is defined in section 6.4.
	CreateLease(ctx context.Context, lease Lease) error
	GetLease(ctx context.Context, leaseID string) (*Lease, error)
	DeleteExpiredLeases(ctx context.Context) (int64, error)

	// Optional: write a test log entry.
	InsertTestLog(ctx context.Context, r TestResult, site string) error
}
```
### 6.2 Importer
```go
type ImportInput struct {
	Group        string
	Tags         []string
	ProtocolHint string // "auto"|"http"|"socks5"
}

type ProxyImporter interface {
	ParseText(ctx context.Context, in ImportInput, text string) (proxies []Proxy, invalid []InvalidLine)
	ParseCSV(ctx context.Context, in ImportInput, r io.Reader) (proxies []Proxy, invalid []InvalidLine)
}
```
### 6.3 Tester
```go
type TestSpec struct {
	URL            string
	Method         string
	Timeout        time.Duration
	ExpectStatus   []int
	ExpectContains string
}

type TestResult struct {
	ProxyID   string
	OK        bool
	LatencyMs int64
	ErrorText string
	CheckedAt time.Time
}

type ProxyTester interface {
	TestOne(ctx context.Context, p Proxy, spec TestSpec) TestResult
	TestBatch(ctx context.Context, proxies []Proxy, spec TestSpec, concurrency int) []TestResult
}
```
### 6.4 Selector (dispatch policy)
```go
type SelectRequest struct {
	Group   string
	Site    string
	Policy  string // "round_robin"|"random"|"weighted"
	TagsAny []string
}

type Lease struct {
	LeaseID  string
	Proxy    Proxy
	ExpireAt time.Time
	Group    string
	Site     string
}

type ProxySelector interface {
	Next(ctx context.Context, req SelectRequest) (*Lease, error)
	Report(ctx context.Context, leaseID, proxyID string, success bool, latencyMs int64, errText string) error
}
```
---
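## 7. Key PgSQL implementation pseudocode (Store)
> Either `pgx` or `database/sql` can be used. The pseudocode here is meant to convey the essential SQL.
### 7.1 Batch UpsertMany (import, dedup, statistics)
**Core SQL (single-row illustration)**: use `(xmax = 0)` to tell whether the row was inserted rather than updated (a PG trick)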
```sql
INSERT INTO proxies (id, protocol, host, port, username, password, "group", tags)
VALUES ($1,$2,$3,$4,$5,$6,$7,$8)
ON CONFLICT (protocol, host, port, username)
DO UPDATE SET
password = EXCLUDED.password,
"group" = EXCLUDED."group",
tags = (
SELECT ARRAY(
SELECT DISTINCT unnest(proxies.tags || EXCLUDED.tags)
)
)
RETURNING (xmax = 0) AS inserted, id;
```
**Go pseudocode**
```go
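func (s *PgStore) UpsertMany(ctx context.Context, proxies []Proxy) (int, int, []InvalidLine, error) {
	// The importer should already have filtered out obviously invalid host/port values.
	// Sketch assuming s.db is a pgx-style pool; adapt the calls for database/sql.
	imported, duplicated := 0, 0
	tx, err := s.db.Begin(ctx)
	if err != nil {
		return 0, 0, nil, err
	}
	defer tx.Rollback(ctx) // has no effect once the transaction is committed
	for _, p := range proxies {
		// Run the INSERT ... RETURNING statement shown above.
		var inserted bool
		var id string // cast id to text in the SQL if your driver cannot scan uuid into string
		err := tx.QueryRow(ctx, sqlUpsert, p.ID, p.Protocol, p.Host, p.Port,
			p.Username, p.Password, p.Group, p.Tags).Scan(&inserted, &id)
		if err != nil {
			return 0, 0, nil, err
		}
		if inserted {
			imported++
		} else {
			duplicated++
		}
	}
	if err := tx.Commit(ctx); err != nil {
		return 0, 0, nil, err
	}
	return imported, duplicated, nil, nil
}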
```
---
### 7.2 List (query available proxies)
```go
func (s *PgStore) List(ctx context.Context, q ProxyQuery) ([]Proxy, error) {
	// SQL essentials:
	//   "group" = $1
	//   disabled = false (when OnlyEnabled is set)
	//   status IN (...)
	//   TagsAny => tags && $tags
	//   ORDER BY score DESC, last_check_at DESC
	rows, err := s.db.Query(ctx, sqlList, q.Group, q.StatusIn, q.TagsAny, q.Limit)
	if err != nil {
		return nil, err
	}
	defer rows.Close()
	return ScanProxies(rows), nil
}
```
---
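### 7.3 UpdateHealth (write back health data)
Suggested rules (adjust as needed):
* success: `score += 1`, `success_count += 1`, `status = alive`; update the latency and last_check_at
* fail: `score -= 3`, `fail_count += 1`, `status = dead` (or mark dead only once fail_count exceeds a threshold)
* Clamp the score at a lower bound so it cannot go negative without limit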
```go
func (s *PgStore) UpdateHealth(ctx context.Context, proxyID string, patch HealthPatch) error {
	// Pseudo SQL: build the SET list from the patch (or use one fixed update statement).
	// CheckedAtMs is unix milliseconds, so convert it before writing the timestamptz column.
	// UPDATE proxies SET
	//   status        = COALESCE($status, status),
	//   score         = score + $delta,
	//   latency_ms    = COALESCE($latency, latency_ms),
	//   last_check_at = COALESCE(to_timestamp($checked_at_ms / 1000.0), last_check_at),
	//   fail_count    = fail_count + $failInc,
	//   success_count = success_count + $successInc
	// WHERE id = $id;
	return nil
}
```
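The scoring rules above, including the clamp and the threshold variant for marking a proxy dead, can be expressed as a pure function that is easy to unit-test before moving the logic into SQL. This is a sketch under stated assumptions: the bounds `minScore`/`maxScore`, the threshold `deadAfterFails`, and the `ConsecutiveFails` counter are hypothetical and do not exist in the schema above.

```go
package main

import "fmt"

// Assumed tuning values; none of them come from the design above.
const (
	minScore       = -100
	maxScore       = 100
	deadAfterFails = 3 // consecutive failures before a proxy is marked dead
)

// health is a hypothetical in-memory view of the health columns.
// ConsecutiveFails is an extra counter that the proxies table does not have.
type health struct {
	Status           string
	Score            int
	FailCount        int
	SuccessCount     int
	ConsecutiveFails int
}

func clamp(v, lo, hi int) int {
	if v < lo {
		return lo
	}
	if v > hi {
		return hi
	}
	return v
}

// applyResult applies one test or report outcome:
// success adds 1 to the score, failure subtracts 3, both clamped.
func applyResult(h health, ok bool) health {
	if ok {
		h.Score = clamp(h.Score+1, minScore, maxScore)
		h.SuccessCount++
		h.ConsecutiveFails = 0
		h.Status = "alive"
		return h
	}
	h.Score = clamp(h.Score-3, minScore, maxScore)
	h.FailCount++
	h.ConsecutiveFails++
	if h.ConsecutiveFails >= deadAfterFails {
		h.Status = "dead"
	}
	return h
}

func main() {
	h := health{Status: "unknown"}
	for _, ok := range []bool{true, false, false, false, true} {
		h = applyResult(h, ok)
		fmt.Printf("ok=%v -> %+v\n", ok, h)
	}
}
```

Counting consecutive failures rather than the lifetime `fail_count` keeps a long-lived proxy from being marked dead because of failures that happened long ago.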
---
### 7.4 RR cursor NextIndex (atomic increment in PgSQL)
```sql
INSERT INTO rr_cursors (k, v)
VALUES ($1, 0)
ON CONFLICT (k)
DO UPDATE SET v = rr_cursors.v + 1, updated_at = now()
RETURNING v;
```
```go
func (s *PgStore) NextIndex(ctx context.Context, key string, modulo int) (int, error) {
	if modulo <= 0 {
		return 0, ErrBadModulo
	}
	var v int64
	if err := s.db.QueryRow(ctx, sqlCursor, key).Scan(&v); err != nil {
		return 0, err
	}
	idx := int(v % int64(modulo))
	if idx < 0 { // defensive: only reachable if the stored counter is negative
		idx = -idx
	}
	return idx, nil
}
```
---
### 7.5 Lease (recommended)
```go
func (s *PgStore) CreateLease(ctx context.Context, lease Lease) error {
	// INSERT INTO proxy_leases(lease_id, proxy_id, expire_at, site, "group") VALUES ...
	// Note: "group" is a reserved word and must stay quoted.
	return nil
}

func (s *PgStore) GetLease(ctx context.Context, leaseID string) (*Lease, error) {
	// SELECT ... FROM proxy_leases WHERE lease_id = $1 AND expire_at > now()
	return nil, nil
}

func (s *PgStore) DeleteExpiredLeases(ctx context.Context) (int64, error) {
	// DELETE FROM proxy_leases WHERE expire_at <= now()
	return 0, nil
}
```
---
## 8. Importer and Tester key pseudocode
### 8.1 Importer (parsing text)
```go
func (im *Importer) ParseText(ctx context.Context, in ImportInput, text string) ([]Proxy, []InvalidLine) {
	lines := SplitLines(text)
	var out []Proxy
	var bad []InvalidLine
	for _, raw := range lines {
		raw = strings.TrimSpace(raw)
		if raw == "" {
			continue
		}
		p, err := ParseProxyLine(raw, in.ProtocolHint)
		if err != nil {
			bad = append(bad, InvalidLine{Raw: raw, Reason: "parse_failed"})
			continue
		}
		p.Group = Coalesce(p.Group, in.Group)
		p.Tags = MergeTags(p.Tags, in.Tags)
		p.ID = NewUUID()
		out = append(out, p)
	}
	// Optional: deduplicate once more in the application to reduce DB load.
	out = Dedup(out)
	return out, bad
}
```
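The in-application dedup step can key each proxy on the same columns as the `uq_proxy` constraint, so that the application and the database agree on what counts as a duplicate. A minimal sketch follows; `proxy`, `proxyKey`, and `dedup` are hypothetical names, and keeping the first occurrence is an assumption.

```go
package main

import "fmt"

// proxy is a hypothetical, trimmed-down stand-in for the Proxy model.
type proxy struct {
	Protocol string
	Host     string
	Port     int
	Username string
	Password string
}

// proxyKey holds the same columns as UNIQUE (protocol, host, port, username).
type proxyKey struct {
	Protocol string
	Host     string
	Port     int
	Username string
}

// dedup keeps the first occurrence of each key and preserves input order.
func dedup(in []proxy) []proxy {
	seen := make(map[proxyKey]struct{}, len(in))
	out := make([]proxy, 0, len(in))
	for _, p := range in {
		k := proxyKey{p.Protocol, p.Host, p.Port, p.Username}
		if _, dup := seen[k]; dup {
			continue
		}
		seen[k] = struct{}{}
		out = append(out, p)
	}
	return out
}

func main() {
	in := []proxy{
		{"http", "1.2.3.4", 8080, "user", "pass"},
		{"http", "1.2.3.4", 8080, "user", "other"}, // same key, different password
		{"socks5", "1.2.3.4", 8080, "user", "pass"},
	}
	fmt.Printf("%+v\n", dedup(in))
}
```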
### 8.2 Tester (concurrent testing with a worker pool)
```go
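func (t *HTTPTester) TestBatch(ctx context.Context, proxies []Proxy, spec TestSpec, concurrency int) []TestResult {
	if concurrency < 1 {
		concurrency = 1 // with zero workers the loops below would block forever
	}
	jobs := make(chan Proxy)
	results := make(chan TestResult)
	// Workers: each one tests proxies from jobs until the channel is closed.
	for i := 0; i < concurrency; i++ {
		go func() {
			for p := range jobs {
				results <- t.TestOne(ctx, p, spec)
			}
		}()
	}
	// Feeder: push all proxies, then close jobs so the workers exit.
	go func() {
		defer close(jobs)
		for _, p := range proxies {
			jobs <- p
		}
	}()
	// Collect exactly one result per proxy, in completion order rather than input order.
	out := make([]TestResult, 0, len(proxies))
	for i := 0; i < len(proxies); i++ {
		out = append(out, <-results)
	}
	return out
}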
```
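(Implementation notes)
* HTTP proxy: `http.Transport{ Proxy: http.ProxyURL(proxyURL) }`
* SOCKS5: use `golang.org/x/net/proxy` to create a dialer and plug it into the Transport's DialContext (pay attention to timeouts and connection reuse in the implementation)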
---
## 9. Selector (round-robin dispatch) key pseudocode
```go
func (s *Selector) Next(ctx context.Context, req SelectRequest) (*Lease, error) {
	policy := Coalesce(req.Policy, "round_robin")
	proxies, err := s.store.List(ctx, ProxyQuery{
		Group:       req.Group,
		TagsAny:     req.TagsAny,
		StatusIn:    []ProxyStatus{StatusAlive},
		OnlyEnabled: true,
		Limit:       5000,
	})
	if err != nil {
		return nil, err
	}
	if len(proxies) == 0 {
		return nil, ErrNoProxy
	}
	var chosen Proxy
	switch policy {
	case "round_robin":
		key := "rr:" + req.Group + ":" + NormalizeSite(req.Site)
		idx, err := s.store.NextIndex(ctx, key, len(proxies))
		if err != nil {
			return nil, err
		}
		chosen = proxies[idx]
	case "random":
		chosen = proxies[rand.Intn(len(proxies))]
	case "weighted":
		chosen = WeightedPickByScore(proxies)
	default:
		return nil, ErrBadPolicy
	}
	// Create the lease.
	lease := Lease{
		LeaseID:  NewLeaseID(),
		Proxy:    chosen,
		Group:    req.Group,
		Site:     req.Site,
		ExpireAt: time.Now().Add(60 * time.Second),
	}
	// Persisting is recommended; on failure you may degrade and skip it, depending on your needs.
	_ = s.store.CreateLease(ctx, lease)
	return &lease, nil
}

func (s *Selector) Report(ctx context.Context, leaseID, proxyID string, success bool, latencyMs int64, errText string) error {
	// Optional: verify that the lease exists and has not expired.
	// l, _ := s.store.GetLease(ctx, leaseID)
	nowMs := time.Now().UnixMilli()
	if success {
		st := StatusAlive
		return s.store.UpdateHealth(ctx, proxyID, HealthPatch{
			Status: &st, ScoreDelta: +1, SuccessInc: 1,
			LatencyMs: &latencyMs, CheckedAtMs: &nowMs,
		})
	}
	st := StatusDead
	return s.store.UpdateHealth(ctx, proxyID, HealthPatch{
		Status: &st, ScoreDelta: -3, FailInc: 1,
		CheckedAtMs: &nowMs,
	})
}
```
---
## 10. Handler (API layer) pseudocode example
### 10.1 `/v1/proxies/next`
```go
func HandleNext(sel ProxySelector) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		req := SelectRequest{
			Group:   r.URL.Query().Get("group"),
			Site:    r.URL.Query().Get("site"),
			Policy:  r.URL.Query().Get("policy"),
			TagsAny: SplitCSV(r.URL.Query().Get("tags_any")),
		}
		if req.Group == "" {
			req.Group = "default"
		}
		lease, err := sel.Next(r.Context(), req)
		if err != nil {
			// Map known errors to status codes instead of returning 404 for everything.
			status := http.StatusInternalServerError
			switch {
			case errors.Is(err, ErrNoProxy):
				status = http.StatusNotFound
			case errors.Is(err, ErrBadPolicy):
				status = http.StatusBadRequest
			}
			WriteErr(w, status, err)
			return
		}
		WriteJSON(w, 200, map[string]any{
			"proxy": map[string]any{
				"id":       lease.Proxy.ID,
				"protocol": lease.Proxy.Protocol,
				"host":     lease.Proxy.Host,
				"port":     lease.Proxy.Port,
				"username": lease.Proxy.Username,
				"password": lease.Proxy.Password, // consider making this configurable
			},
			"lease_id": lease.LeaseID,
			"ttl_ms":   int64(time.Until(lease.ExpireAt) / time.Millisecond),
		})
	}
}
```
---
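## 11. Security and stability recommendations (strongly recommended)
### 11.1 SSRF protection (test target URL)
`/proxies/test` accepts a caller-specified URL, so you must:
* Allow only `http/https`
* Reject targets that resolve to internal or private IPs (10/8, 172.16/12, 192.168/16, 127/8, 169.254/16, ::1, fc00::/7, etc.)
* Optional: a target-domain allowlist (the safest option)
* Restrict ports (80/443/custom)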
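### 11.2 Authentication and auditing
* Protect all endpoints with an `API Key` or `JWT`
* Apply access logging and rate limiting to import/test/dispatch/report (especially `/next`)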
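### 11.3 Resource control for testing and dispatch
* Cap concurrency (e.g. <= 200)
* Cap the per-request test limit (e.g. <= 2000)
* Set an HTTP timeout, a TLS handshake timeout, and a maximum response body size (limited read, e.g. `io.LimitReader`)
* Add a simple cache for `/next` (e.g. cache the alive list for 5~30 seconds) to reduce DB load (this also makes a later move to Redis easier)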
---
## 12. Key workflows (end to end)
1. Import proxies
`POST /v1/proxies/import/text` or `/import/file` → UpsertMany → `proxies` table
2. Test proxies
`POST /v1/proxies/test` → List(unknown/alive) → Tester runs concurrent tests → UpdateHealth (optionally InsertTestLog)
3. Dispatch a proxy
`GET /v1/proxies/next` → List(alive) → NextIndex(rr_key) → chosen → CreateLease → response
4. Report results
`POST /v1/proxies/report` → UpdateHealth (add score on success; deduct score or mark dead on failure)
---