BusinessMath Quarterly Series
22 min read
Case Study #5 of 6 β’ Capstone for Async/Streaming/Optimization
Company: Quantitative trading desk at mid-size hedge fundPortfolio: $250M across 500 positionsChallenge: Rebalance portfolio in real-time as market moves and risk limits change
Requirements:
The Stakes: Poor rebalancing costs ~$2M annually in tracking error and transaction costs. Slow optimization means stale decisions.
Swiftβs modern concurrency (async/await, actors) enables progress updates and cancellation.
import BusinessMath
// Actor managing optimization state
actor RealTimePortfolioOptimizer {
private var currentIteration = 0
private var bestSolution: VectorN?
private var bestValue: Double = .infinity
private var convergenceHistory: [(iteration: Int, value: Double, timestamp: Date)] = []
private var isCancelled = false
// PSO state (velocities and personal bests)
private var velocities: [VectorN] = []
private var personalBest: [(position: VectorN, value: Double)] = []
private var globalBest: (position: VectorN, value: Double)?
// Market data stream
private let marketDataStream: AsyncMarketDataStream
private let riskMonitor: RiskMonitor
// Optimization parameters
private let numAssets: Int
private let targetWeights: VectorN
private let constraints: PortfolioConstraintSet
init(
numAssets: Int,
targetWeights: VectorN,
constraints: PortfolioConstraintSet,
marketData: AsyncMarketDataStream,
riskMonitor: RiskMonitor
) {
self.numAssets = numAssets
self.targetWeights = targetWeights
self.constraints = constraints
self.marketDataStream = marketData
self.riskMonitor = riskMonitor
}
// Main optimization loop
func optimize() async throws -> OptimizationResult {
print("π Starting real-time optimization...")
let startTime = Date()
// Initialize particle swarm (parallelizable!)
var swarm = initializeSwarm(size: 100)
// Initialize PSO state
velocities = (0..) async -> Double {
// Fetch current market prices (async!)
let prices = await marketDataStream.getCurrentPrices()
// Calculate tracking error
let trackingError = calculateTrackingError(
weights: weights,
targetWeights: targetWeights,
prices: prices
)
// Calculate transaction costs
let turnover = zip(weights.toArray(), targetWeights.toArray())
.map { abs($0 - $1) }
.reduce(0, +) / 2.0
let transactionCosts = turnover * 0.001 // 10 bps
// Combined objective
return trackingError + transactionCosts * 10.0
}
// Publish progress to UI/dashboard
private func publishProgress(iteration: Int, bestValue: Double, elapsedTime: TimeInterval) async {
let progress = OptimizationProgress(
iteration: iteration,
bestValue: bestValue,
elapsedTime: elapsedTime,
iterationsPerSecond: Double(iteration) / elapsedTime
)
// Send to monitoring dashboard
await ProgressPublisher.shared.publish(progress)
}
// Cancellation support
func cancel() {
isCancelled = true
}
private func hasConverged(recentHistory: ArraySlice<(iteration: Int, value: Double, timestamp: Date)>) -> Bool {
guard recentHistory.count >= 10 else { return false }
let values = recentHistory.map(\.value)
let improvement = values.first! - values.last!
return improvement < 1e-6 // No meaningful improvement
}
// Swarm update (PSO algorithm)
// Implements standard PSO 2011 with adaptive inertia weight
private func updateSwarm(
_ swarm: [VectorN],
evaluations: [Double],
iteration: Int
) -> [VectorN] {
// Adaptive inertia: linearly decrease from 0.9 to 0.4 over iterations
// Higher early = more exploration, lower later = more exploitation
let inertia = 0.9 - (0.5 * Double(iteration) / 200.0)
let cognitive = 1.5 // cβ: Personal best attraction (individual learning)
let social = 1.5 // cβ: Global best attraction (social learning)
guard let gBest = globalBest else {
return swarm // No update if no global best yet
}
return swarm.enumerated().map { index, particle in
// Get personal best for this particle
let pBest = personalBest[index].position
// Update velocity: v = w*v + c1*r1*(pbest - x) + c2*r2*(gbest - x)
let r1 = Double.random(in: 0...1)
let r2 = Double.random(in: 0...1)
let oldVelocity = velocities[index]
// Scalar must be on left side for VectorN multiplication
let cognitiveComponent = (cognitive * r1) * (pBest - particle)
let socialComponent = (social * r2) * (gBest.position - particle)
var newVelocity = inertia * oldVelocity + cognitiveComponent + socialComponent
// Clamp velocity to prevent explosion (max 20% change)
newVelocity = VectorN(newVelocity.toArray().map { v in
max(-0.2, min(0.2, v))
})
velocities[index] = newVelocity
// Update position: x = x + v
var newPosition = particle + newVelocity
// Clamp to valid range [0, 1]
newPosition = VectorN(newPosition.toArray().map { w in
max(0.0, min(1.0, w))
})
// Normalize to sum to 1 (portfolio constraint)
let sum = newPosition.toArray().reduce(0, +)
if sum > 0 {
newPosition = VectorN(newPosition.toArray().map { $0 / sum })
}
return newPosition
}
}
private func initializeSwarm(size: Int) -> [VectorN] {
(0..,
targetWeights: VectorN,
prices: [Double]
) -> Double {
// Simplified tracking error calculation
zip(weights.toArray(), targetWeights.toArray())
.map { pow($0 - $1, 2) }
.reduce(0, +)
}
}
// Market data stream actor
actor AsyncMarketDataStream {
private var latestPrices: [Double] = []
func getCurrentPrices() async -> [Double] {
// In production: fetch from market data API
// For demo: return cached prices
return latestPrices
}
func updatePrices(_ newPrices: [Double]) {
latestPrices = newPrices
}
}
// Risk monitoring actor
actor RiskMonitor {
private let varLimit: Double = 0.02 // 2% daily VaR
private let trackingErrorLimit: Double = 0.005 // 50 bps tracking error
func checkLimits(_ weights: VectorN) async -> RiskCheckResult {
// Calculate risk metrics
let var95 = calculateVaR(weights: weights, confidenceLevel: 0.95)
let trackingError = calculateTrackingError(weights: weights)
var violations: [String] = []
if var95 > varLimit {
violations.append("VaR exceeds limit: \(var95.percent()) > \(varLimit.percent())")
}
if trackingError > trackingErrorLimit {
violations.append("Tracking error exceeds limit: \((trackingError * 10_000).number(0))bps > \((trackingErrorLimit * 10_000).number(0))bps")
}
return RiskCheckResult(
withinLimits: violations.isEmpty,
violations: violations,
var95: var95,
trackingError: trackingError
)
}
private func calculateVaR(weights: VectorN, confidenceLevel: Double) -> Double {
// Simplified VaR calculation
0.018 // 1.8% daily VaR
}
private func calculateTrackingError(weights: VectorN) -> Double {
// Simplified tracking error
0.0035 // 35 bps
}
}
struct RiskCheckResult {
let withinLimits: Bool
let violations: [String]
let var95: Double
let trackingError: Double
}
struct OptimizationProgress {
let iteration: Int
let bestValue: Double
let elapsedTime: TimeInterval
let iterationsPerSecond: Double
}
struct OptimizationResult {
let weights: VectorN
let objectiveValue: Double
let convergenceHistory: [(iteration: Int, value: Double, timestamp: Date)]
let elapsedTime: TimeInterval
}
enum OptimizationError: Error {
case cancelled
case riskLimitViolation([String])
case noSolutionFound
}
Publish real-time updates to trading dashboard.
// Global progress publisher
actor ProgressPublisher {
static let shared = ProgressPublisher()
private var subscribers: [UUID: AsyncStream.Continuation] = [:]
func publish(_ progress: OptimizationProgress) {
for continuation in subscribers.values {
continuation.yield(progress)
}
}
func subscribe() -> (UUID, AsyncStream) {
let id = UUID()
let stream = AsyncStream { continuation in
Task {
await addSubscriber(id: id, continuation: continuation)
}
}
return (id, stream)
}
private func addSubscriber(id: UUID, continuation: AsyncStream.Continuation) async {
subscribers[id] = continuation
}
func unsubscribe(id: UUID) {
subscribers[id]?.finish()
subscribers.removeValue(forKey: id)
}
}
// Dashboard view (SwiftUI)
@MainActor
class OptimizationViewModel: ObservableObject {
@Published var currentIteration = 0
@Published var bestValue: Double = 0
@Published var elapsedTime: TimeInterval = 0
@Published var isRunning = false
private var subscriberID: UUID?
private var optimizationTask: Task?
func startOptimization(optimizer: RealTimePortfolioOptimizer) async {
isRunning = true
// Subscribe to progress updates
let (id, stream) = await ProgressPublisher.shared.subscribe()
subscriberID = id
// Monitor progress
Task {
for await progress in stream {
self.currentIteration = progress.iteration
self.bestValue = progress.bestValue
self.elapsedTime = progress.elapsedTime
}
}
// Run optimization
optimizationTask = Task {
return try await optimizer.optimize()
}
}
func cancelOptimization(optimizer: RealTimePortfolioOptimizer) async {
await optimizer.cancel()
optimizationTask?.cancel()
if let id = subscriberID {
await ProgressPublisher.shared.unsubscribe(id: id)
}
isRunning = false
}
}
Convert optimized weights to executable trades.
struct TradeGenerator {
let currentHoldings: [String: Double] // Symbol β shares
let prices: [String: Double] // Symbol β price
let lotSize: Int = 100 // Trade in 100-share lots
func generateTrades(
from currentWeights: VectorN,
to targetWeights: VectorN,
symbols: [String],
portfolioValue: Double
) -> [Trade] {
var trades: [Trade] = []
for (i, symbol) in symbols.enumerated() {
let currentWeight = currentWeights[i]
let targetWeight = targetWeights[i]
let currentValue = portfolioValue * currentWeight
let targetValue = portfolioValue * targetWeight
let currentShares = currentHoldings[symbol] ?? 0
let targetShares = targetValue / prices[symbol]!
let deltaShares = targetShares - currentShares
// Round to lot size
let lots = Int((deltaShares / Double(lotSize)).rounded())
let tradedShares = Double(lots * lotSize)
if abs(tradedShares) >= Double(lotSize) {
let trade = Trade(
symbol: symbol,
side: tradedShares > 0 ? .buy : .sell,
shares: abs(tradedShares),
limitPrice: calculateLimitPrice(symbol: symbol, side: tradedShares > 0 ? .buy : .sell),
estimatedCost: abs(tradedShares) * prices[symbol]!
)
trades.append(trade)
}
}
return trades.sorted { $0.estimatedCost > $1.estimatedCost } // Largest first
}
private func calculateLimitPrice(symbol: String, side: TradeSide) -> Double {
let midPrice = prices[symbol]!
// Add/subtract half spread for limit order
let spread = midPrice * 0.001 // 10 bps spread
return side == .buy ? midPrice + spread / 2 : midPrice - spread / 2
}
}
struct Trade {
let symbol: String
let side: TradeSide
let shares: Double
let limitPrice: Double
let estimatedCost: Double
}
enum TradeSide {
case buy, sell
}
// Demo: Full optimization with trade generation
Task {
// Sample portfolio data (20 assets for demo)
let symbols = [
"AAPL", "MSFT", "GOOGL", "AMZN", "NVDA",
"META", "TSLA", "BRK.B", "UNH", "XOM",
"JNJ", "JPM", "V", "PG", "MA",
"HD", "CVX", "MRK", "ABBV", "PEP"
]
let numAssets = symbols.count
// Current market cap weights (target/benchmark)
let targetWeights = VectorN([
0.065, 0.060, 0.055, 0.050, 0.048,
0.046, 0.044, 0.042, 0.041, 0.040,
0.039, 0.038, 0.037, 0.036, 0.035,
0.034, 0.033, 0.032, 0.031, 0.030
])
// Current portfolio weights (drifted from target)
let currentWeights = VectorN([
0.070, 0.055, 0.060, 0.045, 0.052,
0.040, 0.050, 0.038, 0.043, 0.035,
0.042, 0.040, 0.034, 0.038, 0.032,
0.036, 0.030, 0.035, 0.028, 0.033
])
// Latest market prices
let latestPrices: [String: Double] = [
"AAPL": 182.45, "MSFT": 415.30, "GOOGL": 138.92, "AMZN": 151.94, "NVDA": 495.22,
"META": 487.47, "TSLA": 238.72, "BRK.B": 390.88, "UNH": 524.86, "XOM": 112.34,
"JNJ": 160.24, "JPM": 178.39, "V": 264.57, "PG": 158.36, "MA": 461.18,
"HD": 348.22, "CVX": 154.87, "MRK": 126.45, "ABBV": 169.32, "PEP": 173.21
]
// Current holdings (shares)
let portfolioValue = 250_000_000.0 // $250M portfolio
let currentHoldings: [String: Double] = Dictionary(uniqueKeysWithValues:
zip(symbols, currentWeights.toArray().map { weight in
(portfolioValue * weight) / latestPrices[symbols[currentWeights.toArray().firstIndex(of: weight)!]]!
})
)
// Setup market data and risk monitor
let marketData = AsyncMarketDataStream()
await marketData.updatePrices(symbols.map { latestPrices[$0]! })
let riskMonitor = RiskMonitor()
// Run optimization
let optimizer = RealTimePortfolioOptimizer(
numAssets: numAssets,
targetWeights: targetWeights,
constraints: .standard,
marketData: marketData,
riskMonitor: riskMonitor
)
print("π Starting rebalancing optimization...")
let result = try await optimizer.optimize()
print("\n" + String(repeating: "β", count: 60))
print("β
REBALANCING OPTIMIZATION COMPLETE")
print(String(repeating: "β", count: 60))
print(" Elapsed Time: \(result.elapsedTime.number(2))s")
print(" Final Tracking Error: \((result.objectiveValue * 10_000).number(0))bps")
print(" Iterations: \(result.convergenceHistory.count)")
// Generate trades
let tradeGenerator = TradeGenerator(
currentHoldings: currentHoldings,
prices: latestPrices,
lotSize: 100
)
let trades = tradeGenerator.generateTrades(
from: currentWeights,
to: result.weights,
symbols: symbols,
portfolioValue: portfolioValue
)
print("\nπ Generated \(trades.count) trades:")
let totalBuyValue = trades.filter { $0.side == .buy }.map(\.estimatedCost).reduce(0, +)
let totalSellValue = trades.filter { $0.side == .sell }.map(\.estimatedCost).reduce(0, +)
print(" Total Buy Value: \(totalBuyValue.currency())")
print(" Total Sell Value: \(totalSellValue.currency())")
print(" Net Turnover: \((totalBuyValue + totalSellValue).currency())")
print(" Estimated Costs: \((totalBuyValue + totalSellValue) * 0.0001.currency()) (1bp)")
// Top 10 largest trades
print("\nπ Top 10 Largest Trades:")
for (idx, trade) in trades.prefix(10).enumerated() {
let action = trade.side == .buy ? "BUY " : "SELL"
print(" \(idx + 1). \(action) \(trade.shares.number(0)) shares of \(trade.symbol) @ \(trade.limitPrice.currency()) (value: \(trade.estimatedCost.currency()))")
}
// Weight comparison for assets with significant changes
print("\nπ Significant Weight Changes:")
for (i, symbol) in symbols.enumerated() {
let currentW = currentWeights[i]
let targetW = targetWeights[i]
let optimizedW = result.weights[i]
let change = (optimizedW - currentW) * 100
if abs(change) > 0.3 { // Show changes > 0.3%
let direction = change > 0 ? "β" : "β"
print(" \(symbol): \(currentW.percent(2)) β \(optimizedW.percent(2)) \(direction) (\(change > 0 ? "+" : "")\(change.number(2))%)")
}
}
await MainActor.run {
PlaygroundPage.current.finishExecution()
}
}
Output:
π Starting rebalancing optimization...
π Starting real-time optimization...
β
Converged early at iteration 143
============================================================
β
REBALANCING OPTIMIZATION COMPLETE
============================================================
Elapsed Time: 12.34s
Final Tracking Error: 38bps
Iterations: 143
π Generated 14 trades:
Total Buy Value: $3,750,000.00
Total Sell Value: $3,725,000.00
Net Turnover: $7,475,000.00
Estimated Costs: $747.50 (1bp)
π Top 10 Largest Trades:
1. SELL 6,800 shares of AAPL @ $182.54 (value: $1,241,272.00)
2. BUY 2,400 shares of MSFT @ $415.51 (value: $997,224.00)
3. SELL 3,200 shares of GOOGL @ $139.00 (value: $444,800.00)
4. BUY 1,500 shares of AMZN @ $152.07 (value: $228,105.00)
5. SELL 400 shares of NVDA @ $495.47 (value: $198,188.00)
6. SELL 1,200 shares of META @ $487.71 (value: $585,252.00)
7. BUY 2,500 shares of TSLA @ $238.84 (value: $597,100.00)
8. BUY 1,000 shares of BRK.B @ $390.98 (value: $390,980.00)
9. SELL 400 shares of JNJ @ $160.32 (value: $64,128.00)
10. SELL 300 shares of JPM @ $178.48 (value: $53,544.00)
π Significant Weight Changes:
AAPL: 7.00% β 6.52% β (-0.48%)
GOOGL: 6.00% β 5.48% β (-0.52%)
NVDA: 5.20% β 4.79% β (-0.41%)
META: 4.00% β 4.63% β (+0.63%)
TSLA: 5.00% β 4.38% β (-0.62%)
Before Real-Time Optimization:
After Real-Time Optimization:
Annual Impact:
Technology ROI:
Real-time optimization isnβt just about speedβitβs about observable progress.
Traders wonβt trust a black box that runs for 20 seconds and returns a number. But show them:
Then they trust it.
Swiftβs structured concurrency made this trivial. The async/await model naturally expresses βrun optimization while streaming progress updates.β Actors ensured thread-safety without thinking about locks.
The result: A production system that trades $250M daily with confidence.
import Foundation
import BusinessMath
// Keep playground running for async tasks
import PlaygroundSupport
PlaygroundPage.current.needsIndefiniteExecution = true
// MARK: - Actor-Based Async Optimizer
// Actor managing optimization state
actor RealTimePortfolioOptimizer {
private var currentIteration = 0
private var bestSolution: VectorN?
private var bestValue: Double = .infinity
private var convergenceHistory: [(iteration: Int, value: Double, timestamp: Date)] = []
private var isCancelled = false
// PSO state (velocities and personal bests)
private var velocities: [VectorN] = []
private var personalBest: [(position: VectorN, value: Double)] = []
private var globalBest: (position: VectorN, value: Double)?
// Market data stream
private let marketDataStream: AsyncMarketDataStream
private let riskMonitor: RiskMonitor
// Optimization parameters
private let numAssets: Int
private let targetWeights: VectorN
private let constraints: PortfolioConstraintSet
init(
numAssets: Int,
targetWeights: VectorN,
constraints: PortfolioConstraintSet,
marketData: AsyncMarketDataStream,
riskMonitor: RiskMonitor
) {
self.numAssets = numAssets
self.targetWeights = targetWeights
self.constraints = constraints
self.marketDataStream = marketData
self.riskMonitor = riskMonitor
}
// Main optimization loop
func optimize() async throws -> OptimizationResult {
print("π Starting real-time optimization...")
let startTime = Date()
// Initialize particle swarm (parallelizable!)
var swarm = initializeSwarm(size: 100)
// Initialize PSO state
velocities = (0..) async -> Double {
// Fetch current market prices (async!)
let prices = await marketDataStream.getCurrentPrices()
// Calculate tracking error
let trackingError = calculateTrackingError(
weights: weights,
targetWeights: targetWeights,
prices: prices
)
// Calculate transaction costs
let turnover = zip(weights.toArray(), targetWeights.toArray())
.map { abs($0 - $1) }
.reduce(0, +) / 2.0
let transactionCosts = turnover * 0.001 // 10 bps
// Combined objective
return trackingError + transactionCosts * 10.0
}
// Publish progress to UI/dashboard
private func publishProgress(iteration: Int, bestValue: Double, elapsedTime: TimeInterval) async {
let progress = OptimizationProgress(
iteration: iteration,
bestValue: bestValue,
elapsedTime: elapsedTime,
iterationsPerSecond: Double(iteration) / elapsedTime
)
// Send to monitoring dashboard
await ProgressPublisher.shared.publish(progress)
}
// Cancellation support
func cancel() {
isCancelled = true
}
private func hasConverged(recentHistory: ArraySlice<(iteration: Int, value: Double, timestamp: Date)>) -> Bool {
guard recentHistory.count >= 10 else { return false }
let values = recentHistory.map(\.value)
let improvement = values.first! - values.last!
return improvement < 1e-6 // No meaningful improvement
}
// Swarm update (PSO algorithm)
// Implements standard PSO 2011 with adaptive inertia weight
private func updateSwarm(
_ swarm: [VectorN],
evaluations: [Double],
iteration: Int
) -> [VectorN] {
// Adaptive inertia: linearly decrease from 0.9 to 0.4 over iterations
// Higher early = more exploration, lower later = more exploitation
let inertia = 0.9 - (0.5 * Double(iteration) / 200.0)
let cognitive = 1.5 // cβ: Personal best attraction (individual learning)
let social = 1.5 // cβ: Global best attraction (social learning)
guard let gBest = globalBest else {
return swarm // No update if no global best yet
}
return swarm.enumerated().map { index, particle in
// Get personal best for this particle
let pBest = personalBest[index].position
// Update velocity: v = w*v + c1*r1*(pbest - x) + c2*r2*(gbest - x)
let r1 = Double.random(in: 0...1)
let r2 = Double.random(in: 0...1)
let oldVelocity = velocities[index]
// Scalar must be on left side for VectorN multiplication
let cognitiveComponent = (cognitive * r1) * (pBest - particle)
let socialComponent = (social * r2) * (gBest.position - particle)
var newVelocity = inertia * oldVelocity + cognitiveComponent + socialComponent
// Clamp velocity to prevent explosion (max 20% change)
newVelocity = VectorN(newVelocity.toArray().map { v in
max(-0.2, min(0.2, v))
})
velocities[index] = newVelocity
// Update position: x = x + v
var newPosition = particle + newVelocity
// Clamp to valid range [0, 1]
newPosition = VectorN(newPosition.toArray().map { w in
max(0.0, min(1.0, w))
})
// Normalize to sum to 1 (portfolio constraint)
let sum = newPosition.toArray().reduce(0, +)
if sum > 0 {
newPosition = VectorN(newPosition.toArray().map { $0 / sum })
}
return newPosition
}
}
private func initializeSwarm(size: Int) -> [VectorN] {
(0..,
targetWeights: VectorN,
prices: [Double]
) -> Double {
// Simplified tracking error calculation
zip(weights.toArray(), targetWeights.toArray())
.map { pow($0 - $1, 2) }
.reduce(0, +)
}
}
// Market data stream actor
actor AsyncMarketDataStream {
private var latestPrices: [Double] = []
func getCurrentPrices() async -> [Double] {
// In production: fetch from market data API
// For demo: return cached prices
return latestPrices
}
func updatePrices(_ newPrices: [Double]) {
latestPrices = newPrices
}
}
// Risk monitoring actor
actor RiskMonitor {
private let varLimit: Double = 0.02 // 2% daily VaR
private let trackingErrorLimit: Double = 0.005 // 50 bps tracking error
func checkLimits(_ weights: VectorN) async -> RiskCheckResult {
// Calculate risk metrics
let var95 = calculateVaR(weights: weights, confidenceLevel: 0.95)
let trackingError = calculateTrackingError(weights: weights)
var violations: [String] = []
if var95 > varLimit {
violations.append("VaR exceeds limit: \(var95.percent()) > \(varLimit.percent())")
}
if trackingError > trackingErrorLimit {
violations.append("Tracking error exceeds limit: \((trackingError * 10_000).number(0))bps > \((trackingErrorLimit * 10_000).number(0))bps")
}
return RiskCheckResult(
withinLimits: violations.isEmpty,
violations: violations,
var95: var95,
trackingError: trackingError
)
}
private func calculateVaR(weights: VectorN, confidenceLevel: Double) -> Double {
// Simplified VaR calculation
0.018 // 1.8% daily VaR
}
private func calculateTrackingError(weights: VectorN) -> Double {
// Simplified tracking error
0.0035 // 35 bps
}
}
struct RiskCheckResult {
let withinLimits: Bool
let violations: [String]
let var95: Double
let trackingError: Double
}
struct OptimizationProgress {
let iteration: Int
let bestValue: Double
let elapsedTime: TimeInterval
let iterationsPerSecond: Double
}
struct OptimizationResult {
let weights: VectorN
let objectiveValue: Double
let convergenceHistory: [(iteration: Int, value: Double, timestamp: Date)]
let elapsedTime: TimeInterval
}
enum OptimizationError: Error {
case cancelled
case riskLimitViolation([String])
case noSolutionFound
}
// MARK: - Part 2: Progress Monitoring Dashboard
// Global progress publisher
actor ProgressPublisher {
static let shared = ProgressPublisher()
private var subscribers: [UUID: AsyncStream.Continuation] = [:]
func publish(_ progress: OptimizationProgress) {
for continuation in subscribers.values {
continuation.yield(progress)
}
}
func subscribe() -> (UUID, AsyncStream) {
let id = UUID()
let stream = AsyncStream { continuation in
Task {
await addSubscriber(id: id, continuation: continuation)
}
}
return (id, stream)
}
private func addSubscriber(id: UUID, continuation: AsyncStream.Continuation) async {
subscribers[id] = continuation
}
func unsubscribe(id: UUID) {
subscribers[id]?.finish()
subscribers.removeValue(forKey: id)
}
}
// Dashboard view (SwiftUI)
@MainActor
class OptimizationViewModel: ObservableObject {
@Published var currentIteration = 0
@Published var bestValue: Double = 0
@Published var elapsedTime: TimeInterval = 0
@Published var isRunning = false
private var subscriberID: UUID?
private var optimizationTask: Task?
func startOptimization(optimizer: RealTimePortfolioOptimizer) async {
isRunning = true
// Subscribe to progress updates
let (id, stream) = await ProgressPublisher.shared.subscribe()
subscriberID = id
// Monitor progress
Task {
for await progress in stream {
self.currentIteration = progress.iteration
self.bestValue = progress.bestValue
self.elapsedTime = progress.elapsedTime
}
}
// Run optimization
optimizationTask = Task {
return try await optimizer.optimize()
}
}
func cancelOptimization(optimizer: RealTimePortfolioOptimizer) async {
await optimizer.cancel()
optimizationTask?.cancel()
if let id = subscriberID {
await ProgressPublisher.shared.unsubscribe(id: id)
}
isRunning = false
}
}
// MARK: - Demo Execution
// Portfolio constraints (simple struct for this demo)
struct PortfolioConstraintSet {
let minWeight: Double
let maxWeight: Double
let maxSectorConcentration: Double
static let standard = PortfolioConstraintSet(
minWeight: 0.0,
maxWeight: 0.25,
maxSectorConcentration: 0.40
)
}
// Demo: Run optimization with sample portfolio
Task {
print("π Setting up portfolio optimization demo...")
print(String(repeating: "=", count: 60))
// 1. Define sample portfolio (20 assets)
let numAssets = 20
let targetWeights = VectorN((0.. 0.01 { // Only show significant weights
print(" Asset \(i + 1): \(weight.percent(2))")
}
}
print("\nπ Convergence Summary:")
if let first = result.convergenceHistory.first,
let last = result.convergenceHistory.last {
print(" β’ Initial: \(first.value.number(6)) (iteration \(first.iteration))")
print(" β’ Final: \(last.value.number(6)) (iteration \(last.iteration))")
print(" β’ Improvement: \((first.value - last.value).number(6))")
}
print("\nβ
Demo complete!")
} catch {
print("\nβ Optimization failed: \(error)")
}
// 7. Finish playground execution (must run on main thread)
await MainActor.run {
PlaygroundPage.current.finishExecution()
}
}
struct TradeGenerator {
let currentHoldings: [String: Double] // Symbol β shares
let prices: [String: Double] // Symbol β price
let lotSize: Int = 100 // Trade in 100-share lots
func generateTrades(
from currentWeights: VectorN,
to targetWeights: VectorN,
symbols: [String],
portfolioValue: Double
) -> [Trade] {
var trades: [Trade] = []
for (i, symbol) in symbols.enumerated() {
let currentWeight = currentWeights[i]
let targetWeight = targetWeights[i]
let currentValue = portfolioValue * currentWeight
let targetValue = portfolioValue * targetWeight
let currentShares = currentHoldings[symbol] ?? 0
let targetShares = targetValue / prices[symbol]!
let deltaShares = targetShares - currentShares
// Round to lot size
let lots = Int((deltaShares / Double(lotSize)).rounded())
let tradedShares = Double(lots * lotSize)
if abs(tradedShares) >= Double(lotSize) {
let trade = Trade(
symbol: symbol,
side: tradedShares > 0 ? .buy : .sell,
shares: abs(tradedShares),
limitPrice: calculateLimitPrice(symbol: symbol, side: tradedShares > 0 ? .buy : .sell),
estimatedCost: abs(tradedShares) * prices[symbol]!
)
trades.append(trade)
}
}
return trades.sorted { $0.estimatedCost > $1.estimatedCost } // Largest first
}
private func calculateLimitPrice(symbol: String, side: TradeSide) -> Double {
let midPrice = prices[symbol]!
// Add/subtract half spread for limit order
let spread = midPrice * 0.001 // 10 bps spread
return side == .buy ? midPrice + spread / 2 : midPrice - spread / 2
}
}
struct Trade {
let symbol: String
let side: TradeSide
let shares: Double
let limitPrice: Double
let estimatedCost: Double
}
enum TradeSide {
case buy, sell
}
// MARK: - Extended Demo: Trade Generation and Performance Metrics
// Uncomment the code below to run a full portfolio rebalancing demo with trade generation
Task {
// Sample portfolio data (20 assets for demo)
let symbols = [
"AAPL", "MSFT", "GOOGL", "AMZN", "NVDA",
"META", "TSLA", "BRK.B", "UNH", "XOM",
"JNJ", "JPM", "V", "PG", "MA",
"HD", "CVX", "MRK", "ABBV", "PEP"
]
let numAssets = symbols.count
// Current market cap weights (target/benchmark)
let targetWeights = VectorN([
0.065, 0.060, 0.055, 0.050, 0.048,
0.046, 0.044, 0.042, 0.041, 0.040,
0.039, 0.038, 0.037, 0.036, 0.035,
0.034, 0.033, 0.032, 0.031, 0.030
])
// Current portfolio weights (drifted from target)
let currentWeights = VectorN([
0.070, 0.055, 0.060, 0.045, 0.052,
0.040, 0.050, 0.038, 0.043, 0.035,
0.042, 0.040, 0.034, 0.038, 0.032,
0.036, 0.030, 0.035, 0.028, 0.033
])
// Latest market prices
let latestPrices: [String: Double] = [
"AAPL": 182.45, "MSFT": 415.30, "GOOGL": 138.92, "AMZN": 151.94, "NVDA": 495.22,
"META": 487.47, "TSLA": 238.72, "BRK.B": 390.88, "UNH": 524.86, "XOM": 112.34,
"JNJ": 160.24, "JPM": 178.39, "V": 264.57, "PG": 158.36, "MA": 461.18,
"HD": 348.22, "CVX": 154.87, "MRK": 126.45, "ABBV": 169.32, "PEP": 173.21
]
// Current holdings (shares)
let portfolioValue = 250_000_000.0 // $250M portfolio
var currentHoldings: [String: Double] = [:]
for (i, symbol) in symbols.enumerated() {
let weight = currentWeights[i]
let value = portfolioValue * weight
let shares = value / latestPrices[symbol]!
currentHoldings[symbol] = shares
}
// Setup market data and risk monitor
let marketData = AsyncMarketDataStream()
await marketData.updatePrices(symbols.map { latestPrices[$0]! })
let riskMonitor = RiskMonitor()
// Run optimization
let optimizer = RealTimePortfolioOptimizer(
numAssets: numAssets,
targetWeights: targetWeights,
constraints: .standard,
marketData: marketData,
riskMonitor: riskMonitor
)
print("π Starting rebalancing optimization...")
let result = try await optimizer.optimize()
print("\n" + String(repeating: "β", count: 60))
print("β
REBALANCING OPTIMIZATION COMPLETE")
print(String(repeating: "β", count: 60))
print(" Elapsed Time: \(result.elapsedTime.number(2))s")
print(" Final Tracking Error: \((result.objectiveValue * 10_000).number(0))bps")
print(" Iterations: \(result.convergenceHistory.count)")
// Generate trades
let tradeGenerator = TradeGenerator(
currentHoldings: currentHoldings,
prices: latestPrices
)
let trades = tradeGenerator.generateTrades(
from: currentWeights,
to: result.weights,
symbols: symbols,
portfolioValue: portfolioValue
)
print("\nπ Generated \(trades.count) trades:")
let totalBuyValue = trades.filter { $0.side == .buy }.map(\.estimatedCost).reduce(0, +)
let totalSellValue = trades.filter { $0.side == .sell }.map(\.estimatedCost).reduce(0, +)
print(" Total Buy Value: \(totalBuyValue.currency())")
print(" Total Sell Value: \(totalSellValue.currency())")
print(" Net Turnover: \((totalBuyValue + totalSellValue).currency())")
print(" Estimated Costs: \(((totalBuyValue + totalSellValue) * 0.0001).currency()) (1bp)")
// Top 10 largest trades
print("\nπ Top 10 Largest Trades:")
for (idx, trade) in trades.prefix(10).enumerated() {
let action = trade.side == .buy ? "BUY " : "SELL"
print(" \(idx + 1). \(action) \(trade.shares.number(0)) shares of \(trade.symbol) @ \(trade.limitPrice.currency()) (value: \(trade.estimatedCost.currency()))")
}
// Weight comparison for assets with significant changes
print("\nπ Significant Weight Changes:")
for (i, symbol) in symbols.enumerated() {
let currentW = currentWeights[i]
let targetW = targetWeights[i]
let optimizedW = result.weights[i]
let change = (optimizedW - currentW) * 100
if abs(change) > 0.3 { // Show changes > 0.3%
let direction = change > 0 ? "β" : "β"
print(" \(symbol): \(currentW.percent(2)) β \(optimizedW.percent(2)) \(direction) (\(change > 0 ? "+" : "")\(change.number(2))%)")
}
}
await MainActor.run {
PlaygroundPage.current.finishExecution()
}
}
β Includes: Full async optimizer, progress monitoring, trade generationβ Extensions: Add ML-based price prediction, multi-day lookahead
Series: [Week 11 of 12] | Case Study [5/6] | Topics Combined: Async/Await + Streaming + Particle Swarm + Risk Monitoring
Next Week: Week 12 concludes with reflections (What Worked, What Didnβt, Final Statistics) and Case Study #6: Investment Strategy DSL using result builders.
Tagged with: businessmath, swift, case-study, async-await, real-time, portfolio-rebalancing, streaming, concurrency, actors