Equilink Best Practices
This guide outlines best practices for developing, deploying, and maintaining Equilink implementations. Following these guidelines helps keep your implementation secure, performant, and maintainable.
Key Management
Wallet Security
# DON'T: Store private keys in configuration files
# The key is read straight out of the loaded config mapping, so anyone who can
# read that config can read the key.
private_key = config['wallet']['private_key'] # Unsafe
# DO: Use environment variables or secure key management
from equilink.security import KeyManager
key_manager = KeyManager()
# NOTE: `await` is only valid inside a coroutine; run this line from an
# `async def` function.
private_key = await key_manager.get_key('wallet')
API Key Protection
Use secret management services
Rotate keys regularly
Implement key usage monitoring
Transaction Safety
Implement Transaction Guards
@transaction_guard
async def execute_trade(amount: float, token: str):
    """Execute a trade behind the ``@transaction_guard`` decorator.

    Args:
        amount: Quantity to trade.
        token: Identifier of the token to trade.
    """
    # Guards check:
    # - Transaction size limits
    # - Gas price reasonability
    # - Contract allowances
    # - Balance sufficiency
    ...  # Implementation
Multi-signature Requirements
# Require multiple signatures for large transactions
@require_multi_sig(threshold=1000)
async def large_transfer(amount: float):
    """Transfer ``amount`` behind the ``@require_multi_sig`` decorator.

    Args:
        amount: Quantity to transfer.
    """
    # NOTE(review): the unit of ``threshold`` is not visible here — confirm
    # against ``require_multi_sig``.
    ...  # Implementation
Memory Management
Efficient Data Structures
# DON'T: Keep unlimited history
full_history = [] # Grows indefinitely
# DO: Use fixed-size data structures
from collections import deque
# With maxlen set, appending to a full deque discards an item from the
# opposite end, so at most 1000 entries are ever retained.
recent_history = deque(maxlen=1000)
Resource Cleanup
async with AsyncContextManager() as manager:
# Resources automatically cleaned up
await manager.process()
Concurrent Operations
Task Management
# DON'T: Create unlimited tasks
for item in items:
asyncio.create_task(process(item)) # Dangerous
# DO: Use task pools
async with TaskPool(max_size=10) as pool:
await pool.map(process, items)
Rate Limiting
@rate_limit(calls=100, period=60)
async def api_call():
    """Call the API behind the ``@rate_limit`` decorator (calls=100, period=60)."""
    # NOTE(review): ``period`` is presumably in seconds — confirm against
    # ``rate_limit``.
    ...  # Implementation
Prompt Engineering
Structured Prompts
# DO: Use templated prompts
# {context}, {task} and {constraints} are placeholders meant to be filled in
# for each request (for example with str.format).
prompt_template = """
Context: {context}
Task: {task}
Constraints: {constraints}
"""
# DON'T: Use hard-coded strings
prompt = "Analyze the market..." # Inflexible
Response Validation
def validate_ai_response(response: dict):
    """Validate an AI response against the expected field types.

    Args:
        response: Response payload with ``recommendation`` (str),
            ``confidence`` (float) and ``reasoning`` (list) entries.

    Returns:
        Whatever ``ResponseSchema.validate`` returns for ``response``.
    """
    schema = ResponseSchema({
        'recommendation': str,
        'confidence': float,
        'reasoning': list,
    })
    return schema.validate(response)
Model Management
Fallback Strategies
async def get_analysis(text: str):
    """Analyze ``text`` with the primary model, falling back on timeout.

    Args:
        text: Input to analyze.

    Returns:
        The result of ``primary_model.analyze``, or of
        ``fallback_model.analyze`` when the primary raises ``ModelTimeout``.
    """
    try:
        return await primary_model.analyze(text)
    except ModelTimeout:
        # Only timeouts trigger the fallback; any other error propagates.
        return await fallback_model.analyze(text)
Response Caching
@cache(ttl=300)  # 5 minutes
async def get_market_sentiment(token: str):
    """Return the model's sentiment analysis for ``token``.

    Wrapped in ``@cache(ttl=300)``.
    """
    return await model.analyze_sentiment(token)
Storage Optimization
Data Lifecycle
class DataManager:
    """Example data lifecycle manager."""

    async def archive_old_data(self):
        """Archive data older than 30 days."""
        # NOTE(review): datetime.now() is timezone-naive; confirm stored
        # timestamps are naive local time as well before comparing.
        cutoff = datetime.now() - timedelta(days=30)
        await self.move_to_archive(cutoff)
Efficient Querying
# DON'T: Fetch all records
all_trades = await Trade.all()
# DO: Use pagination and filters
# NOTE(review): no explicit ordering is applied; offset-based paging is only
# stable when the result order is fixed — confirm the model's default ordering.
trades = await Trade.filter(
timestamp__gte=yesterday
).limit(100).offset(0)
Data Validation
Input Validation
from pydantic import BaseModel, validator


class TradeInput(BaseModel):
    """Input payload for a trade, validated on construction."""

    amount: float
    token: str
    price: float

    # NOTE: pydantic v2 deprecates ``validator`` in favour of
    # ``field_validator``; switch if the project is on v2.
    @validator('amount')
    def validate_amount(cls, v):
        """Reject zero and negative amounts."""
        if v <= 0:
            raise ValueError('Amount must be positive')
        return v
