Reactive Messaging Patterns with F# and Akka.NET


Message Construction

For more details and full analysis of each pattern described in this section, please refer to Chapter 6 of Reactive Messaging Patterns with the Actor Model by Vaughn Vernon.

Sections

  1. Introduction
  2. Messaging with Actors
  3. Messaging Channels
  4. Message Construction
  5. Message Routing
  6. Message Transformation
  7. Message Endpoints
  8. System Management and Infrastructure

Command Message

A Command is a type of message sent to an actor to request an action.

 1: 
 2: 
 3: 
 4: 
 5: 
 6: 
 7: 
 8: 
 9: 
10: 
11: 
12: 
13: 
14: 
15: 
16: 
17: 
18: 
19: 
20: 
type TradingCommand =
    | ExecuteBuyOrder of portfolioId: string * symbol: string * quantity: int * price: Money
    | ExecuteSellOrder of portfolioId: string * symbol: string * quantity: int * price: Money

let stockTrader (mailbox: Actor<_>) =
    let rec loop () = actor {
        let! message = mailbox.Receive ()
        match message with
        | ExecuteBuyOrder(portfolioId, symbol, quantity, price) as buy -> 
            printfn "StockTrader: buying for: %A" buy
        | ExecuteSellOrder(portfolioId, symbol, quantity, price) as sell ->
            printfn "StockTrader: selling for: %A" sell
        return! loop () 
    }
    loop ()

let stockTraderRef = spawn system "stockTrader" <| stockTrader

stockTraderRef <! ExecuteBuyOrder("p123", "MSFT", 100, Money 31.85m)
stockTraderRef <! ExecuteSellOrder("p456", "MSFT", 200, Money 31.80m)
Complete Code

Sections

Document Message

A Document is a type of message that carries information without a specific intended use.

 1: 
 2: 
 3: 
 4: 
 5: 
 6: 
 7: 
 8: 
 9: 
10: 
11: 
12: 
13: 
14: 
15: 
16: 
17: 
18: 
19: 
20: 
21: 
22: 
23: 
24: 
type PriceQuote = { QuoterId: string; RetailerId: string; RfqId: string; ItemId: string; RetailPrice: decimal; DiscountPrice: decimal }
type QuotationFulfillment = { RfqId: string; QuotesRequested: int; PriceQuotes: PriceQuote seq; Requester: IActorRef }
type RequestPriceQuote = RequestPriceQuote of rfqId: string * itemId: string * retailPrice: Money * orderTotalRetailPrice: Money

let quotation (mailbox: Actor<_>) =
    let quoterId = mailbox.Self.Path.Name
    let rec loop () = actor {
        let! RequestPriceQuote(rfqId, itemId, Money retailPrice, _) = mailbox.Receive ()
        mailbox.Sender () <! { RfqId = rfqId; QuotesRequested = 1; PriceQuotes = [{ QuoterId = quoterId; RetailerId = "Retailer1"; RfqId = rfqId; ItemId = itemId; RetailPrice = retailPrice; DiscountPrice = retailPrice * 0.90m }]; Requester = mailbox.Sender () }
        return! loop ()
    }
    loop ()

let requester quotation (mailbox: Actor<_>) =
    quotation <! RequestPriceQuote("1", "1", Money 10m, Money 10m)
    let rec loop () = actor {
        let! message = mailbox.Receive ()
        printfn "Requester: quote %A" message
        return! loop ()
    }
    loop ()

let quotationRef = spawn system "quotation" quotation
let requesterRef = spawn system "requester" <| requester quotationRef
Complete Code

Sections

Event Message

An Event is a type of message sent to notify other actors about actions that occurred.

 1: 
 2: 
 3: 
 4: 
 5: 
 6: 
 7: 
 8: 
 9: 
10: 
11: 
12: 
13: 
14: 
15: 
16: 
17: 
18: 
19: 
20: 
21: 
22: 
23: 
24: 
25: 
type PriceQuote = { QuoterId: string; RetailerId: string; RfqId: string; ItemId: string; RetailPrice: decimal; DiscountPrice: decimal }
type RequestPriceQuote = RequestPriceQuote of rfqId: string * itemId: string * retailPrice: Money * orderTotalRetailPrice: Money
type PriceQuoteFulfilled = PriceQuoteFulfilled of priceQuote: PriceQuote 

