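Below is a complete development document for a "**proxy pool + testing + distribution (round-robin) service**" (**PostgreSQL edition**). It covers the overall architecture, the database schema (SQL), HTTP API request formats and contracts, the reserved Go interfaces (contracts) with key pseudocode, the key flows, and security recommendations.

> Note: this system is general-purpose network infrastructure. Use it only in lawful, compliant scenarios (your own business, authorized testing, compliant crawling, egress routing), and put authentication, auditing and SSRF protection in place.

---

## 1. Goals and Scope

### 1.1 Target capabilities

1. **Import proxies**
   * Inputs: pasted text, uploaded txt, uploaded csv
   * Parse several line formats automatically and deduplicate on insert (PostgreSQL unique constraint)
2. **Test proxy availability**
   * Configurable target site (URL) and test rules (timeout, expected status codes, optional keyword)
   * Concurrent worker pool
   * Results can be written back to the database (status / latency / counters / score)
3. **Distribution service (round-robin endpoint)**
   * Callers fetch the "next available proxy" over HTTP (RoundRobin / Random / Weighted; extensible)
   * Lease mechanism (recommended), so that result reports and statistics can be tied to a specific hand-out
4. **Result reporting (optional but strongly recommended)**
   * After using a proxy, the caller reports success/fail, which drives score adjustment and circuit-breaking

---

## 2. Architecture and Modules

### 2.1 Modules

* **API Layer**: HTTP routing and authentication, parameter validation, JSON responses
* **Importer**: parses text / CSV into a list of Proxy values and reports the reason for each invalid line
* **Store (PostgreSQL)**: upsert, queries, health updates, RR cursor, lease management
* **Tester**: sends HTTP test requests through a proxy and returns a TestResult
* **Selector**: picks one proxy from the available set according to a policy (RoundRobin by default)
* (Optional) **Cleaner Job**: periodically removes expired leases and (optionally) writes test logs

### 2.2 Suggested package layout

```
cmd/server/main.go
internal/api/handlers.go
internal/model/types.go
internal/importer/importer.go
internal/tester/http_tester.go
internal/selector/selector.go
internal/store/pg_store.go
internal/security/validate_url.go
```

---

## 3. Data Model (Go)

```go
// internal/model/types.go
package model

type ProxyProtocol string

const (
	ProtoHTTP   ProxyProtocol = "http"
	ProtoHTTPS  ProxyProtocol = "https"
	ProtoSOCKS5 ProxyProtocol = "socks5"
)

type ProxyStatus string

const (
	StatusUnknown ProxyStatus = "unknown"
	StatusAlive   ProxyStatus = "alive"
	StatusDead    ProxyStatus = "dead"
)

type Proxy struct {
	ID           string // uuid
	Protocol     ProxyProtocol
	Host         string
	Port         int
	Username     string
	Password     string
	Group        string
	Tags         []string
	Status       ProxyStatus
	Score        int
	LatencyMs    int64
	LastCheckAt  int64 // unix ms, or time.Time, depending on your serialization preference
	FailCount    int
	SuccessCount int
	Disabled     bool
	CreatedAt    int64
	UpdatedAt    int64
}

// HealthPatch describes one health update; nil pointer fields leave the column unchanged.
type HealthPatch struct {
	Status      *ProxyStatus
	ScoreDelta  int
	LatencyMs   *int64
	CheckedAtMs *int64 // unix ms
	FailInc     int
	SuccessInc  int
}
```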
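The model stores host, port and credentials as separate fields, but the tester works with a proxy URL (compare the `http.ProxyURL` note in section 8). The following is a minimal sketch of that conversion. It assumes a hypothetical helper named `ProxyURL` and uses a trimmed copy of the `Proxy` struct:

```go
package main

import (
	"fmt"
	"net"
	"net/url"
	"strconv"
)

type ProxyProtocol string

// Proxy is trimmed to the fields needed to build a proxy URL.
type Proxy struct {
	Protocol ProxyProtocol
	Host     string
	Port     int
	Username string
	Password string
}

// ProxyURL (hypothetical helper) converts a stored Proxy into a *url.URL for
// http.ProxyURL. net/http's Transport accepts http, https, socks5 and socks5h proxy URLs.
func ProxyURL(p Proxy) (*url.URL, error) {
	switch p.Protocol {
	case "http", "https", "socks5":
	default:
		return nil, fmt.Errorf("unsupported protocol %q", p.Protocol)
	}
	if p.Host == "" || p.Port <= 0 || p.Port > 65535 {
		return nil, fmt.Errorf("invalid address %q:%d", p.Host, p.Port)
	}
	u := &url.URL{
		Scheme: string(p.Protocol),
		// JoinHostPort brackets IPv6 literals, e.g. "[::1]:1080".
		Host: net.JoinHostPort(p.Host, strconv.Itoa(p.Port)),
	}
	switch {
	case p.Username != "" && p.Password != "":
		u.User = url.UserPassword(p.Username, p.Password) // escaped when the URL is rendered
	case p.Username != "":
		u.User = url.User(p.Username)
	}
	return u, nil
}
```

You can pass the result straight to `http.Transport{Proxy: http.ProxyURL(u)}`. The URL carries the credentials, so avoid logging it.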
---

## 4. PostgreSQL Schema (SQL)

> Design points: **deduplication via a unique constraint**; tags stored as `text[]` with a GIN index; the RR cursor kept in its own table and incremented atomically; leases kept in a table so they can be tracked.

### 4.1 Enum types and the main table `proxies`

```sql
CREATE TYPE proxy_protocol AS ENUM ('http', 'https', 'socks5');
CREATE TYPE proxy_status   AS ENUM ('unknown', 'alive', 'dead');

CREATE TABLE proxies (
  id            uuid PRIMARY KEY,
  protocol      proxy_protocol NOT NULL,
  host          text NOT NULL,
  port          int  NOT NULL CHECK (port > 0 AND port < 65536),
  username      text NOT NULL DEFAULT '',
  password      text NOT NULL DEFAULT '',
  "group"       text NOT NULL DEFAULT 'default',
  tags          text[] NOT NULL DEFAULT ARRAY[]::text[],
  status        proxy_status NOT NULL DEFAULT 'unknown',
  score         int NOT NULL DEFAULT 0,
  latency_ms    bigint NOT NULL DEFAULT 0,
  last_check_at timestamptz,
  fail_count    int NOT NULL DEFAULT 0,
  success_count int NOT NULL DEFAULT 0,
  disabled      boolean NOT NULL DEFAULT false,
  created_at    timestamptz NOT NULL DEFAULT now(),
  updated_at    timestamptz NOT NULL DEFAULT now(),
  CONSTRAINT uq_proxy UNIQUE (protocol, host, port, username)
);

-- Common filter index
CREATE INDEX idx_proxies_group_status_disabled ON proxies ("group", status, disabled);

-- Tag queries: tags && ARRAY['a','b']
CREATE INDEX idx_proxies_tags_gin ON proxies USING gin (tags);

-- Hot path for available proxies (optional)
CREATE INDEX idx_proxies_alive_fast
  ON proxies ("group", disabled, score DESC, last_check_at DESC)
  WHERE status = 'alive';

-- Maintain updated_at automatically (optional trigger; EXECUTE FUNCTION requires PostgreSQL 11+)
CREATE OR REPLACE FUNCTION touch_updated_at() RETURNS trigger AS $$
BEGIN
  NEW.updated_at = now();
  RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER trg_touch_updated_at
BEFORE UPDATE ON proxies
FOR EACH ROW EXECUTE FUNCTION touch_updated_at();
```

### 4.2 RR cursor table `rr_cursors`

```sql
CREATE TABLE rr_cursors (
  k          text PRIMARY KEY,
  v          bigint NOT NULL DEFAULT 0,
  updated_at timestamptz NOT NULL DEFAULT now()
);
```

### 4.3 Lease table `proxy_leases` (recommended)

```sql
CREATE TABLE proxy_leases (
  lease_id   text PRIMARY KEY,
  proxy_id   uuid NOT NULL REFERENCES proxies(id),
  expire_at  timestamptz NOT NULL,
  site       text NOT NULL DEFAULT '',
  "group"    text NOT NULL DEFAULT 'default',
  created_at timestamptz NOT NULL DEFAULT now()
);

CREATE INDEX idx_leases_expire ON proxy_leases(expire_at);
```
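The `uq_proxy` constraint defines what counts as a duplicate. Applying the same key in the application before writing has two benefits. It is the optional `Dedup` step in section 8.1, and it saves round trips. It is also required if rows are ever sent as a single multi-row `INSERT ... ON CONFLICT DO UPDATE`, because PostgreSQL rejects a statement that proposes the same key twice ("ON CONFLICT DO UPDATE command cannot affect row a second time").

The following is a sketch with a trimmed `Proxy` struct. `dedupKey` and `mergeTags` are hypothetical names:

```go
package main

import "strconv"

// Proxy is trimmed to the fields that matter for deduplication.
type Proxy struct {
	Protocol string
	Host     string
	Port     int
	Username string
	Password string
	Group    string
	Tags     []string
}

// dedupKey mirrors uq_proxy (protocol, host, port, username). PostgreSQL text
// cannot contain NUL bytes, so "\x00" is a safe separator for storable values.
func dedupKey(p Proxy) string {
	return p.Protocol + "\x00" + p.Host + "\x00" + strconv.Itoa(p.Port) + "\x00" + p.Username
}

// Dedup collapses entries that share a key and keeps first-seen order. Later
// entries overwrite password and group and have their tags merged, which gives
// the same values (tag order aside) as running the upserts one by one.
func Dedup(in []Proxy) []Proxy {
	pos := make(map[string]int, len(in))
	out := make([]Proxy, 0, len(in))
	for _, p := range in {
		k := dedupKey(p)
		i, seen := pos[k]
		if !seen {
			pos[k] = len(out)
			out = append(out, p)
			continue
		}
		out[i].Password = p.Password
		out[i].Group = p.Group
		out[i].Tags = mergeTags(out[i].Tags, p.Tags)
	}
	return out
}

// mergeTags returns the union of a and b without duplicates, in first-seen order.
func mergeTags(a, b []string) []string {
	seen := make(map[string]bool, len(a)+len(b))
	merged := make([]string, 0, len(a)+len(b))
	for _, list := range [][]string{a, b} {
		for _, t := range list {
			if !seen[t] {
				seen[t] = true
				merged = append(merged, t)
			}
		}
	}
	return merged
}
```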
### 4.4 (Optional) Test log table `proxy_test_logs`

If you need auditing or troubleshooting (for example, finding which target site caused failures), add:

```sql
CREATE TABLE proxy_test_logs (
  id         bigserial PRIMARY KEY,
  proxy_id   uuid NOT NULL REFERENCES proxies(id),
  site       text NOT NULL,
  ok         boolean NOT NULL,
  latency_ms bigint NOT NULL,
  error_text text NOT NULL DEFAULT '',
  checked_at timestamptz NOT NULL DEFAULT now()
);

CREATE INDEX idx_test_logs_proxy_time ON proxy_test_logs(proxy_id, checked_at DESC);
```

---

## 5. HTTP API Design (request formats + contracts)

### General conventions

* BasePath: `/v1`
* Content-Type: `application/json` (except file uploads)
* Authentication: `Authorization: Bearer <token>` or `X-API-Key: <key>` is recommended
* All errors use one shape:

```json
{"error":"bad_request","message":"..."}
```

---

### 5.1 Import (pasted text)

`POST /v1/proxies/import/text`

**Request**

```json
{
  "group": "default",
  "tags": ["batch-2026-01"],
  "protocol_hint": "auto",
  "text": "http://user:pass@1.2.3.4:8080\nsocks5://5.6.7.8:1080\n9.9.9.9:3128"
}
```

**Response**

```json
{
  "imported": 120,
  "duplicated": 30,
  "invalid": 5,
  "invalid_items": [
    {"raw":"bad_line","reason":"parse_failed"}
  ]
}
```

Common line formats the parser should support:

* `host:port`
* `user:pass@host:port`
* `http://user:pass@host:port`
* `socks5://host:port`

---

### 5.2 Import (file upload, txt/csv)

`POST /v1/proxies/import/file` (multipart/form-data)

**Form fields**

* `group`: string
* `tags`: comma-separated (or send the `tags` field several times)
* `type`: `auto|txt|csv`
* `file`: the uploaded file

**Response**: same as 5.1

Suggested CSV columns:

* `protocol,host,port,username,password,group,tags`
* tags separated by `;` (e.g. `a;b;c`)

---

### 5.3 Test proxies

`POST /v1/proxies/test`

**Request**

```json
{
  "group": "default",
  "filter": {
    "status": ["unknown", "alive"],
    "tags_any": ["batch-2026-01"],
    "limit": 200
  },
  "test_spec": {
    "url": "https://example.com/",
    "method": "GET",
    "timeout_ms": 5000,
    "expect_status": [200, 301, 302],
    "expect_contains": ""
  },
  "concurrency": 50,
  "update_store": true,
  "write_log": false
}
```

**Response**

```json
{
  "summary": {"tested":200,"alive":120,"dead":80},
  "results": [
    {"proxy_id":"...","ok":true,"latency_ms":340,"error":""},
    {"proxy_id":"...","ok":false,"latency_ms":5000,"error":"timeout"}
  ]
}
```
---

### 5.4 Get the next available proxy (round-robin)

`GET /v1/proxies/next?group=default&site=https%3A%2F%2Fexample.com&policy=round_robin&tags_any=batch-2026-01`

**Response**

```json
{
  "proxy": {
    "id": "uuid",
    "protocol": "http",
    "host": "1.2.3.4",
    "port": 8080,
    "username": "user",
    "password": "pass"
  },
  "lease_id": "lease_01H...",
  "ttl_ms": 60000
}
```

> Whether to return the password should be a configuration switch, e.g. `RETURN_SECRET=true/false`.

---

### 5.5 Report the usage result

`POST /v1/proxies/report`

**Request**

```json
{
  "lease_id": "lease_01H...",
  "proxy_id": "uuid",
  "success": false,
  "error": "403",
  "latency_ms": 1200
}
```

**Response**

```json
{"ok": true}
```

---

## 6. Reserved Go Interfaces (names and contracts)

### 6.1 Store (PostgreSQL implementation)

```go
type ProxyQuery struct {
	Group       string
	TagsAny     []string      // empty = no tag filter
	StatusIn    []ProxyStatus // empty = any status
	OnlyEnabled bool
	Limit       int
}

type InvalidLine struct {
	Raw    string
	Reason string
}

type ProxyStore interface {
	// Import: batch upsert + dedup statistics
	UpsertMany(ctx context.Context, proxies []Proxy) (imported, duplicated int, invalid []InvalidLine, err error)

	// Queries
	List(ctx context.Context, q ProxyQuery) ([]Proxy, error)
	GetByID(ctx context.Context, id string) (*Proxy, error)

	// Write back health (tests / reports)
	UpdateHealth(ctx context.Context, proxyID string, patch HealthPatch) error

	// Atomic round-robin cursor: returns an index in [0, modulo)
	NextIndex(ctx context.Context, key string, modulo int) (int, error)

	// Leases (recommended)
	CreateLease(ctx context.Context, lease Lease) error
	GetLease(ctx context.Context, leaseID string) (*Lease, error)
	DeleteExpiredLeases(ctx context.Context) (int64, error)

	// Optional: write test logs
	InsertTestLog(ctx context.Context, r TestResult, site string) error
}
```

### 6.2 Importer

```go
type ImportInput struct {
	Group        string
	Tags         []string
	ProtocolHint string // "auto"|"http"|"https"|"socks5"
}

type ProxyImporter interface {
	ParseText(ctx context.Context, in ImportInput, text string) (proxies []Proxy, invalid []InvalidLine)
	ParseCSV(ctx context.Context, in ImportInput, r io.Reader) (proxies []Proxy, invalid []InvalidLine)
}
```
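`ParseText` (section 8.1) delegates each line to `ParseProxyLine`, but the document does not define that function. One way to cover the four line formats listed in 5.1 is to prepend a scheme when a line has none and let `net/url` split out the credentials. The sketch below assumes that `auto` (or an empty hint) means `http` for lines without a scheme:

```go
package main

import (
	"errors"
	"net"
	"net/url"
	"strconv"
	"strings"
)

// Proxy is trimmed to the fields a single line can carry.
type Proxy struct {
	Protocol string
	Host     string
	Port     int
	Username string
	Password string
}

var errParse = errors.New("parse_failed")

// ParseProxyLine accepts host:port, user:pass@host:port and scheme://[user:pass@]host:port.
// Lines without a scheme take the protocol hint ("auto" or "" is assumed to mean "http").
func ParseProxyLine(raw, hint string) (Proxy, error) {
	raw = strings.TrimSpace(raw)
	if !strings.Contains(raw, "://") {
		scheme := hint
		if scheme == "" || scheme == "auto" {
			scheme = "http"
		}
		raw = scheme + "://" + raw
	}
	u, err := url.Parse(raw) // also lower-cases the scheme
	if err != nil {
		return Proxy{}, errParse
	}
	switch u.Scheme {
	case "http", "https", "socks5":
	default:
		return Proxy{}, errParse
	}
	host, portStr, err := net.SplitHostPort(u.Host)
	if err != nil || host == "" {
		return Proxy{}, errParse
	}
	port, err := strconv.Atoi(portStr)
	if err != nil || port <= 0 || port > 65535 {
		return Proxy{}, errParse
	}
	p := Proxy{Protocol: u.Scheme, Host: host, Port: port}
	if u.User != nil {
		p.Username = u.User.Username()
		p.Password, _ = u.User.Password()
	}
	return p, nil
}
```

When left unescaped, a `/`, `?` or `#` in a password ends the URL authority early and the line fails to parse. Such lines should be percent-encoded or imported through CSV.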
### 6.3 Tester

```go
type TestSpec struct {
	URL            string
	Method         string
	Timeout        time.Duration
	ExpectStatus   []int
	ExpectContains string
}

type TestResult struct {
	ProxyID   string
	OK        bool
	LatencyMs int64
	ErrorText string
	CheckedAt time.Time
}

type ProxyTester interface {
	TestOne(ctx context.Context, p Proxy, spec TestSpec) TestResult
	TestBatch(ctx context.Context, proxies []Proxy, spec TestSpec, concurrency int) []TestResult
}
```

### 6.4 Selector (distribution policy)

```go
type SelectRequest struct {
	Group   string
	Site    string
	Policy  string // "round_robin"|"random"|"weighted"
	TagsAny []string
}

type Lease struct {
	LeaseID  string
	Proxy    Proxy
	ExpireAt time.Time
	Group    string
	Site     string
}

type ProxySelector interface {
	Next(ctx context.Context, req SelectRequest) (*Lease, error)
	Report(ctx context.Context, leaseID, proxyID string, success bool, latencyMs int64, errText string) error
}
```

---

## 7. Key PostgreSQL Implementation Pseudocode (Store)

> Either `pgx` or `database/sql` works. The Go snippets below assume pgx v5, with `s.db` a `*pgxpool.Pool` (`Begin`, `QueryRow(...).Scan`, `Exec` and `RowsAffected` follow its API). With `database/sql` the calls differ slightly.

### 7.1 Batch UpsertMany (import with dedup statistics)

**Core SQL (single row)**: `(xmax = 0)` distinguishes an insert from an update. This is a common PostgreSQL trick, but it relies on an undocumented implementation detail of the system column.

```sql
INSERT INTO proxies (id, protocol, host, port, username, password, "group", tags)
VALUES ($1,$2,$3,$4,$5,$6,$7,$8)
ON CONFLICT (protocol, host, port, username) DO UPDATE SET
  password = EXCLUDED.password,
  "group"  = EXCLUDED."group",
  tags     = ARRAY(SELECT DISTINCT unnest(proxies.tags || EXCLUDED.tags))
RETURNING (xmax = 0) AS inserted, id;
```

**Go sketch**

```go
func (s *PgStore) UpsertMany(ctx context.Context, proxies []Proxy) (int, int, []InvalidLine, error) {
	// The importer should already have filtered out obviously invalid host/port values.
	imported, duplicated := 0, 0

	tx, err := s.db.Begin(ctx)
	if err != nil {
		return 0, 0, nil, err
	}
	defer tx.Rollback(ctx) // no-op once Commit has succeeded

	for _, p := range proxies {
		var inserted bool
		var id string
		// Runs the INSERT ... RETURNING above (one round trip per row).
		if err := tx.QueryRow(ctx, sqlUpsert,
			p.ID, string(p.Protocol), p.Host, p.Port, p.Username, p.Password, p.Group, p.Tags,
		).Scan(&inserted, &id); err != nil {
			return 0, 0, nil, err
		}
		if inserted {
			imported++
		} else {
			duplicated++
		}
	}
	if err := tx.Commit(ctx); err != nil {
		return 0, 0, nil, err
	}
	return imported, duplicated, nil, nil
}
```
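---

### 7.2 List (query available proxies)

Round-robin indexes into this list, so its order should be deterministic. Sorting by score reshuffles the list as scores change, which means round-robin is only approximately even. Sorting by `id` for that policy avoids this.

```go
// Empty status/tag arrays disable the filter; LIMIT NULL (Limit = 0) means no limit.
// The trailing id makes ties deterministic.
const sqlList = `
SELECT id, protocol, host, port, username, password, "group", tags, status, score,
       latency_ms, last_check_at, fail_count, success_count, disabled
FROM proxies
WHERE "group" = $1
  AND (NOT $2::bool OR disabled = false)
  AND (COALESCE(cardinality($3::text[]), 0) = 0 OR status::text = ANY($3::text[]))
  AND (COALESCE(cardinality($4::text[]), 0) = 0 OR tags && $4::text[])
ORDER BY score DESC, last_check_at DESC NULLS LAST, id
LIMIT NULLIF($5::int, 0)`

func (s *PgStore) List(ctx context.Context, q ProxyQuery) ([]Proxy, error) {
	rows, err := s.db.Query(ctx, sqlList, q.Group, q.OnlyEnabled, q.StatusIn, q.TagsAny, q.Limit)
	if err != nil {
		return nil, err
	}
	defer rows.Close()
	return scanProxies(rows) // helper: Scan each row into a Proxy, then check rows.Err()
}
```

---

### 7.3 UpdateHealth (write back health)

Suggested rules (adjust as needed):

* success: `score += 1`, `success_count += 1`, `status = alive`; update latency and last_check_at
* fail: `score -= 3`, `fail_count += 1`, `status = dead` (or mark dead only once fail_count exceeds a threshold)
* Clamp the score to a lower bound so it cannot fall indefinitely

```go
// One fixed update: nil patch fields leave the column unchanged (COALESCE).
// The -100 floor is an illustrative clamp; choose your own.
const sqlUpdateHealth = `
UPDATE proxies SET
  status        = COALESCE($2::text::proxy_status, status),
  score         = GREATEST(score + $3, -100),
  latency_ms    = COALESCE($4::bigint, latency_ms),
  last_check_at = COALESCE(to_timestamp($5::bigint / 1000.0::float8), last_check_at),
  fail_count    = fail_count + $6,
  success_count = success_count + $7
WHERE id = $1`

func (s *PgStore) UpdateHealth(ctx context.Context, proxyID string, patch HealthPatch) error {
	tag, err := s.db.Exec(ctx, sqlUpdateHealth, proxyID, patch.Status, patch.ScoreDelta,
		patch.LatencyMs, patch.CheckedAtMs, patch.FailInc, patch.SuccessInc)
	if err != nil {
		return err
	}
	if tag.RowsAffected() == 0 {
		return ErrNotFound // sentinel error: no proxy with this id
	}
	return nil
}
```

---

### 7.4 RR cursor NextIndex (atomic increment in PostgreSQL)

```sql
INSERT INTO rr_cursors (k, v) VALUES ($1, 0)
ON CONFLICT (k) DO UPDATE SET v = rr_cursors.v + 1, updated_at = now()
RETURNING v;
```

The first call for a key inserts `v = 0`. Each later call increments `v` and returns the new value, so successive calls yield 0, 1, 2, ... before the modulo is applied.

```go
func (s *PgStore) NextIndex(ctx context.Context, key string, modulo int) (int, error) {
	if modulo <= 0 {
		return 0, ErrBadModulo
	}
	var v int64
	if err := s.db.QueryRow(ctx, sqlCursor, key).Scan(&v); err != nil {
		return 0, err
	}
	// v starts at 0 and only grows, so the remainder is never negative.
	return int(v % int64(modulo)), nil
}
```

---

### 7.5 Lease (recommended)

```go
func (s *PgStore) CreateLease(ctx context.Context, lease Lease) error {
	// "group" must be quoted because GROUP is a reserved word.
	_, err := s.db.Exec(ctx, `INSERT INTO proxy_leases (lease_id, proxy_id, expire_at, site, "group")
		VALUES ($1, $2, $3, $4, $5)`, lease.LeaseID, lease.Proxy.ID, lease.ExpireAt, lease.Site, lease.Group)
	return err
}

func (s *PgStore) GetLease(ctx context.Context, leaseID string) (*Lease, error) {
	var l Lease
	err := s.db.QueryRow(ctx, `SELECT lease_id, proxy_id, expire_at, site, "group" FROM proxy_leases
		WHERE lease_id = $1 AND expire_at > now()`, leaseID).
		Scan(&l.LeaseID, &l.Proxy.ID, &l.ExpireAt, &l.Site, &l.Group) // only Proxy.ID is filled
	if errors.Is(err, pgx.ErrNoRows) {
		return nil, nil // unknown or expired lease
	}
	if err != nil {
		return nil, err
	}
	return &l, nil
}

func (s *PgStore) DeleteExpiredLeases(ctx context.Context) (int64, error) {
	tag, err := s.db.Exec(ctx, `DELETE FROM proxy_leases WHERE expire_at <= now()`)
	return tag.RowsAffected(), err
}
```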
---

## 8. Importer and Tester: Key Pseudocode

### 8.1 Importer: parsing text

```go
func (im *Importer) ParseText(ctx context.Context, in ImportInput, text string) ([]Proxy, []InvalidLine) {
	var out []Proxy
	var bad []InvalidLine

	// TrimSpace below also strips the '\r' left by Windows line endings.
	for _, raw := range strings.Split(text, "\n") {
		raw = strings.TrimSpace(raw)
		if raw == "" {
			continue
		}
		p, err := ParseProxyLine(raw, in.ProtocolHint)
		if err != nil {
			bad = append(bad, InvalidLine{Raw: raw, Reason: "parse_failed"})
			continue
		}
		p.Group = Coalesce(p.Group, in.Group) // first non-empty value
		p.Tags = MergeTags(p.Tags, in.Tags)   // union without duplicates
		p.ID = NewUUID()
		out = append(out, p)
	}

	// Optional: deduplicate in the application as well (less load on the DB)
	out = Dedup(out)
	return out, bad
}
```

### 8.2 Tester: concurrent testing (worker pool)

```go
func (t *HTTPTester) TestBatch(ctx context.Context, proxies []Proxy, spec TestSpec, concurrency int) []TestResult {
	// Clamp concurrency: with zero workers the sender below would block forever.
	if concurrency < 1 {
		concurrency = 1
	}
	if concurrency > len(proxies) {
		concurrency = len(proxies)
	}
	jobs := make(chan Proxy)
	results := make(chan TestResult, len(proxies)) // buffered so workers never block on send

	for i := 0; i < concurrency; i++ {
		go func() {
			for p := range jobs {
				results <- t.TestOne(ctx, p, spec) // TestOne should honor ctx and spec.Timeout
			}
		}()
	}

	go func() {
		defer close(jobs)
		for _, p := range proxies {
			jobs <- p
		}
	}()

	// Exactly one result per proxy, in completion order (match on ProxyID).
	out := make([]TestResult, 0, len(proxies))
	for range proxies {
		out = append(out, <-results)
	}
	return out
}
```

(Implementation notes)

* HTTP proxies: `http.Transport{ Proxy: http.ProxyURL(proxyURL) }`
* SOCKS5: `http.Transport` also accepts `socks5://` proxy URLs in its `Proxy` function. Alternatively, build a dialer with `golang.org/x/net/proxy` and plug it into the Transport's `DialContext`. Either way, watch timeouts and connection reuse (for example, use one Transport per proxy and call `CloseIdleConnections` after the test).
---

## 9. Selector (round-robin distribution): Key Pseudocode

```go
func (s *Selector) Next(ctx context.Context, req SelectRequest) (*Lease, error) {
	policy := Coalesce(req.Policy, "round_robin")

	proxies, err := s.store.List(ctx, ProxyQuery{
		Group:       req.Group,
		TagsAny:     req.TagsAny,
		StatusIn:    []ProxyStatus{StatusAlive},
		OnlyEnabled: true,
		Limit:       5000,
	})
	if err != nil {
		return nil, err
	}
	if len(proxies) == 0 {
		return nil, ErrNoProxy
	}

	var chosen Proxy
	switch policy {
	case "round_robin":
		// One cursor per (group, site); NormalizeSite should map equivalent site strings to one key.
		key := "rr:" + req.Group + ":" + NormalizeSite(req.Site)
		idx, err := s.store.NextIndex(ctx, key, len(proxies))
		if err != nil {
			return nil, err
		}
		chosen = proxies[idx]
	case "random":
		chosen = proxies[rand.Intn(len(proxies))] // math/rand
	case "weighted":
		chosen = WeightedPickByScore(proxies)
	default:
		return nil, ErrBadPolicy
	}

	// Lease
	lease := Lease{
		LeaseID:  NewLeaseID(),
		Proxy:    chosen,
		Group:    req.Group,
		Site:     req.Site,
		ExpireAt: time.Now().Add(60 * time.Second),
	}
	// On failure this degrades to an unrecorded lease; return the error instead
	// if every report must be verifiable against a stored lease.
	_ = s.store.CreateLease(ctx, lease)
	return &lease, nil
}

func (s *Selector) Report(ctx context.Context, leaseID, proxyID string, success bool, latencyMs int64, errText string) error {
	// Optional: check that the lease exists, has not expired and was issued for proxyID.
	// l, _ := s.store.GetLease(ctx, leaseID)

	nowMs := time.Now().UnixMilli()
	if success {
		st := StatusAlive
		return s.store.UpdateHealth(ctx, proxyID, HealthPatch{
			Status:      &st,
			ScoreDelta:  +1,
			SuccessInc:  1,
			LatencyMs:   &latencyMs,
			CheckedAtMs: &nowMs,
		})
	}
	st := StatusDead
	return s.store.UpdateHealth(ctx, proxyID, HealthPatch{
		Status:      &st,
		ScoreDelta:  -3,
		FailInc:     1,
		CheckedAtMs: &nowMs,
	})
}
```
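---

## 10. Handler (API layer): Pseudocode Example

### 10.1 `/v1/proxies/next`

```go
func HandleNext(sel ProxySelector) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		q := r.URL.Query()
		req := SelectRequest{
			Group:   q.Get("group"),
			Site:    q.Get("site"),
			Policy:  q.Get("policy"),
			TagsAny: SplitCSV(q.Get("tags_any")),
		}
		if req.Group == "" {
			req.Group = "default"
		}

		lease, err := sel.Next(r.Context(), req)
		switch {
		case errors.Is(err, ErrBadPolicy):
			WriteErr(w, http.StatusBadRequest, err)
			return
		case errors.Is(err, ErrNoProxy):
			WriteErr(w, http.StatusNotFound, err)
			return
		case err != nil:
			WriteErr(w, http.StatusInternalServerError, err)
			return
		}

		WriteJSON(w, http.StatusOK, map[string]any{
			"proxy": map[string]any{
				"id":       lease.Proxy.ID,
				"protocol": lease.Proxy.Protocol,
				"host":     lease.Proxy.Host,
				"port":     lease.Proxy.Port,
				"username": lease.Proxy.Username,
				"password": lease.Proxy.Password, // make this configurable (RETURN_SECRET)
			},
			"lease_id": lease.LeaseID,
			"ttl_ms":   int64(time.Until(lease.ExpireAt) / time.Millisecond),
		})
	}
}
```

---

## 11. Security and Stability Recommendations (strongly recommended)

### 11.1 SSRF protection (test target URLs)

`/proxies/test` accepts a caller-supplied URL, so it must:

* Allow only `http/https`
* Reject targets that resolve to internal/private IPs (10/8, 172.16/12, 192.168/16, 127/8, 169.254/16, ::1, fc00::/7, fe80::/10, etc.)
* Optional: a target domain allowlist (the safest option)
* Restrict ports (80/443/custom)
* Apply the same address checks to imported proxy hosts. The tester connects to them directly from the server, so an entry such as `127.0.0.1:6379` would otherwise reach internal services.

### 11.2 Authentication and auditing

* Protect every endpoint with an `API Key` or `JWT`
* Add access logging and rate limiting to import/test/distribution/report (especially `/next`)

### 11.3 Resource control for testing and distribution

* Cap concurrency (e.g. <= 200)
* Cap `limit` per test run (e.g. <= 2000)
* Set HTTP timeouts, TLS handshake timeouts and a maximum response body size (`io.LimitReader`)
* Add a short cache to `/next` (e.g. cache the alive list for 5 to 30 seconds) to reduce DB load; this also eases a later move to Redis

---

## 12. Key Flows (end to end)

1. Import proxies: `POST /v1/proxies/import/text` or `/import/file` → UpsertMany → `proxies` table
2. Test proxies: `POST /v1/proxies/test` → List(unknown/alive) → concurrent Tester → UpdateHealth (optionally InsertTestLog)
3. Distribute proxies: `GET /v1/proxies/next` → List(alive) → NextIndex(rr_key) → chosen → CreateLease → response
4. Report results: `POST /v1/proxies/report` → UpdateHealth (success adds score; failure deducts score / marks dead)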