Data Integrity Checks
async def verify_transaction(tx_hash: str):
    """Cross-check a transaction between the chain and the indexer.

    Args:
        tx_hash: Hash of the transaction to look up.

    Returns:
        The result of ``verify_consistency`` applied to both lookups.
    """
    # Check multiple sources
    onchain_data = await blockchain.get_transaction(tx_hash)
    indexer_data = await indexer.get_transaction(tx_hash)
    return verify_consistency(onchain_data, indexer_data)
Logging Strategy
Structured Logging
# DO: Use structured logs
# With the standard `logging` module, keys passed via `extra` become
# attributes on the log record, so they must not clash with built-in record
# attributes such as `message` or `asctime`.
logger.info('Trade executed', extra={
'amount': amount,
'token': token,
'gas_used': gas,
'timestamp': timestamp
})
# DON'T: Interpolate values into the message text
logger.info(f'Trade executed: {amount} {token}') # Hard to parse
Log Levels
# Error: Failed operations
# With the standard `logging` module, exc_info=True attaches the exception
# currently being handled, so call this from inside an `except` block.
logger.error('Transaction failed', exc_info=True)
# Warning: Unusual but handled situations
logger.warning('High gas price detected')
# Info: Normal operations
logger.info('Market analysis completed')
# Debug: Detailed information for debugging
logger.debug('Raw API response', extra={'response': response})
Metrics Collection
Performance Metrics
@measure_execution_time
async def process_block(block_number: int):
    """Process one block behind the ``@measure_execution_time`` decorator.

    Args:
        block_number: Number of the block to process.
    """
    ...  # Implementation
@track_memory_usage
async def analyze_market_data():
    """Analyze market data behind the ``@track_memory_usage`` decorator."""
    ...  # Implementation
Business Metrics
async def track_trading_metrics():
    """Publish the 24h trading volume gauge and the price histogram."""
    # NOTE(review): ``volume`` and ``prices`` are not defined in this snippet;
    # they must be supplied by the surrounding scope.
    await metrics.gauge('trading.volume.24h', volume)
    await metrics.histogram('trading.price.distribution', prices)
Graceful Degradation
Circuit Breakers
@circuit_breaker(
    failure_threshold=5,
    recovery_timeout=300,
)
async def external_api_call():
    """Call an external API behind the ``@circuit_breaker`` decorator."""
    # NOTE(review): ``recovery_timeout`` is presumably in seconds — confirm
    # against ``circuit_breaker``.
    ...  # Implementation
Fallback Mechanisms
async def get_price(token: str):
    """Return the price of ``token``, degrading through staler sources.

    Order of attempts:
      1. ``primary_source``
      2. ``backup_source``, when the primary times out
      3. ``get_last_known_price``, when the backup also fails or the primary
         raises ``AllSourcesFailed``

    Args:
        token: Identifier of the token to price.

    Returns:
        The price from the first source that succeeds.
    """
    # NOTE(review): before Python 3.11, ``asyncio.TimeoutError`` is a different
    # class from the builtin ``TimeoutError`` — confirm which one the sources
    # raise.
    try:
        return await primary_source.get_price(token)
    except TimeoutError:
        # An ``except`` clause does not handle errors raised inside a sibling
        # ``except`` clause of the same ``try``, so the backup call needs its
        # own ``try`` for the last-known-price fallback to be reachable.
        try:
            return await backup_source.get_price(token)
        except (TimeoutError, AllSourcesFailed):
            return await get_last_known_price(token)
    except AllSourcesFailed:
        return await get_last_known_price(token)
Test Categories
Unit Tests
def test_price_calculation():
    """Check that ``calculate(100, 0.1)`` yields 110."""
    import math

    calculator = PriceCalculator()
    result = calculator.calculate(100, 0.1)
    # Compare with a tolerance: in binary floating point 100 * 1.1 evaluates
    # to 110.00000000000001, so an exact ``== 110`` can fail spuriously.
    assert math.isclose(result, 110)
Integration Tests
# NOTE(review): pytest only awaits ``async def`` tests when an async plugin
# (e.g. pytest-asyncio) is active — confirm the project's pytest configuration.
@pytest.mark.integration
async def test_trade_execution():
    """Execute a one-unit trade of "TEST" and expect a completed status."""
    agent = EquilinkAgent()
    result = await agent.execute_trade(
        amount=1,
        token="TEST",
    )
    assert result.status == "completed"
Performance Tests
@pytest.mark.benchmark
async def test_analysis_performance():
    """Assert that ``analyze_market`` finishes in under one second."""
    # perf_counter() is monotonic and high-resolution; time.time() follows the
    # wall clock and can jump if the system clock is adjusted mid-test.
    start = time.perf_counter()
    await analyze_market()
    duration = time.perf_counter() - start
    assert duration < 1.0  # Should complete within 1 second
Configuration Management
Environment-Based Config
# Per-environment settings; each environment is a mapping of its own keys.
# NOTE(review): api_timeout is presumably in seconds — confirm with the loader.
development:
  log_level: debug
  api_timeout: 30
production:
  log_level: info
  api_timeout: 5
Feature Flags
if feature_flags.enabled('new_trading_engine'):
await new_engine.execute(trade)
else:
await legacy_engine.execute(trade)
Deployment Process
Staged Rollout
async def deploy_update():
    """Roll out an update: staging first, then production region by region."""
    # 1. Deploy to staging
    await deploy_to_env('staging')
    # 2. Run validation tests
    await validate_deployment()
    # 3. Gradual production rollout
    for region in regions:
        # Each region is updated and then health-checked before the next one
        # starts.
        await rolling_update(region)
        await monitor_health(region)
Remember:
Regular security audits
Comprehensive documentation
Continuous monitoring
Regular dependency updates
Backup and recovery testing
Last updated