Page cover image

Equilink Best Practices

This guide outlines best practices for developing, deploying, and maintaining Equilink implementations. Following these guidelines ensures security, performance, and maintainability.

Key Management

  1. Wallet Security

# DON'T: Store private keys in configuration files
private_key = config['wallet']['private_key']  # Unsafe

# DO: Use environment variables or secure key management
from equilink.security import KeyManager
key_manager = KeyManager()
private_key = await key_manager.get_key('wallet')
  1. API Key Protection

  • Use secret management services

  • Rotate keys regularly

  • Implement key usage monitoring

Transaction Safety

  1. Implement Transaction Guards

@transaction_guard
async def execute_trade(amount: float, token: str):
    # Guards check:
    # - Transaction size limits
    # - Gas price reasonability
    # - Contract allowances
    # - Balance sufficiency
  1. Multi-signature Requirements

# Require multiple signatures for large transactions
@require_multi_sig(threshold=1000)
async def large_transfer(amount: float):
    # Implementation

Memory Management

  1. Efficient Data Structures

# DON'T: Keep unlimited history
full_history = []  # Grows indefinitely

# DO: Use fixed-size data structures
from collections import deque
recent_history = deque(maxlen=1000)
  1. Resource Cleanup

async with AsyncContextManager() as manager:
    # Resources automatically cleaned up
    await manager.process()

Concurrent Operations

  1. Task Management

# DON'T: Create unlimited tasks
for item in items:
    asyncio.create_task(process(item))  # Dangerous

# DO: Use task pools
async with TaskPool(max_size=10) as pool:
    await pool.map(process, items)
  1. Rate Limiting

@rate_limit(calls=100, period=60)
async def api_call():
    # Implementation

Prompt Engineering

  1. Structured Prompts

# DO: Use templated prompts
prompt_template = """
Context: {context}
Task: {task}
Constraints: {constraints}
"""

# DON'T: Use hard-coded strings
prompt = "Analyze the market..."  # Inflexible
  1. Response Validation

def validate_ai_response(response: dict):
    schema = ResponseSchema({
        'recommendation': str,
        'confidence': float,
        'reasoning': list
    })
    return schema.validate(response)

Model Management

  1. Fallback Strategies

async def get_analysis(text: str):
    try:
        return await primary_model.analyze(text)
    except ModelTimeout:
        return await fallback_model.analyze(text)
  1. Response Caching

@cache(ttl=300)  # 5 minutes
async def get_market_sentiment(token: str):
    return await model.analyze_sentiment(token)

Storage Optimization

  1. Data Lifecycle

class DataManager:
    async def archive_old_data(self):
        """Archive data older than 30 days."""
        cutoff = datetime.now() - timedelta(days=30)
        await self.move_to_archive(cutoff)
  1. Efficient Querying

# DON'T: Fetch all records
all_trades = await Trade.all()

# DO: Use pagination and filters
trades = await Trade.filter(
    timestamp__gte=yesterday
).limit(100).offset(0)

Data Validation

  1. Input Validation

from pydantic import BaseModel

class TradeInput(BaseModel):
    amount: float
    token: str
    price: float

    @validator('amount')
    def validate_amount(cls, v):
        if v <= 0:
            raise ValueError('Amount must be positive')
        return v
  1. Data Integrity Checks

async def verify_transaction(tx_hash: str):
    # Check multiple sources
    onchain_data = await blockchain.get_transaction(tx_hash)
    indexer_data = await indexer.get_transaction(tx_hash)
    
    return verify_consistency(onchain_data, indexer_data)

Logging Strategy

  1. Structured Logging

# DO: Use structured logs
logger.info('Trade executed', extra={
    'amount': amount,
    'token': token,
    'gas_used': gas,
    'timestamp': timestamp
})

# DON'T: Use string concatenation
logger.info(f'Trade executed: {amount} {token}')  # Hard to parse
  1. Log Levels

# Error: Failed operations
logger.error('Transaction failed', exc_info=True)

# Warning: Unusual but handled situations
logger.warning('High gas price detected')

# Info: Normal operations
logger.info('Market analysis completed')

# Debug: Detailed information for debugging
logger.debug('Raw API response', extra={'response': response})

Metrics Collection

  1. Performance Metrics

@measure_execution_time
async def process_block(block_number: int):
    # Implementation

@track_memory_usage
async def analyze_market_data():
    # Implementation
  1. Business Metrics

async def track_trading_metrics():
    await metrics.gauge('trading.volume.24h', volume)
    await metrics.histogram('trading.price.distribution', prices)

Graceful Degradation

  1. Circuit Breakers

@circuit_breaker(
    failure_threshold=5,
    recovery_timeout=300
)
async def external_api_call():
    # Implementation
  1. Fallback Mechanisms

async def get_price(token: str):
    try:
        return await primary_source.get_price(token)
    except TimeoutError:
        return await backup_source.get_price(token)
    except AllSourcesFailed:
        return await get_last_known_price(token)

Test Categories

  1. Unit Tests

def test_price_calculation():
    calculator = PriceCalculator()
    result = calculator.calculate(100, 0.1)
    assert result == 110
  1. Integration Tests

@pytest.mark.integration
async def test_trade_execution():
    agent = EquilinkAgent()
    result = await agent.execute_trade(
        amount=1,
        token="TEST"
    )
    assert result.status == "completed"
  1. Performance Tests

@pytest.mark.benchmark
async def test_analysis_performance():
    start = time.time()
    await analyze_market()
    duration = time.time() - start
    assert duration < 1.0  # Should complete within 1 second

Configuration Management

  1. Environment-Based Config

development:
  log_level: debug
  api_timeout: 30
  
production:
  log_level: info
  api_timeout: 5
  1. Feature Flags

if feature_flags.enabled('new_trading_engine'):
    await new_engine.execute(trade)
else:
    await legacy_engine.execute(trade)

Deployment Process

  1. Staged Rollout

async def deploy_update():
    # 1. Deploy to staging
    await deploy_to_env('staging')
    
    # 2. Run validation tests
    await validate_deployment()
    
    # 3. Gradual production rollout
    for region in regions:
        await rolling_update(region)
        await monitor_health(region)

Remember:

  • Regular security audits

  • Comprehensive documentation

  • Continuous monitoring

  • Regular dependency updates

  • Backup and recovery testing

Last updated