let quotation subscriber (mailbox: Actor<_>) =
    let quoterId = mailbox.Self.Path.Name
    let rec loop () = actor {
        let! RequestPriceQuote(rfqId, itemId, Money retailPrice, _) = mailbox.Receive ()
        subscriber <! PriceQuoteFulfilled { QuoterId = quoterId; RetailerId = "Retailer1"; RfqId = rfqId; ItemId = itemId; RetailPrice = retailPrice; DiscountPrice = retailPrice * 0.90m }
        return! loop ()
    }
    loop ()

let subscriber (mailbox: Actor<_>) =
    let rec loop () = actor {
        let! message = mailbox.Receive ()
        printfn "Requester: event %A" message
        return! loop ()
    }
    loop ()

let subscriberRef = spawn system "subscriber" subscriber
let quotationRef = spawn system "quotation" <| quotation subscriberRef

quotationRef <! RequestPriceQuote("1", "1", Money 10m, Money 10m)
Complete Code

Sections

Request-Reply

This pattern emulates two-way communication using two different channels. Most of the time the Request message is a Command and the Reply message is a Document.

 1: 
 2: 
 3: 
 4: 
 5: 
 6: 
 7: 
 8: 
 9: 
10: 
11: 
12: 
13: 
14: 
15: 
16: 
17: 
18: 
19: 
20: 
21: 
22: 
23: 
24: 
25: 
26: 
27: 
28: 
29: 
30: 
type ServerMessage = Request of string
type ClientMessage =
    | Reply of string
    | StartWith of IActorRef

let client (mailbox: Actor<_>) =
    let rec loop () = actor {
        let! message = mailbox.Receive ()
        match message with
        | StartWith server ->
            printfn "Client: is starting..."
            server <! Request "REQ-1"
        | Reply what -> printfn "Client: received response: %s" what
        return! loop ()
    }
    loop ()

let server (mailbox: Actor<_>) =
    let rec loop () = actor {
        let! Request what = mailbox.Receive ()
        printfn "Server: received request value: %s" what
        mailbox.Sender () <! Reply (sprintf "RESP-1 for %s" what)
        return! loop ()
    }
    loop ()

let clientRef = spawn system "client" client
let serverRef = spawn system "server" server

clientRef <! StartWith serverRef
Complete Code

Sections

Return Address

The Return Address pattern allows replying to an actor that is not the actual sender of the message.

 1: 
 2: 
 3: 
 4: 
 5: 
 6: 
 7: 
 8: 
 9: 
10: 
11: 
12: 
13: 
14: 
15: 
16: 
17: 
18: 
19: 
20: 
21: 
22: 
23: 
24: 
25: 
26: 
27: 
28: 
29: 
30: 
31: 
32: 
33: 
34: 
35: 
36: 
37: 
38: 
39: 
40: 
41: 
42: 
43: 
44: 
45: 
46: 
47: 
48: 
49: 
50: 
51: 
52: 
53: 
type ServerMessage = 
    | Request of string
    | RequestComplex of string
type ClientMessage =
    | Reply of string
    | StartWith of IActorRef
    | ReplyToComplex of string
type WorkerMessage =
    | WorkerRequestComplex of string

let client (mailbox: Actor<_>) =
    let rec loop () = actor {
        let! message = mailbox.Receive ()
        match message with
        | StartWith server ->
            printfn "Client: is starting..."
            server <! Request "REQ-1"
            server <! RequestComplex "REQ-20"
        | Reply what -> printfn "Client: received response: %s" what
        | ReplyToComplex what -> printfn "Client: received reply to complex: %s" what
        return! loop ()
    }
    loop ()

let worker (mailbox: Actor<_>) =
    let rec loop () = actor {
        let! WorkerRequestComplex what = mailbox.Receive ()
        printfn "Worker: received complex request value: %s" what
        mailbox.Sender () <! ReplyToComplex (sprintf "RESP-2000 for %s" what)
        return! loop ()
    }
    loop ()

