This commit is contained in:
dela
2026-01-31 22:53:12 +08:00
commit bc639cf460
30 changed files with 6836 additions and 0 deletions

839
docs/developdoc.md Normal file
View File

@@ -0,0 +1,839 @@
下面是一份「**代理池 + 测试 + 分发(轮询)服务**」的**完整开发文档PgSQL 版本)**。内容包括整体架构、数据库表结构SQL、HTTP API 请求形式与契约、Go 语言预留接口(契约)与关键伪代码、关键流程与安全建议。
> 备注:该系统属于通用网络基础设施能力,建议仅在合法合规场景(自有业务、授权测试、合规爬取、出站路由)使用,并做好鉴权、审计与 SSRF 防护。
---
## 1. 目标与范围
### 1.1 目标能力
1. **导入代理**
* 支持:文本粘贴、上传 txt、上传 csv
* 自动解析多种格式去重入库PgSQL 唯一约束)
2. **测试代理可用性**
* 可指定目标网站URL与测试规则超时、期望状态码、可选关键字
* 支持并发 worker pool
* 测试结果可写回数据库(状态/延迟/计数/分数)
3. **分发服务(轮询接口预留)**
* 调用方通过 HTTP 获取“下一条可用代理”RoundRobin / Random / Weighted 可扩展)
* 支持 lease租约机制推荐便于结果上报与统计
4. **结果上报(可选但强烈推荐)**
* 调用方用完代理后上报 success/fail驱动 score 调整与熔断策略
---
## 2. 总体架构与模块划分
### 2.1 模块
* **API Layer**HTTP 路由与鉴权、参数校验、返回 JSON
* **Importer**:解析 Text / CSV 为 Proxy 列表,输出 invalid 行原因
* **Store (PgSQL)**Upsert、查询、健康度更新、RR 游标、lease 管理
* **Tester**:对代理执行 HTTP 请求测试,返回 TestResult
* **Selector**按策略从可用代理集合中选择一条RoundRobin 为默认)
* (可选)**Cleaner Job**:定期清理过期 lease、可选写测试日志
### 2.2 推荐包结构
```
cmd/server/main.go
internal/api/handlers.go
internal/model/types.go
internal/importer/importer.go
internal/tester/http_tester.go
internal/selector/selector.go
internal/store/pg_store.go
internal/security/validate_url.go
```
---
## 3. 数据模型Go
```go
// internal/model/types.go
type ProxyProtocol string
const (
ProtoHTTP ProxyProtocol = "http"
ProtoHTTPS ProxyProtocol = "https"
ProtoSOCKS5 ProxyProtocol = "socks5"
)
type ProxyStatus string
const (
StatusUnknown ProxyStatus = "unknown"
StatusAlive ProxyStatus = "alive"
StatusDead ProxyStatus = "dead"
)
type Proxy struct {
ID string // uuid
Protocol ProxyProtocol
Host string
Port int
Username string
Password string
Group string
Tags []string
Status ProxyStatus
Score int
LatencyMs int64
LastCheckAt int64 // unix ms 或 time.Time看你序列化偏好
FailCount int
SuccessCount int
Disabled bool
CreatedAt int64
UpdatedAt int64
}
type HealthPatch struct {
Status *ProxyStatus
ScoreDelta int
LatencyMs *int64
CheckedAtMs *int64
FailInc int
SuccessInc int
}
```
---
## 4. PostgreSQL 表结构SQL
> 设计要点:**唯一约束去重**、tags 用 `text[] + GIN`、RR 游标用独立表原子自增、lease 用表可追踪。
### 4.1 枚举类型与主表 `proxies`
```sql
CREATE TYPE proxy_protocol AS ENUM ('http', 'https', 'socks5');
CREATE TYPE proxy_status AS ENUM ('unknown', 'alive', 'dead');
CREATE TABLE proxies (
id uuid PRIMARY KEY,
protocol proxy_protocol NOT NULL,
host text NOT NULL,
port int NOT NULL CHECK (port > 0 AND port < 65536),
username text NOT NULL DEFAULT '',
password text NOT NULL DEFAULT '',
"group" text NOT NULL DEFAULT 'default',
tags text[] NOT NULL DEFAULT ARRAY[]::text[],
status proxy_status NOT NULL DEFAULT 'unknown',
score int NOT NULL DEFAULT 0,
latency_ms bigint NOT NULL DEFAULT 0,
last_check_at timestamptz,
fail_count int NOT NULL DEFAULT 0,
success_count int NOT NULL DEFAULT 0,
disabled boolean NOT NULL DEFAULT false,
created_at timestamptz NOT NULL DEFAULT now(),
updated_at timestamptz NOT NULL DEFAULT now(),
CONSTRAINT uq_proxy UNIQUE (protocol, host, port, username)
);
-- 常用索引
CREATE INDEX idx_proxies_group_status_disabled
ON proxies ("group", status, disabled);
-- tags 查询tags && ARRAY['a','b']
CREATE INDEX idx_proxies_tags_gin
ON proxies USING gin (tags);
-- 可用代理热点查询(可选)
CREATE INDEX idx_proxies_alive_fast
ON proxies ("group", disabled, score DESC, last_check_at DESC)
WHERE status = 'alive';
-- updated_at 自动维护(可选,触发器)
CREATE OR REPLACE FUNCTION touch_updated_at()
RETURNS trigger AS $$
BEGIN
NEW.updated_at = now();
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER trg_touch_updated_at
BEFORE UPDATE ON proxies
FOR EACH ROW
EXECUTE FUNCTION touch_updated_at();
```
### 4.2 RR 游标表 `rr_cursors`
```sql
CREATE TABLE rr_cursors (
k text PRIMARY KEY,
v bigint NOT NULL DEFAULT 0,
updated_at timestamptz NOT NULL DEFAULT now()
);
```
### 4.3 租约表 `proxy_leases`(推荐)
```sql
CREATE TABLE proxy_leases (
lease_id text PRIMARY KEY,
proxy_id uuid NOT NULL REFERENCES proxies(id),
expire_at timestamptz NOT NULL,
site text NOT NULL DEFAULT '',
"group" text NOT NULL DEFAULT 'default',
created_at timestamptz NOT NULL DEFAULT now()
);
CREATE INDEX idx_leases_expire ON proxy_leases(expire_at);
```
### 4.4(可选)测试记录表 `proxy_test_logs`
如果你需要审计/排障(哪个目标站导致失败),建议加:
```sql
CREATE TABLE proxy_test_logs (
id bigserial PRIMARY KEY,
proxy_id uuid NOT NULL REFERENCES proxies(id),
site text NOT NULL,
ok boolean NOT NULL,
latency_ms bigint NOT NULL,
error_text text NOT NULL DEFAULT '',
checked_at timestamptz NOT NULL DEFAULT now()
);
CREATE INDEX idx_test_logs_proxy_time ON proxy_test_logs(proxy_id, checked_at DESC);
```
---
## 5. HTTP API 设计(请求形式 + 契约)
### 通用约定
* BasePath`/v1`
* Content-Type`application/json`(文件上传除外)
* 鉴权:建议 `Authorization: Bearer <token>``X-API-Key: <key>`
* 错误返回统一:
```json
{"error":"bad_request","message":"..."}
```
---
### 5.1 导入(文本粘贴)
`POST /v1/proxies/import/text`
**Request**
```json
{
"group": "default",
"tags": ["batch-2026-01"],
"protocol_hint": "auto",
"text": "http://user:pass@1.2.3.4:8080\nsocks5://5.6.7.8:1080\n9.9.9.9:3128"
}
```
**Response**
```json
{
"imported": 120,
"duplicated": 30,
"invalid": 5,
"invalid_items": [
{"raw":"bad_line","reason":"parse_failed"}
]
}
```
支持解析的常见行格式(建议实现):
* `host:port`
* `user:pass@host:port`
* `http://user:pass@host:port`
* `socks5://host:port`
---
### 5.2 导入(文件上传 txt/csv
`POST /v1/proxies/import/file`multipart/form-data
**Form fields**
* `group`: string
* `tags`: 逗号分隔(或 tags 可重复)
* `type`: `auto|txt|csv`
* `file`: 上传文件
**Response** 同 5.1
CSV 建议列名:
* `protocol,host,port,username,password,group,tags`
* tags 用 `;` 分隔(例:`a;b;c`
---
### 5.3 测试代理
`POST /v1/proxies/test`
**Request**
```json
{
"group": "default",
"filter": {
"status": ["unknown", "alive"],
"tags_any": ["batch-2026-01"],
"limit": 200
},
"test_spec": {
"url": "https://example.com/",
"method": "GET",
"timeout_ms": 5000,
"expect_status": [200, 301, 302],
"expect_contains": ""
},
"concurrency": 50,
"update_store": true,
"write_log": false
}
```
**Response**
```json
{
"summary": {"tested":200,"alive":120,"dead":80},
"results": [
{"proxy_id":"...","ok":true,"latency_ms":340,"error":""},
{"proxy_id":"...","ok":false,"latency_ms":5000,"error":"timeout"}
]
}
```
---
### 5.4 获取下一条可用代理(轮询)
`GET /v1/proxies/next?group=default&site=https%3A%2F%2Fexample.com&policy=round_robin&tags_any=batch-2026-01`
**Response**
```json
{
"proxy": {
"id": "uuid",
"protocol": "http",
"host": "1.2.3.4",
"port": 8080,
"username": "user",
"password": "pass"
},
"lease_id": "lease_01H...",
"ttl_ms": 60000
}
```
> 是否返回 password建议做配置开关例如 `RETURN_SECRET=true/false`。
---
### 5.5 上报使用结果
`POST /v1/proxies/report`
**Request**
```json
{
"lease_id": "lease_01H...",
"proxy_id": "uuid",
"success": false,
"error": "403",
"latency_ms": 1200
}
```
**Response**
```json
{"ok": true}
```
---
## 6. Go 预留接口(名称与契约)
### 6.1 StorePgSQL 实现)
```go
type ProxyQuery struct {
Group string
TagsAny []string
StatusIn []ProxyStatus
OnlyEnabled bool
Limit int
}
type InvalidLine struct {
Raw string
Reason string
}
type ProxyStore interface {
// 导入:批量 upsert + 去重统计
UpsertMany(ctx context.Context, proxies []Proxy) (imported, duplicated int, invalid []InvalidLine, err error)
// 查询
List(ctx context.Context, q ProxyQuery) ([]Proxy, error)
GetByID(ctx context.Context, id string) (*Proxy, error)
// 写回健康度(测试/上报)
UpdateHealth(ctx context.Context, proxyID string, patch HealthPatch) error
// Round-robin 原子游标:返回 [0, modulo) 的索引
NextIndex(ctx context.Context, key string, modulo int) (int, error)
// lease推荐
CreateLease(ctx context.Context, lease Lease) error
GetLease(ctx context.Context, leaseID string) (*Lease, error)
DeleteExpiredLeases(ctx context.Context) (int64, error)
// 可选:写测试日志
InsertTestLog(ctx context.Context, r TestResult, site string) error
}
```
### 6.2 Importer
```go
type ImportInput struct {
Group string
Tags []string
ProtocolHint string // "auto"|"http"|"socks5"
}
type ProxyImporter interface {
ParseText(ctx context.Context, in ImportInput, text string) (proxies []Proxy, invalid []InvalidLine)
ParseCSV(ctx context.Context, in ImportInput, r io.Reader) (proxies []Proxy, invalid []InvalidLine)
}
```
### 6.3 Tester
```go
type TestSpec struct {
URL string
Method string
Timeout time.Duration
ExpectStatus []int
ExpectContains string
}
type TestResult struct {
ProxyID string
OK bool
LatencyMs int64
ErrorText string
CheckedAt time.Time
}
type ProxyTester interface {
TestOne(ctx context.Context, p Proxy, spec TestSpec) TestResult
TestBatch(ctx context.Context, proxies []Proxy, spec TestSpec, concurrency int) []TestResult
}
```
### 6.4 Selector分发策略
```go
type SelectRequest struct {
Group string
Site string
Policy string // "round_robin"|"random"|"weighted"
TagsAny []string
}
type Lease struct {
LeaseID string
Proxy Proxy
ExpireAt time.Time
Group string
Site string
}
type ProxySelector interface {
Next(ctx context.Context, req SelectRequest) (*Lease, error)
Report(ctx context.Context, leaseID, proxyID string, success bool, latencyMs int64, errText string) error
}
```
---
## 7. PgSQL 关键实现伪代码Store
> 假设使用 `pgx` / `database/sql` 都可。这里用伪代码表达 SQL 要点。
### 7.1 批量 UpsertMany导入去重统计
**核心 SQL单条示意**:用 `(xmax = 0)` 判断是否插入PG 技巧)
```sql
INSERT INTO proxies (id, protocol, host, port, username, password, "group", tags)
VALUES ($1,$2,$3,$4,$5,$6,$7,$8)
ON CONFLICT (protocol, host, port, username)
DO UPDATE SET
password = EXCLUDED.password,
"group" = EXCLUDED."group",
tags = (
SELECT ARRAY(
SELECT DISTINCT unnest(proxies.tags || EXCLUDED.tags)
)
)
RETURNING (xmax = 0) AS inserted, id;
```
**Go 伪代码**
```go
func (s *PgStore) UpsertMany(ctx context.Context, proxies []Proxy) (int, int, []InvalidLine, error) {
// proxies 参数应已由 importer 过滤掉明显非法的 host/port
imported := 0
duplicated := 0
tx := s.db.BeginTx(ctx)
defer tx.Rollback()
for _, p := range proxies {
// 执行上面的 INSERT...RETURNING
inserted := tx.QueryRow(ctx, sqlUpsert, p.ID, p.Protocol, p.Host, p.Port, p.Username, p.Password, p.Group, p.Tags).ScanBool()
if inserted { imported++ } else { duplicated++ }
}
if err := tx.Commit(); err != nil { return 0,0,nil,err }
return imported, duplicated, nil, nil
}
```
---
### 7.2 List查询可用代理
```go
func (s *PgStore) List(ctx context.Context, q ProxyQuery) ([]Proxy, error) {
// SQL 要点:
// group = $1
// disabled=falseOnlyEnabled
// status IN (...)
// tagsAny => tags && $tags
// order by score desc, last_check_at desc
rows := s.db.Query(ctx, sqlList, q.Group, q.StatusIn, q.TagsAny, q.Limit)
return ScanProxies(rows), nil
}
```
---
### 7.3 UpdateHealth写回健康度
建议规则(你可调整):
* success`score += 1``success_count += 1``status = alive`,更新延迟与 last_check_at
* fail`score -= 3``fail_count += 1``status = dead`(或 fail_count 超阈值才 dead
* score 下限可做 clamp避免无限负
```go
func (s *PgStore) UpdateHealth(ctx context.Context, proxyID string, patch HealthPatch) error {
// 伪 SQL按 patch 拼接更新字段(或固定一套更新逻辑)
// UPDATE proxies SET
// status = COALESCE($status, status),
// score = score + $delta,
// latency_ms = COALESCE($latency, latency_ms),
// last_check_at = COALESCE($checked_at, last_check_at),
// fail_count = fail_count + $failInc,
// success_count = success_count + $successInc
// WHERE id=$id;
return nil
}
```
---
### 7.4 RR 游标 NextIndexPgSQL 原子自增)
```sql
INSERT INTO rr_cursors (k, v)
VALUES ($1, 0)
ON CONFLICT (k)
DO UPDATE SET v = rr_cursors.v + 1, updated_at = now()
RETURNING v;
```
```go
func (s *PgStore) NextIndex(ctx context.Context, key string, modulo int) (int, error) {
if modulo <= 0 { return 0, ErrBadModulo }
v := s.db.QueryRow(ctx, sqlCursor, key).ScanInt64()
idx := int(v % int64(modulo))
if idx < 0 { idx = -idx }
return idx, nil
}
```
---
### 7.5 Lease推荐
```go
func (s *PgStore) CreateLease(ctx context.Context, lease Lease) error {
// INSERT INTO proxy_leases(lease_id, proxy_id, expire_at, site, group) VALUES ...
return nil
}
func (s *PgStore) GetLease(ctx context.Context, leaseID string) (*Lease, error) {
// SELECT ... FROM proxy_leases WHERE lease_id=$1 AND expire_at > now()
return nil, nil
}
func (s *PgStore) DeleteExpiredLeases(ctx context.Context) (int64, error) {
// DELETE FROM proxy_leases WHERE expire_at <= now()
return 0, nil
}
```
---
## 8. Importer 与 Tester 关键伪代码
### 8.1 Importer解析文本
```go
func (im *Importer) ParseText(ctx context.Context, in ImportInput, text string) ([]Proxy, []InvalidLine) {
lines := SplitLines(text)
var out []Proxy
var bad []InvalidLine
for _, raw := range lines {
raw = strings.TrimSpace(raw)
if raw == "" { continue }
p, err := ParseProxyLine(raw, in.ProtocolHint)
if err != nil {
bad = append(bad, InvalidLine{Raw: raw, Reason: "parse_failed"})
continue
}
p.Group = Coalesce(p.Group, in.Group)
p.Tags = MergeTags(p.Tags, in.Tags)
p.ID = NewUUID()
out = append(out, p)
}
// 可选:应用内再做一次去重(减少 DB 压力)
out = Dedup(out)
return out, bad
}
```
### 8.2 Tester并发测试worker pool
```go
func (t *HTTPTester) TestBatch(ctx context.Context, proxies []Proxy, spec TestSpec, concurrency int) []TestResult {
jobs := make(chan Proxy)
results := make(chan TestResult)
for i := 0; i < concurrency; i++ {
go func() {
for p := range jobs {
results <- t.TestOne(ctx, p, spec)
}
}()
}
go func() {
defer close(jobs)
for _, p := range proxies { jobs <- p }
}()
out := make([]TestResult, 0, len(proxies))
for i := 0; i < len(proxies); i++ {
out = append(out, <-results)
}
return out
}
```
(实现细节提醒)
* HTTP 代理:`http.Transport{ Proxy: http.ProxyURL(proxyURL) }`
* SOCKS5可用 `golang.org/x/net/proxy` 创建 dialer再塞给 Transport 的 DialContext实现时注意超时与连接复用
---
## 9. Selector轮询分发关键伪代码
```go
func (s *Selector) Next(ctx context.Context, req SelectRequest) (*Lease, error) {
policy := Coalesce(req.Policy, "round_robin")
proxies, err := s.store.List(ctx, ProxyQuery{
Group: req.Group,
TagsAny: req.TagsAny,
StatusIn: []ProxyStatus{StatusAlive},
OnlyEnabled: true,
Limit: 5000,
})
if err != nil { return nil, err }
if len(proxies) == 0 { return nil, ErrNoProxy }
var chosen Proxy
switch policy {
case "round_robin":
key := "rr:" + req.Group + ":" + NormalizeSite(req.Site)
idx, err := s.store.NextIndex(ctx, key, len(proxies))
if err != nil { return nil, err }
chosen = proxies[idx]
case "random":
chosen = proxies[rand.Intn(len(proxies))]
case "weighted":
chosen = WeightedPickByScore(proxies)
default:
return nil, ErrBadPolicy
}
// lease
lease := Lease{
LeaseID: NewLeaseID(),
Proxy: chosen,
Group: req.Group,
Site: req.Site,
ExpireAt: time.Now().Add(60 * time.Second),
}
_ = s.store.CreateLease(ctx, lease) // 推荐:失败也可降级不存(看你需要)
return &lease, nil
}
func (s *Selector) Report(ctx context.Context, leaseID, proxyID string, success bool, latencyMs int64, errText string) error {
// 可选:校验 lease 存在且未过期
// l, _ := s.store.GetLease(ctx, leaseID)
nowMs := time.Now().UnixMilli()
if success {
st := StatusAlive
return s.store.UpdateHealth(ctx, proxyID, HealthPatch{
Status: &st, ScoreDelta: +1, SuccessInc: 1,
LatencyMs: &latencyMs, CheckedAtMs: &nowMs,
})
}
st := StatusDead
return s.store.UpdateHealth(ctx, proxyID, HealthPatch{
Status: &st, ScoreDelta: -3, FailInc: 1,
CheckedAtMs: &nowMs,
})
}
```
---
## 10. HandlerAPI 层)伪代码示例
### 10.1 `/v1/proxies/next`
```go
func HandleNext(sel ProxySelector) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
req := SelectRequest{
Group: r.URL.Query().Get("group"),
Site: r.URL.Query().Get("site"),
Policy: r.URL.Query().Get("policy"),
TagsAny: SplitCSV(r.URL.Query().Get("tags_any")),
}
if req.Group == "" { req.Group = "default" }
lease, err := sel.Next(r.Context(), req)
if err != nil { WriteErr(w, 404, err); return }
WriteJSON(w, 200, map[string]any{
"proxy": map[string]any{
"id": lease.Proxy.ID,
"protocol": lease.Proxy.Protocol,
"host": lease.Proxy.Host,
"port": lease.Proxy.Port,
"username": lease.Proxy.Username,
"password": lease.Proxy.Password, // 建议可配置是否返回
},
"lease_id": lease.LeaseID,
"ttl_ms": int64(time.Until(lease.ExpireAt) / time.Millisecond),
})
}
}
```
---
## 11. 安全与稳定性建议(强烈建议你落地)
### 11.1 SSRF 防护(测试目标 URL
`/proxies/test` 允许指定 URL必须做
* 仅允许 `http/https`
* 禁止解析到内网/私网 IP10/8、172.16/12、192.168/16、127/8、169.254/16、::1、fc00::/7 等)
* 可选:目标域名 allowlist最稳妥
* 限制端口80/443/自定义)
### 11.2 鉴权与审计
* 所有接口建议加 `API Key``JWT`
* 对导入/测试/分发/上报做访问日志与速率限制(尤其 `/next`
### 11.3 测试与分发的资源控制
* concurrency 上限(例如 <= 200
* 单次测试 limit 上限(例如 <= 2000
* HTTP 超时、TLS 握手超时、最大响应体大小ReadLimited
*`/next` 做简单缓存(例如缓存 alive 列表 5~30 秒)以减轻 DB 压力(后续也方便升级 Redis
---
## 12. 关键流程(端到端)
1. 导入代理
`POST /v1/proxies/import/text``/import/file` → UpsertMany → proxies 表
2. 测试代理
`POST /v1/proxies/test` → List(unknown/alive) → Tester 并发测试 → UpdateHealth可选 InsertTestLog
3. 分发代理
`GET /v1/proxies/next` → List(alive) → NextIndex(rr_key) → chosen → CreateLease → 返回
4. 上报结果
`POST /v1/proxies/report` → UpdateHealth成功加分失败扣分/标 dead
---