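Below is a complete development document for a "proxy pool + testing + distribution (round-robin) service" (PostgreSQL edition). It covers the overall architecture, the database schema (SQL), HTTP API request formats and contracts, the reserved Go interfaces (contracts) with key pseudocode, the key flows, and security recommendations.
Note: this system is general-purpose network infrastructure. Use it only in lawful, compliant scenarios (your own business, authorized testing, compliant crawling, outbound routing), and put authentication, auditing and SSRF protection in place.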
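1. Goals and Scope
1.1 Target capabilities
- Import proxies
  - Input: pasted text, uploaded txt, uploaded csv
  - Parse multiple line formats automatically and deduplicate on insert (PgSQL unique constraint)
- Test proxy availability
  - Configurable target site (URL) and test rules (timeout, expected status codes, optional keyword)
  - Concurrent worker pool
  - Results can be written back to the database (status / latency / counters / score)
- Distribution service (round-robin endpoint reserved)
  - Callers fetch the "next available proxy" over HTTP (RoundRobin; Random / Weighted as extensions)
  - Lease mechanism (recommended), so result reports and statistics can be tied to a specific handout
- Result reporting (optional but strongly recommended)
  - After using a proxy, the caller reports success/fail, which drives score adjustment and circuit breaking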
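2. Overall Architecture and Modules
2.1 Modules
- API Layer: HTTP routing and authentication, parameter validation, JSON responses
- Importer: parses text / CSV into a list of Proxy values and reports a reason for each invalid line
- Store (PgSQL): upsert, queries, health updates, RR cursor, lease management
- Tester: sends HTTP test requests through a proxy and returns a TestResult
- Selector: picks one proxy from the available set according to a policy (RoundRobin by default)
- (Optional) Cleaner Job: periodically removes expired leases and (optionally) writes test logs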
2.2 Recommended package layout
cmd/server/main.go
internal/api/handlers.go
internal/model/types.go
internal/importer/importer.go
internal/tester/http_tester.go
internal/selector/selector.go
internal/store/pg_store.go
internal/security/validate_url.go
3. Data Model (Go)
// internal/model/types.go
type ProxyProtocol string
const (
	ProtoHTTP   ProxyProtocol = "http"
	ProtoHTTPS  ProxyProtocol = "https"
	ProtoSOCKS5 ProxyProtocol = "socks5"
)
type ProxyStatus string
const (
	StatusUnknown ProxyStatus = "unknown"
	StatusAlive   ProxyStatus = "alive"
	StatusDead    ProxyStatus = "dead"
)
type Proxy struct {
	ID           string // uuid
	Protocol     ProxyProtocol
	Host         string
	Port         int
	Username     string
	Password     string
	Group        string
	Tags         []string
	Status       ProxyStatus
	Score        int
	LatencyMs    int64
	LastCheckAt  int64 // unix ms, or time.Time depending on your serialization preference
	FailCount    int
	SuccessCount int
	Disabled     bool
	CreatedAt    int64
	UpdatedAt    int64
}
// HealthPatch: nil pointer fields leave the column unchanged; Delta/Inc fields are added.
type HealthPatch struct {
	Status      *ProxyStatus
	ScoreDelta  int
	LatencyMs   *int64
	CheckedAtMs *int64
	FailInc     int
	SuccessInc  int
}
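4. PostgreSQL Schema (SQL)
Design points: a unique constraint for deduplication, tags stored as text[] with a GIN index, the RR cursor in its own table with an atomic increment, and leases kept in a table so they can be traced.
4.1 Enum types and the main table proxies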
CREATE TYPE proxy_protocol AS ENUM ('http', 'https', 'socks5');
CREATE TYPE proxy_status AS ENUM ('unknown', 'alive', 'dead');
CREATE TABLE proxies (
id uuid PRIMARY KEY,
protocol proxy_protocol NOT NULL,
host text NOT NULL,
port int NOT NULL CHECK (port > 0 AND port < 65536),
username text NOT NULL DEFAULT '',
password text NOT NULL DEFAULT '',
"group" text NOT NULL DEFAULT 'default',
tags text[] NOT NULL DEFAULT ARRAY[]::text[],
status proxy_status NOT NULL DEFAULT 'unknown',
score int NOT NULL DEFAULT 0,
latency_ms bigint NOT NULL DEFAULT 0,
last_check_at timestamptz,
fail_count int NOT NULL DEFAULT 0,
success_count int NOT NULL DEFAULT 0,
disabled boolean NOT NULL DEFAULT false,
created_at timestamptz NOT NULL DEFAULT now(),
updated_at timestamptz NOT NULL DEFAULT now(),
CONSTRAINT uq_proxy UNIQUE (protocol, host, port, username)
);
-- Common indexes
CREATE INDEX idx_proxies_group_status_disabled
ON proxies ("group", status, disabled);
-- tags lookup: tags && ARRAY['a','b']
CREATE INDEX idx_proxies_tags_gin
ON proxies USING gin (tags);
-- Hot-path query for alive proxies (optional)
CREATE INDEX idx_proxies_alive_fast
ON proxies ("group", disabled, score DESC, last_check_at DESC)
WHERE status = 'alive';
-- Maintain updated_at automatically (optional, via trigger)
CREATE OR REPLACE FUNCTION touch_updated_at()
RETURNS trigger AS $$
BEGIN
NEW.updated_at = now();
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER trg_touch_updated_at
BEFORE UPDATE ON proxies
FOR EACH ROW
EXECUTE FUNCTION touch_updated_at();
4.2 RR cursor table rr_cursors
CREATE TABLE rr_cursors (
k text PRIMARY KEY,
v bigint NOT NULL DEFAULT 0,
updated_at timestamptz NOT NULL DEFAULT now()
);
4.3 Lease table proxy_leases (recommended)
CREATE TABLE proxy_leases (
lease_id text PRIMARY KEY,
proxy_id uuid NOT NULL REFERENCES proxies(id),
expire_at timestamptz NOT NULL,
site text NOT NULL DEFAULT '',
"group" text NOT NULL DEFAULT 'default',
created_at timestamptz NOT NULL DEFAULT now()
);
CREATE INDEX idx_leases_expire ON proxy_leases(expire_at);
4.4 (Optional) Test log table proxy_test_logs
If you need auditing or troubleshooting (for example, which target site caused failures), consider adding:
CREATE TABLE proxy_test_logs (
id bigserial PRIMARY KEY,
proxy_id uuid NOT NULL REFERENCES proxies(id),
site text NOT NULL,
ok boolean NOT NULL,
latency_ms bigint NOT NULL,
error_text text NOT NULL DEFAULT '',
checked_at timestamptz NOT NULL DEFAULT now()
);
CREATE INDEX idx_test_logs_proxy_time ON proxy_test_logs(proxy_id, checked_at DESC);
5. HTTP API Design (request formats + contracts)
Common conventions
- BasePath: /v1
- Content-Type: application/json (except file uploads)
- Authentication (recommended): Authorization: Bearer <token> or X-API-Key: <key>
- Unified error response: {"error":"bad_request","message":"..."}
5.1 Import (pasted text)
POST /v1/proxies/import/text
Request
{
"group": "default",
"tags": ["batch-2026-01"],
"protocol_hint": "auto",
"text": "http://user:pass@1.2.3.4:8080\nsocks5://5.6.7.8:1080\n9.9.9.9:3128"
}
Response
{
"imported": 120,
"duplicated": 30,
"invalid": 5,
"invalid_items": [
{"raw":"bad_line","reason":"parse_failed"}
]
}
Line formats the parser should support (recommended):
- host:port
- user:pass@host:port
- http://user:pass@host:port
- socks5://host:port
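Here is a sketch of ParseProxyLine (called by ParseText in section 8.1) that covers these four formats. It relies on net/url by prepending a scheme to lines that lack one. Treating "auto" (or an empty hint) as http is an assumption. The Proxy type here is a trimmed, hypothetical stand-in for model.Proxy:

```go
package main

import (
	"errors"
	"net"
	"net/url"
	"strconv"
	"strings"
)

// Proxy holds only the fields this parser fills in (trimmed stand-in for model.Proxy).
type Proxy struct {
	Protocol, Host, Username, Password string
	Port                               int
}

var errParse = errors.New("parse_failed")

// ParseProxyLine accepts host:port, user:pass@host:port and scheme://[user:pass@]host:port.
// Assumption: lines without a scheme use protocolHint, and "auto"/"" falls back to http.
func ParseProxyLine(raw, protocolHint string) (Proxy, error) {
	if !strings.Contains(raw, "://") {
		scheme := protocolHint
		if scheme == "" || scheme == "auto" {
			scheme = "http"
		}
		raw = scheme + "://" + raw
	}
	u, err := url.Parse(raw) // also lowercases the scheme
	if err != nil {
		return Proxy{}, errParse
	}
	switch u.Scheme {
	case "http", "https", "socks5":
	default:
		return Proxy{}, errParse
	}
	host, portStr, err := net.SplitHostPort(u.Host) // fails when the port is missing
	if err != nil || host == "" {
		return Proxy{}, errParse
	}
	port, err := strconv.Atoi(portStr)
	if err != nil || port <= 0 || port >= 65536 { // same range as the port CHECK constraint
		return Proxy{}, errParse
	}
	p := Proxy{Protocol: u.Scheme, Host: host, Port: port}
	if u.User != nil {
		p.Username = u.User.Username()
		p.Password, _ = u.User.Password()
	}
	return p, nil
}
```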
5.2 Import (file upload, txt/csv)
POST /v1/proxies/import/file (multipart/form-data)
Form fields
- group: string
- tags: comma-separated (or repeat the tags field)
- type: auto|txt|csv
- file: the uploaded file
Response: same as 5.1
Recommended CSV columns:
protocol,host,port,username,password,group,tags
- tags are separated by ; (e.g. a;b;c)
5.3 Test proxies
POST /v1/proxies/test
Request
{
"group": "default",
"filter": {
"status": ["unknown", "alive"],
"tags_any": ["batch-2026-01"],
"limit": 200
},
"test_spec": {
"url": "https://example.com/",
"method": "GET",
"timeout_ms": 5000,
"expect_status": [200, 301, 302],
"expect_contains": ""
},
"concurrency": 50,
"update_store": true,
"write_log": false
}
Response
{
"summary": {"tested":200,"alive":120,"dead":80},
"results": [
{"proxy_id":"...","ok":true,"latency_ms":340,"error":""},
{"proxy_id":"...","ok":false,"latency_ms":5000,"error":"timeout"}
]
}
5.4 Get the next available proxy (round-robin)
GET /v1/proxies/next?group=default&site=https%3A%2F%2Fexample.com&policy=round_robin&tags_any=batch-2026-01
Response
{
"proxy": {
"id": "uuid",
"protocol": "http",
"host": "1.2.3.4",
"port": 8080,
"username": "user",
"password": "pass"
},
"lease_id": "lease_01H...",
"ttl_ms": 60000
}
Whether to return password: make it a config switch, e.g. RETURN_SECRET=true/false.
5.5 Report usage result
POST /v1/proxies/report
Request
{
"lease_id": "lease_01H...",
"proxy_id": "uuid",
"success": false,
"error": "403",
"latency_ms": 1200
}
Response
{"ok": true}
6. Reserved Go Interfaces (names and contracts)
6.1 Store (PgSQL implementation)
type ProxyQuery struct {
	Group       string
	TagsAny     []string
	StatusIn    []ProxyStatus
	OnlyEnabled bool
	Limit       int
}
type InvalidLine struct {
	Raw    string
	Reason string
}
type ProxyStore interface {
	// Import: batch upsert with dedup statistics
	UpsertMany(ctx context.Context, proxies []Proxy) (imported, duplicated int, invalid []InvalidLine, err error)
	// Queries
	List(ctx context.Context, q ProxyQuery) ([]Proxy, error)
	GetByID(ctx context.Context, id string) (*Proxy, error)
	// Write back health (from tests / reports)
	UpdateHealth(ctx context.Context, proxyID string, patch HealthPatch) error
	// Atomic round-robin cursor: returns an index in [0, modulo)
	NextIndex(ctx context.Context, key string, modulo int) (int, error)
	// Leases (recommended)
	CreateLease(ctx context.Context, lease Lease) error
	GetLease(ctx context.Context, leaseID string) (*Lease, error)
	DeleteExpiredLeases(ctx context.Context) (int64, error)
	// Optional: write test logs
	InsertTestLog(ctx context.Context, r TestResult, site string) error
}
6.2 Importer
type ImportInput struct {
	Group        string
	Tags         []string
	ProtocolHint string // "auto"|"http"|"socks5"
}
type ProxyImporter interface {
	ParseText(ctx context.Context, in ImportInput, text string) (proxies []Proxy, invalid []InvalidLine)
	ParseCSV(ctx context.Context, in ImportInput, r io.Reader) (proxies []Proxy, invalid []InvalidLine)
}
6.3 Tester
type TestSpec struct {
	URL            string
	Method         string
	Timeout        time.Duration
	ExpectStatus   []int
	ExpectContains string
}
type TestResult struct {
	ProxyID   string
	OK        bool
	LatencyMs int64
	ErrorText string
	CheckedAt time.Time
}
type ProxyTester interface {
	TestOne(ctx context.Context, p Proxy, spec TestSpec) TestResult
	TestBatch(ctx context.Context, proxies []Proxy, spec TestSpec, concurrency int) []TestResult
}
6.4 Selector (distribution policies)
type SelectRequest struct {
	Group   string
	Site    string
	Policy  string // "round_robin"|"random"|"weighted"
	TagsAny []string
}
type Lease struct {
	LeaseID  string
	Proxy    Proxy
	ExpireAt time.Time
	Group    string
	Site     string
}
type ProxySelector interface {
	Next(ctx context.Context, req SelectRequest) (*Lease, error)
	Report(ctx context.Context, leaseID, proxyID string, success bool, latencyMs int64, errText string) error
}
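7. Key PgSQL Implementation Pseudocode (Store)
Either pgx or database/sql works. The pseudocode below captures the essential SQL.
7.1 Batch UpsertMany (import with dedup statistics)
Core SQL (single row shown): RETURNING (xmax = 0) tells an insert apart from an update. This is a PostgreSQL trick that relies on MVCC internals rather than documented behavior.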
INSERT INTO proxies (id, protocol, host, port, username, password, "group", tags)
VALUES ($1,$2,$3,$4,$5,$6,$7,$8)
ON CONFLICT (protocol, host, port, username)
DO UPDATE SET
password = EXCLUDED.password,
"group" = EXCLUDED."group",
tags = (
SELECT ARRAY(
SELECT DISTINCT unnest(proxies.tags || EXCLUDED.tags)
)
)
RETURNING (xmax = 0) AS inserted, id;
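Go pseudocode
func (s *PgStore) UpsertMany(ctx context.Context, proxies []Proxy) (int, int, []InvalidLine, error) {
	// proxies should already have obviously invalid host/port filtered out by the importer.
	// Assumes s.db is a pgx connection pool; database/sql needs only small changes.
	imported, duplicated := 0, 0
	tx, err := s.db.Begin(ctx)
	if err != nil {
		return 0, 0, nil, err
	}
	defer tx.Rollback(ctx) // no-op once Commit has succeeded
	for _, p := range proxies {
		var inserted bool
		var id string
		// sqlUpsert is the INSERT ... RETURNING statement above
		if err := tx.QueryRow(ctx, sqlUpsert, p.ID, p.Protocol, p.Host, p.Port,
			p.Username, p.Password, p.Group, p.Tags).Scan(&inserted, &id); err != nil {
			return 0, 0, nil, err
		}
		if inserted {
			imported++
		} else {
			duplicated++
		}
	}
	if err := tx.Commit(ctx); err != nil {
		return 0, 0, nil, err
	}
	return imported, duplicated, nil, nil
}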
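7.2 List (query available proxies)
func (s *PgStore) List(ctx context.Context, q ProxyQuery) ([]Proxy, error) {
	// SQL essentials (sqlList):
	//   WHERE "group" = $1
	//     AND (NOT $2 OR disabled = false)                    -- OnlyEnabled
	//     AND (cardinality($3::proxy_status[]) = 0 OR status = ANY($3::proxy_status[]))
	//     AND (cardinality($4::text[]) = 0 OR tags && $4)     -- TagsAny
	//   ORDER BY score DESC, last_check_at DESC NULLS LAST    -- DESC puts NULLs first by default
	//   LIMIT $5
	rows, err := s.db.Query(ctx, sqlList, q.Group, q.OnlyEnabled, q.StatusIn, q.TagsAny, q.Limit)
	if err != nil {
		return nil, err
	}
	defer rows.Close()
	return ScanProxies(rows) // scans each row into a Proxy and returns rows.Err()
}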
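7.3 UpdateHealth (write back health)
Suggested rules (adjust as needed):
- success: score += 1, success_count += 1, status = alive, update latency and last_check_at
- fail: score -= 3, fail_count += 1, status = dead (or only mark dead once fail_count exceeds a threshold)
- clamp score to a lower bound so it cannot decrease without limit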
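The same rules can be expressed as a pure function over one proxy's health. That form is handy for unit tests or for computing values before writing them back. The Health type and both constants are hypothetical:

```go
package main

type ProxyStatus string

const (
	StatusAlive ProxyStatus = "alive"
	StatusDead  ProxyStatus = "dead"
)

// Health mirrors the columns UpdateHealth touches.
type Health struct {
	Status       ProxyStatus
	Score        int
	LatencyMs    int64
	LastCheckMs  int64
	FailCount    int
	SuccessCount int
}

// Hypothetical tuning values; the rules above leave them open.
const (
	scoreFloor     = -20 // lower clamp for score
	deadAfterFails = 3   // mark dead only once fail_count reaches this value
)

// ApplyOutcome applies the suggested success/fail rules to one proxy's health snapshot.
func ApplyOutcome(h Health, success bool, latencyMs, nowMs int64) Health {
	h.LastCheckMs = nowMs
	if success {
		h.Score++
		h.SuccessCount++
		h.Status = StatusAlive
		h.LatencyMs = latencyMs
		return h
	}
	h.Score -= 3
	if h.Score < scoreFloor {
		h.Score = scoreFloor
	}
	h.FailCount++
	if h.FailCount >= deadAfterFails {
		h.Status = StatusDead
	}
	return h
}
```

fail_count is cumulative, so a threshold on it eventually marks any long-lived proxy dead. A consecutive-failure counter that resets on success would avoid this, but it is not in the schema above and would need an extra column.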
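func (s *PgStore) UpdateHealth(ctx context.Context, proxyID string, patch HealthPatch) error {
	// One fixed update: nil pointer fields in patch become SQL NULL and leave the column as is.
	// last_check_at is timestamptz while CheckedAtMs is unix ms, hence to_timestamp(ms / 1000.0).
	const sqlUpdateHealth = `
UPDATE proxies SET
    status        = COALESCE($2::proxy_status, status),
    score         = GREATEST(score + $3, -100),  -- example lower clamp
    latency_ms    = COALESCE($4, latency_ms),
    last_check_at = COALESCE(to_timestamp($5::bigint / 1000.0), last_check_at),
    fail_count    = fail_count + $6,
    success_count = success_count + $7
WHERE id = $1`
	_, err := s.db.Exec(ctx, sqlUpdateHealth, proxyID, patch.Status, patch.ScoreDelta,
		patch.LatencyMs, patch.CheckedAtMs, patch.FailInc, patch.SuccessInc)
	return err
}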
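7.4 RR cursor NextIndex (atomic increment in PgSQL)
The first call for a key inserts v = 0; each later call increments v by one and returns the new value.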
INSERT INTO rr_cursors (k, v)
VALUES ($1, 0)
ON CONFLICT (k)
DO UPDATE SET v = rr_cursors.v + 1, updated_at = now()
RETURNING v;
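func (s *PgStore) NextIndex(ctx context.Context, key string, modulo int) (int, error) {
	if modulo <= 0 {
		return 0, ErrBadModulo
	}
	var v int64 // sqlCursor is the statement above
	if err := s.db.QueryRow(ctx, sqlCursor, key).Scan(&v); err != nil {
		return 0, err
	}
	// v is never negative (bigint overflow raises an error instead of wrapping),
	// so the result is already in [0, modulo).
	return int(v % int64(modulo)), nil
}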
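7.5 Lease (recommended)
func (s *PgStore) CreateLease(ctx context.Context, lease Lease) error {
	// "group" must be quoted: GROUP is a reserved word in SQL.
	_, err := s.db.Exec(ctx,
		`INSERT INTO proxy_leases (lease_id, proxy_id, expire_at, site, "group") VALUES ($1, $2, $3, $4, $5)`,
		lease.LeaseID, lease.Proxy.ID, lease.ExpireAt, lease.Site, lease.Group)
	return err
}
func (s *PgStore) GetLease(ctx context.Context, leaseID string) (*Lease, error) {
	// SELECT lease_id, proxy_id, expire_at, site, "group" FROM proxy_leases
	// WHERE lease_id = $1 AND expire_at > now()
	// No row means the lease is unknown or has expired.
	return nil, nil
}
func (s *PgStore) DeleteExpiredLeases(ctx context.Context) (int64, error) {
	tag, err := s.db.Exec(ctx, `DELETE FROM proxy_leases WHERE expire_at <= now()`)
	if err != nil {
		return 0, err
	}
	return tag.RowsAffected(), nil
}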
8. Key Importer and Tester Pseudocode
8.1 Importer: parsing text
func (im *Importer) ParseText(ctx context.Context, in ImportInput, text string) ([]Proxy, []InvalidLine) {
	lines := strings.Split(text, "\n") // TrimSpace below also strips a trailing "\r"
	var out []Proxy
	var bad []InvalidLine
	for _, raw := range lines {
		raw = strings.TrimSpace(raw)
		if raw == "" {
			continue
		}
		p, err := ParseProxyLine(raw, in.ProtocolHint)
		if err != nil {
			bad = append(bad, InvalidLine{Raw: raw, Reason: "parse_failed"})
			continue
		}
		p.Group = Coalesce(p.Group, in.Group)
		p.Tags = MergeTags(p.Tags, in.Tags)
		p.ID = NewUUID()
		out = append(out, p)
	}
	// Optional: deduplicate once more in the application to reduce DB load
	out = Dedup(out)
	return out, bad
}
8.2 Tester: concurrent testing (worker pool)
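func (t *HTTPTester) TestBatch(ctx context.Context, proxies []Proxy, spec TestSpec, concurrency int) []TestResult {
	// Clamp the worker count: with 0 workers the result loop below would block forever.
	if concurrency < 1 {
		concurrency = 1
	}
	if concurrency > len(proxies) {
		concurrency = len(proxies)
	}
	jobs := make(chan Proxy)
	results := make(chan TestResult)
	for i := 0; i < concurrency; i++ {
		go func() {
			for p := range jobs {
				results <- t.TestOne(ctx, p, spec) // TestOne should honor ctx and spec.Timeout
			}
		}()
	}
	go func() {
		defer close(jobs) // ends the workers' range loops
		for _, p := range proxies {
			jobs <- p
		}
	}()
	// Exactly one result per proxy; order follows completion, so match results on ProxyID.
	out := make([]TestResult, 0, len(proxies))
	for i := 0; i < len(proxies); i++ {
		out = append(out, <-results)
	}
	return out
}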
(Implementation notes)
- HTTP proxy: http.Transport{ Proxy: http.ProxyURL(proxyURL) }
- SOCKS5: create a dialer with golang.org/x/net/proxy and plug it into the Transport's DialContext (mind timeouts and connection reuse)
9. Selector (round-robin distribution) key pseudocode
func (s *Selector) Next(ctx context.Context, req SelectRequest) (*Lease, error) {
	policy := Coalesce(req.Policy, "round_robin")
	proxies, err := s.store.List(ctx, ProxyQuery{
		Group:       req.Group,
		TagsAny:     req.TagsAny,
		StatusIn:    []ProxyStatus{StatusAlive},
		OnlyEnabled: true,
		Limit:       5000,
	})
	if err != nil {
		return nil, err
	}
	if len(proxies) == 0 {
		return nil, ErrNoProxy
	}
	var chosen Proxy
	switch policy {
	case "round_robin":
		// The cursor only yields a true rotation if List returns the same order between
		// calls; with ORDER BY score the order shifts as scores change, so some proxies
		// may be skipped or repeated (ordering by id, or caching the list, avoids this).
		key := "rr:" + req.Group + ":" + NormalizeSite(req.Site)
		idx, err := s.store.NextIndex(ctx, key, len(proxies))
		if err != nil {
			return nil, err
		}
		chosen = proxies[idx]
	case "random":
		chosen = proxies[rand.Intn(len(proxies))]
	case "weighted":
		chosen = WeightedPickByScore(proxies)
	default:
		return nil, ErrBadPolicy
	}
	// Issue a lease
	lease := Lease{
		LeaseID:  NewLeaseID(),
		Proxy:    chosen,
		Group:    req.Group,
		Site:     req.Site,
		ExpireAt: time.Now().Add(60 * time.Second),
	}
	// Recommended: persist the lease. Ignoring a failure degrades to an untracked
	// handout; whether that is acceptable depends on your needs.
	_ = s.store.CreateLease(ctx, lease)
	return &lease, nil
}
func (s *Selector) Report(ctx context.Context, leaseID, proxyID string, success bool, latencyMs int64, errText string) error {
	// Optional: check that the lease exists, has not expired, and that l.Proxy.ID == proxyID,
	// so callers cannot report on proxies they were not handed.
	// l, _ := s.store.GetLease(ctx, leaseID)
	nowMs := time.Now().UnixMilli()
	if success {
		st := StatusAlive
		return s.store.UpdateHealth(ctx, proxyID, HealthPatch{
			Status: &st, ScoreDelta: +1, SuccessInc: 1,
			LatencyMs: &latencyMs, CheckedAtMs: &nowMs,
		})
	}
	st := StatusDead
	return s.store.UpdateHealth(ctx, proxyID, HealthPatch{
		Status: &st, ScoreDelta: -3, FailInc: 1,
		CheckedAtMs: &nowMs,
	})
}
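WeightedPickByScore is referenced above but not defined. The scheme below is assumed here rather than taken from the document. It shifts every score so the lowest becomes weight 1, then samples proportionally. The trimmed Proxy type is a stand-in:

```go
package main

import "math/rand"

// Proxy is a trimmed stand-in for model.Proxy.
type Proxy struct {
	ID    string
	Score int
}

// WeightedPickByScore picks one proxy with probability proportional to
// weight = score - minScore + 1, so every proxy keeps a non-zero chance.
// It panics on an empty slice, which Selector.Next already rules out.
func WeightedPickByScore(proxies []Proxy) Proxy {
	minScore := proxies[0].Score
	for _, p := range proxies[1:] {
		if p.Score < minScore {
			minScore = p.Score
		}
	}
	total := 0
	for _, p := range proxies {
		total += p.Score - minScore + 1
	}
	r := rand.Intn(total) // r in [0, total)
	for _, p := range proxies {
		r -= p.Score - minScore + 1
		if r < 0 {
			return p
		}
	}
	return proxies[len(proxies)-1] // unreachable: the weights sum to total
}
```

Shifting by the minimum keeps negative scores usable. It also makes the weights relative: with scores -3 and 6, the weights are 1 and 10.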
10. Handler (API layer) pseudocode example
10.1 /v1/proxies/next
func HandleNext(sel ProxySelector) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		q := r.URL.Query()
		req := SelectRequest{
			Group:   q.Get("group"),
			Site:    q.Get("site"),
			Policy:  q.Get("policy"),
			TagsAny: SplitCSV(q.Get("tags_any")),
		}
		if req.Group == "" {
			req.Group = "default"
		}
		lease, err := sel.Next(r.Context(), req)
		if err != nil {
			// Distinguish caller errors from an empty pool and from internal failures.
			switch {
			case errors.Is(err, ErrBadPolicy):
				WriteErr(w, http.StatusBadRequest, err)
			case errors.Is(err, ErrNoProxy):
				WriteErr(w, http.StatusNotFound, err)
			default:
				WriteErr(w, http.StatusInternalServerError, err)
			}
			return
		}
		WriteJSON(w, http.StatusOK, map[string]any{
			"proxy": map[string]any{
				"id":       lease.Proxy.ID,
				"protocol": lease.Proxy.Protocol,
				"host":     lease.Proxy.Host,
				"port":     lease.Proxy.Port,
				"username": lease.Proxy.Username,
				"password": lease.Proxy.Password, // make returning this configurable (RETURN_SECRET)
			},
			"lease_id": lease.LeaseID,
			"ttl_ms":   time.Until(lease.ExpireAt).Milliseconds(),
		})
	}
}
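11. Security and Stability Recommendations (strongly recommended in practice)
11.1 SSRF protection (test target URL)
/proxies/test accepts a caller-supplied URL, so it must:
- allow only http/https
- reject targets that resolve to internal/private IPs (10/8, 172.16/12, 192.168/16, 127/8, 169.254/16, ::1, fc00::/7, etc.)
- optionally restrict target domains to an allowlist (the safest option)
- restrict ports (80/443/custom)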
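11.2 Authentication and auditing
- Protect every endpoint with an API Key or JWT
- Add access logging and rate limiting for import/test/distribution/report (especially /next)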
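11.3 Resource limits for testing and distribution
- Cap concurrency (e.g. <= 200)
- Cap the per-run test limit (e.g. <= 2000)
- Set an HTTP timeout, a TLS handshake timeout, and a maximum response body size (read the body through a limited reader such as io.LimitReader)
- Add a simple cache for /next (e.g. cache the alive list for 5 to 30 seconds) to reduce DB load; this also eases a later move to Redis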
12. Key Flows (end-to-end)
- Import proxies: POST /v1/proxies/import/text or /import/file → UpsertMany → proxies table
- Test proxies: POST /v1/proxies/test → List(unknown/alive) → Tester concurrent tests → UpdateHealth (optionally InsertTestLog)
- Distribute proxies: GET /v1/proxies/next → List(alive) → NextIndex(rr_key) → chosen → CreateLease → response
- Report results: POST /v1/proxies/report → UpdateHealth (success adds score; failure deducts score / marks dead)