let server (mailbox: Actor<_>) =
    let workerRef = spawn mailbox.Context "worker" worker
    let rec loop () = actor {
        let! message = mailbox.Receive ()
        match message with
        | Request what -> 
            printfn "Server: received request value: %s" what
            mailbox.Sender () <! Reply (sprintf "RESP-1 for %s" what)
        | RequestComplex what -> 
            printfn "Server: received request value: %s" what
            mailbox.Sender () <! Reply (sprintf "RESP-1 for %s" what)
            workerRef.Forward <| WorkerRequestComplex what
        return! loop ()
    }
    loop ()

let clientRef = spawn system "client" client
let serverRef = spawn system "server" server

clientRef <! StartWith serverRef
Complete Code

Sections

Correlation Identifier

This pattern allows messages to be associated using a unique identifier, specifying that they are related in some way (e.g. Request-Reply).

1: 
2: 
3: 
4: 
5: 
6: 
type PriceQuote = { QuoterId: string; RetailerId: string; RfqId: string; ItemId: string; RetailPrice: decimal; DiscountPrice: decimal }
type RequestPriceQuote = RequestPriceQuote of rfqId: string * itemId: string * retailPrice: decimal * orderTotalRetailPrice: decimal
type PriceQuoteTimedOut = PriceQuoteTimedOut of rfqId: string
type RequiredPriceQuotesForFulfillment = RequiredPriceQuotesForFulfillment of rfqId: string * quotesRequested: int
type QuotationFulfillment = QuotationFulfillment of rfqId: string * quotesRequested: int * priceQuotes: PriceQuote seq * requester: IActorRef
type BestPriceQuotation = BestPriceQuotation of rfqId: string * priceQuotes: PriceQuote seq
Complete Code

Sections

Message Sequence

The Message Sequence pattern establishes the logical order in which messages should be delivered.

1: 
// No code example

Sections

Message Expiration

This pattern allows to determine whether a message is obsolete or not.

 1: 
 2: 
 3: 
 4: 
 5: 
 6: 
 7: 
 8: 
 9: 
10: 
11: 
12: 
13: 
14: 
15: 
16: 
17: 
18: 
19: 
20: 
21: 
22: 
23: 
24: 
25: 
26: 
27: 
28: 
29: 
30: 
31: 
32: 
33: 
34: 
35: 
36: 
37: 
38: 
39: 
type PlaceOrder = { Id: string; OccurredOn: int64; TimeToLive: int64; ItemId: string; Price: Money } with
    static member Create (itemId, id, price, timeToLive) = { Id = id; OccurredOn = currentTimeMillis (); TimeToLive = timeToLive; ItemId = itemId; Price = price }

let inline isExpired (message: ^a) = 
    let occurredOn = (^a: (member get_OccurredOn: unit -> int64) message)
    let timeToLive = (^a: (member get_TimeToLive: unit -> int64) message)
    let elapsed = currentTimeMillis () - occurredOn
    elapsed > timeToLive

let purchaseRouter purchaseAgent (mailbox: Actor<_>) =
    let random = Random ()
    let rec loop () = actor {
        let! message = mailbox.Receive ()
        let millis = random.Next 100 + 1
        printfn "PurchaseRouter: delaying delivery of %A for %i milliseconds" message millis
        let duration = TimeSpan.FromMilliseconds <| float millis
        mailbox.Context.System.Scheduler.ScheduleTellOnce (duration, purchaseAgent, message)
        return! loop ()
    }
    loop ()

let purchaseAgent (mailbox: Actor<PlaceOrder>) =
    let rec loop () = actor {
        let! message = mailbox.Receive ()
        if isExpired message then
            mailbox.Context.System.DeadLetters <! message
            printfn "PurchaseAgent: delivered expired %A to dead letters" message
        else
            printfn "PurchaseAgent: placing order for %A" message
        return! loop ()
    }
    loop ()

let purchaseAgentRef = spawn system "purchaseAgent" purchaseAgent
let purchaseRouterRef = spawn system "purchaseRouter" <| purchaseRouter purchaseAgentRef

purchaseRouterRef <! PlaceOrder.Create ("1", "11", (Money 50.00m), 1000L)
purchaseRouterRef <! PlaceOrder.Create ("2", "22", (Money 250.00m), 100L)
purchaseRouterRef <! PlaceOrder.Create ("3", "33", (Money 32.95m), 10L)
Complete Code

Sections

Format Indicator

A Format Indicator is used to specify the version of one particular message.

 1: 
 2: 
 3: 
 4: 
 5: 
 6: 
 7: 
 8: 
 9: 
10: 
11: 
12: 
13: 
14: 
15: 
16: 
17: 
18: 
type ExecuteBuyOrder = { PortfolioId: string; Symbol: string; Quantity: int; Price: Money; DateTimeOrdered: DateTimeOffset option; Version: int } with
    static member CreateV1 (portfolioId, symbol, quantity, price) = { PortfolioId = portfolioId; Symbol = symbol; Quantity = quantity; Price = price; DateTimeOrdered = None; Version = 1 } 
    static member CreateV2 (portfolioId, symbol, quantity, price) = { PortfolioId = portfolioId; Symbol = symbol; Quantity = quantity; Price = price; DateTimeOrdered = Some DateTimeOffset.UtcNow; Version = 2 } 

let stockTrader (mailbox: Actor<_>) =
    let rec loop () = actor {
        let! message = mailbox.Receive ()
        let orderExecutionStartedOn = match message.Version with
                                      | 1 -> Some DateTimeOffset.UtcNow
                                      | _ -> message.DateTimeOrdered
        printfn "StockTrader: orderExecutionStartedOn: %A" orderExecutionStartedOn
        return! loop () 
    }
    loop ()

let stockTraderRef = spawn system "stockTrader" stockTrader
stockTraderRef <! ExecuteBuyOrder.CreateV1 ("1", "11", 10, (Money 50.00m))
stockTraderRef <! ExecuteBuyOrder.CreateV2 ("1", "11", 10, (Money 50.00m))
Complete Code

Sections

type TradingCommand =
  | ExecuteBuyOrder of portfolioId: string * symbol: string * quantity: int * price: obj
  | ExecuteSellOrder of portfolioId: string * symbol: string * quantity: int * price: obj

Full name: messageconstruction.TradingCommand
union case TradingCommand.ExecuteBuyOrder: portfolioId: string * symbol: string * quantity: int * price: obj -> TradingCommand
Multiple items
val string : value:'T -> string

Full name: Microsoft.FSharp.Core.Operators.string

--------------------
type string = System.String

Full name: Microsoft.FSharp.Core.string
Multiple items
val int : value:'T -> int (requires member op_Explicit)

Full name: Microsoft.FSharp.Core.Operators.int

--------------------
type int = int32

Full name: Microsoft.FSharp.Core.int

--------------------
type int<'Measure> = int

Full name: Microsoft.FSharp.Core.int<_>
union case TradingCommand.ExecuteSellOrder: portfolioId: string * symbol: string * quantity: int * price: obj -> TradingCommand
val stockTrader : mailbox:'a -> 'b

Full name: messageconstruction.stockTrader
val mailbox : 'a
val loop : (unit -> 'a)
val printfn : format:Printf.TextWriterFormat<'T> -> 'T

Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.printfn
val stockTraderRef : obj

Full name: messageconstruction.stockTraderRef
type PriceQuote =
  {QuoterId: string;
   RetailerId: string;
   RfqId: string;
   ItemId: string;
   RetailPrice: decimal;
   DiscountPrice: decimal;}

Full name: messageconstruction.PriceQuote
PriceQuote.QuoterId: string
PriceQuote.RetailerId: string
PriceQuote.RfqId: string
PriceQuote.ItemId: string
PriceQuote.RetailPrice: decimal
Multiple items
val decimal : value:'T -> decimal (requires member op_Explicit)

Full name: Microsoft.FSharp.Core.Operators.decimal

--------------------
type decimal = System.Decimal

Full name: Microsoft.FSharp.Core.decimal

--------------------
type decimal<'Measure> = decimal

Full name: Microsoft.FSharp.Core.decimal<_>
PriceQuote.DiscountPrice: decimal
type QuotationFulfillment =
  {RfqId: string;
   QuotesRequested: int;
   PriceQuotes: seq<PriceQuote>;
   Requester: obj;}

Full name: messageconstruction.QuotationFulfillment
QuotationFulfillment.RfqId: string
QuotationFulfillment.QuotesRequested: int
QuotationFulfillment.PriceQuotes: seq<PriceQuote>
Multiple items
val seq : sequence:seq<'T> -> seq<'T>

Full name: Microsoft.FSharp.Core.Operators.seq

--------------------
type seq<'T> = System.Collections.Generic.IEnumerable<'T>

Full name: Microsoft.FSharp.Collections.seq<_>
QuotationFulfillment.Requester: obj
Multiple items
union case RequestPriceQuote.RequestPriceQuote: rfqId: string * itemId: string * retailPrice: obj * orderTotalRetailPrice: obj -> RequestPriceQuote

--------------------
type RequestPriceQuote = | RequestPriceQuote of rfqId: string * itemId: string * retailPrice: obj * orderTotalRetailPrice: obj

Full name: messageconstruction.RequestPriceQuote
val quotation : mailbox:'a -> 'b

Full name: messageconstruction.quotation
val quoterId : 'a
val requester : quotation:'a -> mailbox:'b -> 'c (requires member ( <! ))

Full name: messageconstruction.requester
val quotation : 'a (requires member ( <! ))
val quotationRef : 'a (requires member ( <! ))

Full name: messageconstruction.quotationRef
val requesterRef : 'a

Full name: messageconstruction.requesterRef
Multiple items
union case PriceQuoteFulfilled.PriceQuoteFulfilled: priceQuote: PriceQuote -> PriceQuoteFulfilled

--------------------
type PriceQuoteFulfilled = | PriceQuoteFulfilled of priceQuote: PriceQuote

Full name: messageconstruction.PriceQuoteFulfilled
val quotation : subscriber:'a -> mailbox:'b -> 'c

Full name: messageconstruction.quotation
val subscriber : 'a
val subscriber : mailbox:'a -> 'b

Full name: messageconstruction.subscriber
val subscriberRef : 'a

Full name: messageconstruction.subscriberRef
type ServerMessage = | Request of string

Full name: messageconstruction.ServerMessage
union case ServerMessage.Request: string -> ServerMessage
type ClientMessage =
  | Reply of string
  | StartWith of obj

Full name: messageconstruction.ClientMessage
union case ClientMessage.Reply: string -> ClientMessage
union case ClientMessage.StartWith: obj -> ClientMessage
val client : mailbox:'a -> 'b

Full name: messageconstruction.client
val server : mailbox:'a -> 'b

Full name: messageconstruction.server
val sprintf : format:Printf.StringFormat<'T> -> 'T

Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.sprintf
val clientRef : 'a (requires member ( <! ))

Full name: messageconstruction.clientRef
val serverRef : 'a

Full name: messageconstruction.serverRef
type WorkerMessage = | WorkerRequestComplex of string

Full name: messageconstruction.WorkerMessage
union case WorkerMessage.WorkerRequestComplex: string -> WorkerMessage
val worker : mailbox:'a -> 'b

Full name: messageconstruction.worker
val workerRef : 'a
Multiple items
union case PriceQuoteTimedOut.PriceQuoteTimedOut: rfqId: string -> PriceQuoteTimedOut

--------------------
type PriceQuoteTimedOut = | PriceQuoteTimedOut of rfqId: string

Full name: messageconstruction.PriceQuoteTimedOut
Multiple items
union case RequiredPriceQuotesForFulfillment.RequiredPriceQuotesForFulfillment: rfqId: string * quotesRequested: int -> RequiredPriceQuotesForFulfillment

--------------------
type RequiredPriceQuotesForFulfillment = | RequiredPriceQuotesForFulfillment of rfqId: string * quotesRequested: int

Full name: messageconstruction.RequiredPriceQuotesForFulfillment
Multiple items
union case BestPriceQuotation.BestPriceQuotation: rfqId: string * priceQuotes: seq<PriceQuote> -> BestPriceQuotation

--------------------
type BestPriceQuotation = | BestPriceQuotation of rfqId: string * priceQuotes: seq<PriceQuote>

Full name: messageconstruction.BestPriceQuotation
type PlaceOrder =
  {Id: string;
   OccurredOn: int64;
   TimeToLive: int64;
   ItemId: string;
   Price: obj;}
  static member Create : itemId:string * id:string * price:'a0 * timeToLive:int64 -> PlaceOrder

Full name: messageconstruction.PlaceOrder
PlaceOrder.Id: string
PlaceOrder.OccurredOn: int64
Multiple items
val int64 : value:'T -> int64 (requires member op_Explicit)

Full name: Microsoft.FSharp.Core.Operators.int64

--------------------
type int64 = System.Int64

Full name: Microsoft.FSharp.Core.int64

--------------------
type int64<'Measure> = int64

Full name: Microsoft.FSharp.Core.int64<_>
PlaceOrder.TimeToLive: int64
PlaceOrder.ItemId: string
PlaceOrder.Price: obj
static member PlaceOrder.Create : itemId:string * id:string * price:'a0 * timeToLive:int64 -> PlaceOrder

Full name: messageconstruction.PlaceOrder.Create
val itemId : string
val id : string
val price : 'a
val timeToLive : int64
val isExpired : message:'a -> bool (requires member get_OccurredOn and member get_TimeToLive)

Full name: messageconstruction.isExpired
val message : 'a (requires member get_OccurredOn and member get_TimeToLive)
val occurredOn : int64
type unit = Unit

Full name: Microsoft.FSharp.Core.unit
val elapsed : int64
val purchaseRouter : purchaseAgent:'a -> mailbox:'b -> 'c

Full name: messageconstruction.purchaseRouter
val purchaseAgent : 'a
val random : 'a
Multiple items
val float : value:'T -> float (requires member op_Explicit)

Full name: Microsoft.FSharp.Core.Operators.float

--------------------
type float = System.Double

Full name: Microsoft.FSharp.Core.float

--------------------
type float<'Measure> = float

Full name: Microsoft.FSharp.Core.float<_>
namespace System
val purchaseAgent : mailbox:'a -> 'b

Full name: messageconstruction.purchaseAgent
val purchaseAgentRef : 'a

Full name: messageconstruction.purchaseAgentRef
val purchaseRouterRef : 'a (requires member ( <! ))

Full name: messageconstruction.purchaseRouterRef
static member PlaceOrder.Create : itemId:string * id:string * price:'a0 * timeToLive:int64 -> PlaceOrder
Multiple items
union case TradingCommand.ExecuteBuyOrder: portfolioId: string * symbol: string * quantity: int * price: obj -> TradingCommand

--------------------
type ExecuteBuyOrder =
  {PortfolioId: string;
   Symbol: string;
   Quantity: int;
   Price: obj;
   DateTimeOrdered: obj;
   Version: int;}
  static member CreateV1 : portfolioId:string * symbol:string * quantity:int * price:'a0 -> ExecuteBuyOrder
  static member CreateV2 : portfolioId:string * symbol:string * quantity:int * price:'a0 -> ExecuteBuyOrder

Full name: messageconstruction.ExecuteBuyOrder
ExecuteBuyOrder.PortfolioId: string
ExecuteBuyOrder.Symbol: string
ExecuteBuyOrder.Quantity: int
ExecuteBuyOrder.Price: obj
ExecuteBuyOrder.DateTimeOrdered: obj
type 'T option = Option<'T>

Full name: Microsoft.FSharp.Core.option<_>
ExecuteBuyOrder.Version: int
static member ExecuteBuyOrder.CreateV1 : portfolioId:string * symbol:string * quantity:int * price:'a0 -> ExecuteBuyOrder

Full name: messageconstruction.ExecuteBuyOrder.CreateV1
val portfolioId : string
val symbol : string
val quantity : int
union case Option.None: Option<'T>
static member ExecuteBuyOrder.CreateV2 : portfolioId:string * symbol:string * quantity:int * price:'a0 -> ExecuteBuyOrder

Full name: messageconstruction.ExecuteBuyOrder.CreateV2
union case Option.Some: Value: 'T -> Option<'T>
val stockTraderRef : 'a (requires member ( <! ))

Full name: messageconstruction.stockTraderRef
static member ExecuteBuyOrder.CreateV1 : portfolioId:string * symbol:string * quantity:int * price:'a0 -> ExecuteBuyOrder
static member ExecuteBuyOrder.CreateV2 : portfolioId:string * symbol:string * quantity:int * price:'a0 -> ExecuteBuyOrder
F# Project
Fork me on GitHub