Reactive Messaging Patterns with F# and Akka.NET


Messaging with Actors

For more details and full analysis of each pattern described in this section, please refer to Chapter 4 of Reactive Messaging Patterns with the Actor Model by Vaughn Vernon.

Sections

  1. Introduction
  2. Messaging with Actors
  3. Messaging Channels
  4. Message Construction
  5. Message Routing
  6. Message Transformation
  7. Message Endpoints
  8. System Management and Infrastructure

Message Channel

The Message Channel pattern enables communication between a consumer and a producer. When using Akka.NET, this concept is represented by the actor's mailbox.

 1: 
 2: 
 3: 
 4: 
 5: 
 6: 
 7: 
 8: 
 9: 
10: 
11: 
12: 
13: 
/// Message understood by the `processor` actor: a single job carrying three integers.
type ProcessorMessage = ProcessJob of int * int * int

/// Actor body that takes `ProcessJob` messages from its mailbox one at a time
/// and prints the three integers each job carries.
let processor (mailbox: Actor<_>) =
    let rec receiveNext () = actor {
        let! job = mailbox.Receive ()
        // ProcessorMessage has a single case, so this match is exhaustive.
        match job with
        | ProcessJob(first, second, third) ->
            printfn "Processor: received ProcessJob %i %i %i" first second third
        return! receiveNext ()
    }
    receiveNext ()

// Start the processor actor under the name "processor" ...
let processorRef = processor |> spawn system "processor"

// ... and put one job into its mailbox.
processorRef <! ProcessJob(1, 3, 5)
Complete Code

Sections

Message

Messages are sent between actors to exchange information.

Primitive Types

 1: 
 2: 
 3: 
 4: 
 5: 
 6: 
 7: 
 8: 
 9: 
10: 
/// Actor body that prints incoming messages that are strings or ints and
/// silently ignores anything else.
let scalarValuePrinter (mailbox: Actor<_>) =
    // Dispatch on the runtime type of the boxed message.
    let describe (value: obj) =
        match value with
        | :? string as text -> printfn "ScalarValuePrinter: received String %s" text
        | :? int as number -> printfn "ScalarValuePrinter: received Int %i" number
        | _ -> ()
    let rec listen () = actor {
        let! incoming = mailbox.Receive ()
        describe (box incoming)
        return! listen ()
    }
    listen ()

Discriminated Unions

 1: 
 2: 
 3: 
 4: 
 5: 
 6: 
 7: 
 8: 
 9: 
10: 
11: 
12: 
13: 
14: 
15: 
16: 
/// Commands handled by `orderProcessor`: a request to buy or to sell a
/// quantity of a symbol at a price for a given portfolio.
type OrderProcessorCommand =
    | ExecuteBuyOrder of portfolioId: string * symbol: Symbol * quantity: int * price: Money
    | ExecuteSellOrder of portfolioId: string * symbol: Symbol * quantity: int * price: Money
/// Events sent back by `orderProcessor`; each case carries the same fields as
/// the command it answers.
type OrderProcessorEvent =
    | BuyOrderExecuted of portfolioId: string * symbol: Symbol * quantity: int * price: Money
    | SellOrderExecuted of portfolioId: string * symbol: Symbol * quantity: int * price: Money

/// Actor body that answers every order command with the matching "executed"
/// event, sent to whoever sent the command.
let orderProcessor (mailbox: Actor<_>) =
    // Maps a command to the event that reports its execution; fields are copied as-is.
    let toEvent command =
        match command with
        | ExecuteBuyOrder(portfolioId, symbol, quantity, price) ->
            BuyOrderExecuted(portfolioId, symbol, quantity, price)
        | ExecuteSellOrder(portfolioId, symbol, quantity, price) ->
            SellOrderExecuted(portfolioId, symbol, quantity, price)
    let rec handleNext () = actor {
        let! command = mailbox.Receive ()
        mailbox.Sender () <! toEvent command
        return! handleNext ()
    }
    handleNext ()
Complete Code

Sections

Pipes and Filters

This pattern allows actors to be chained together while keeping them independent of one another.

 1: 
 2: 
 3: 
 4: 
 5: 
 6: 
 7: 
 8: 
 9: 
10: 
11: 
12: 
13: 
14: 
15: 
16: 
17: 
18: 
19: 
20: 
21: 
22: 
23: 
24: 
25: 
26: 
27: 
28: 
29: 
30: 
31: 
32: 
33: 
34: 
35: 
36: 
37: 
38: 
39: 
40: 
41: 
42: 
43: 
44: 
45: 
46: 
47: 
48: 
49: 
50: 
51: 
52: 
53: 
54: 
55: 
56: 
57: 
58: 
59: 
60: 
61: 
62: 
63: 
64: 
65: 
66: 
67: 
68: 
69: 
70: 
71: 
/// First stage of the pipeline: receives the raw order bytes, logs them as
/// text and hands them to `nextFilter` wrapped in `ProcessIncomingOrder`.
let orderAcceptanceEndpoint nextFilter (mailbox: Actor<_>) =
    let rec accept () = actor {
        let! rawBytes = mailbox.Receive ()
        // The decoded text is only used for logging; the bytes are forwarded untouched.
        printfn "OrderAcceptanceEndpoint: processing %s" (Encoding.Default.GetString rawBytes)
        nextFilter <! ProcessIncomingOrder(rawBytes)
        return! accept ()
    }
    accept ()

/// Pipeline filter that strips the "(encryption)" marker from the order text
/// and forwards the re-encoded result to `nextFilter`.
let decrypter nextFilter (mailbox: Actor<_>) =
    let rec decryptNext () = actor {
        let! ProcessIncomingOrder(payload) = mailbox.Receive ()
        let encrypted = Encoding.Default.GetString payload
        printfn "Decrypter: processing %s" encrypted
        // "Decryption" here is simply removing the marker text.
        let decrypted = encrypted.Replace ("(encryption)", "")
        let outgoing = Encoding.Default.GetBytes decrypted
        nextFilter <! ProcessIncomingOrder(outgoing)
        return! decryptNext ()
    }
    decryptNext ()

/// Pipeline filter that strips the "(certificate)" marker from the order text
/// and forwards the re-encoded result to `nextFilter`.
let authenticator nextFilter (mailbox: Actor<_>) =
    let rec authenticateNext () = actor {
        let! ProcessIncomingOrder(payload) = mailbox.Receive ()
        let signed = Encoding.Default.GetString payload
        printfn "Authenticator: processing %s" signed
        // "Authentication" here is simply removing the marker text.
        let verified = signed.Replace ("(certificate)", "")
        let outgoing = Encoding.Default.GetBytes verified
        nextFilter <! ProcessIncomingOrder(outgoing)
        return! authenticateNext ()
    }
    authenticateNext ()

/// Pipeline filter that drops orders whose id has already been seen.
///
/// Each `ProcessIncomingOrder` payload is decoded with `Encoding.Default` and the
/// order id is read from the first `id='...'` attribute in the text. The first
/// message carrying a given id is passed to `nextFilter` unchanged; later
/// messages with the same id are only logged.
let deduplicator nextFilter (mailbox: Actor<_>) =
    // Returns the text between "id='" and the following single quote.
    // NOTE(review): text without an "id='...'" attribute is not handled here;
    // IndexOf returns -1 in that case, so the result is meaningless or Substring throws.
    let orderIdFrom (orderText: string) =
        let orderIdIndex = orderText.IndexOf ("id='") + 4
        let orderIdLastIndex = orderText.IndexOf ("'", orderIdIndex)
        // Substring takes (startIndex, length), so the position of the closing
        // quote has to be converted into a length.
        orderText.Substring (orderIdIndex, orderIdLastIndex - orderIdIndex)

    // The set of ids already forwarded is threaded through the recursion
    // rather than kept in mutable state.
    let rec loop (processedOrderIds: string Set) = actor {
        let! ProcessIncomingOrder(bytes) = mailbox.Receive ()
        let text = Encoding.Default.GetString bytes
        printfn "Deduplicator: processing %s" text
        let orderId = orderIdFrom text
        if (not <| Set.contains orderId processedOrderIds) then 
            // First occurrence: forward the original bytes and remember the id.
            nextFilter <! ProcessIncomingOrder(bytes) 
            return! loop <| Set.add orderId processedOrderIds
        else 
            printfn "Deduplicator: found duplicate order %s" orderId
            return! loop processedOrderIds
    }
    loop Set.empty

/// Last stage of the pipeline: prints the text of every order that reaches it.
let orderManagerSystem (mailbox: Actor<_>) =
    let rec manageNext () = actor {
        let! ProcessIncomingOrder(payload) = mailbox.Receive ()
        printfn "OrderManagementSystem: processing unique order: %s" (Encoding.Default.GetString payload)
        return! manageNext ()
    }
    manageNext ()

// Sample order carrying both markers that the filters strip.
let orderText = "(encryption)(certificate)<order id='123'>...</order>"
let rawOrderBytes = Encoding.Default.GetBytes orderText

// The pipeline is built back to front because every filter needs a
// reference to the stage that follows it.
let filter5 = orderManagerSystem |> spawn system "orderManagementSystem"
let filter4 = deduplicator filter5 |> spawn system "deduplicator"
let filter3 = authenticator filter4 |> spawn system "authenticator"
let filter2 = decrypter filter3 |> spawn system "decrypter"
let filter1 = orderAcceptanceEndpoint filter2 |> spawn system "orderAcceptanceEndpoint"

// Feed the raw order into the first stage.
filter1 <! rawOrderBytes
Complete Code

Sections

Message Router

The Message Router enables message redirection depending on certain conditions.

 1: 
 2: 
 3: 
 4: 
 5: 
 6: 
 7: 
 8: 
 9: 
10: 
/// Router that forwards incoming messages alternately to `processor1` and
/// `processor2`, starting with `processor1`.
let alternatingRouter (processor1: IActorRef) (processor2: IActorRef) (mailbox: Actor<_>) =
    // `turn` is 1 when processor1 is next; any other value selects processor2.
    let rec route turn = actor {
        let! message = mailbox.Receive ()
        let target, nextTurn =
            match turn with
            | 1 -> processor1, 2
            | _ -> processor2, 1
        printfn "AlternatingRouter: routing %O to %s" message target.Path.Name
        target <! message
        return! route nextTurn
    }
    route 1
Complete Code

Sections

Message Translator

This pattern facilitates message translation between different data formats.

1: 
// No code example

Sections

Message Endpoint

The Message Endpoint represents a sender or receiver of a message. When using Akka.NET, endpoints are actors.

 1: 
 2: 
 3: 
 4: 
 5: 
 6: 
 7: 
 8: 
 9: 
10: 
11: 
12: 
13: 
14: 
15: 
16: 
17: 
18: 
19: 
20: 
21: 
22: 
23: 
24: 
25: 
26: 
27: 
28: 
29: 
30: 
31: 
32: 
33: 
34: 
35: 
36: 
/// Price-quote endpoint. A `RequestPriceQuote` is delegated to `discounter`
/// together with the original sender; when the `DiscountPriceCalculated`
/// answer arrives, a `PriceQuote` is sent to that original sender.
let highSierraPriceQuotes discounter (mailbox: Actor<_>) =
    // The actor's own name is used as the quoter id in outgoing quotes.
    let selfName = mailbox.Self.Path.Name
    let rec serve () = actor {
        let! incoming = mailbox.Receive ()
        match incoming with
        | DiscountPriceCalculated(requestedBy, retailerId, rfqId, itemId, retailPrice, discountPrice) ->
            printfn "HighSierraPriceQuotes: DiscountPriceCalculated received" 
            let quote = PriceQuote(selfName, retailerId, rfqId, itemId, retailPrice, discountPrice)
            requestedBy <! quote
        | RequestPriceQuote(retailerId, rfqId, itemId) ->
            printfn "HighSierraPriceQuotes: RequestPriceQuote received" 
            // Pass the requester along so the eventual quote can be routed back to it.
            let replyTo = mailbox.Sender ()
            discounter <! CalculatedDiscountPriceFor(replyTo, retailerId, rfqId, itemId)
        return! serve ()
    }
    serve ()

/// Actor body that answers every `CalculatedDiscountPriceFor` request with a
/// `DiscountPriceCalculated` reply carrying fixed sample prices.
let discounter (mailbox: Actor<_>) =
    let rec calculateNext () = actor {
        let! CalculatedDiscountPriceFor(requester, retailerId, rfqId, itemId) = mailbox.Receive ()
        printfn "Discounter: CalculatedDiscountPriceFor received" 
        // Hard-coded example values; no real pricing happens here.
        let retailPrice, discountPrice = 100m, 89.99m
        let reply = DiscountPriceCalculated(requester, retailerId, rfqId, itemId, retailPrice, discountPrice)
        mailbox.Sender () <! reply
        return! calculateNext ()
    }
    calculateNext ()

/// Actor body that sends one `RequestPriceQuote` to `quotes` when it is set up
/// and then prints the prices of every `PriceQuote` it receives.
let requester quotes (mailbox: Actor<_>) =
    let rec awaitQuote () = actor {
        let! PriceQuote(_, _, _, _, retail, discounted) = mailbox.Receive ()
        printfn "Requester: PriceQuote received, retailPrice: %M, discountPrice %M" retail discounted 
        return! awaitQuote ()
    }
    // The request goes out once, before the receive loop is started.
    let initialRequest = RequestPriceQuote("retailer1", "rfq1", "item1")
    quotes <! initialRequest
    awaitQuote ()

// Actors are started in dependency order: the discounter first, then the
// quote endpoint that delegates to it, then the requester that queries the endpoint.
let discounterRef = discounter |> spawn system "discounter"
let highSierraPriceQuotesRef = highSierraPriceQuotes discounterRef |> spawn system "highSierraPriceQuotes"
let requesterRef = requester highSierraPriceQuotesRef |> spawn system "requester"
Complete Code

Sections

type ProcessorMessage = | ProcessJob of int * int * int

Full name: messagingwithactors.ProcessorMessage
union case ProcessorMessage.ProcessJob: int * int * int -> ProcessorMessage
Multiple items
val int : value:'T -> int (requires member op_Explicit)

Full name: Microsoft.FSharp.Core.Operators.int

--------------------
type int = int32

Full name: Microsoft.FSharp.Core.int

--------------------
type int<'Measure> = int

Full name: Microsoft.FSharp.Core.int<_>
val processor : mailbox:'a -> 'b

Full name: messagingwithactors.processor
val mailbox : 'a
val loop : (unit -> 'a)
val printfn : format:Printf.TextWriterFormat<'T> -> 'T

Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.printfn
val processorRef : obj

Full name: messagingwithactors.processorRef
val scalarValuePrinter : mailbox:'a -> 'b

Full name: messagingwithactors.scalarValuePrinter
val box : value:'T -> obj

Full name: Microsoft.FSharp.Core.Operators.box
Multiple items
val string : value:'T -> string

Full name: Microsoft.FSharp.Core.Operators.string

--------------------
type string = System.String

Full name: Microsoft.FSharp.Core.string
type OrderProcessorCommand =
  | ExecuteBuyOrder of portfolioId: string * symbol: obj * quantity: int * price: obj
  | ExecuteSellOrder of portfolioId: string * symbol: obj * quantity: int * price: obj

Full name: messagingwithactors.OrderProcessorCommand
union case OrderProcessorCommand.ExecuteBuyOrder: portfolioId: string * symbol: obj * quantity: int * price: obj -> OrderProcessorCommand
union case OrderProcessorCommand.ExecuteSellOrder: portfolioId: string * symbol: obj * quantity: int * price: obj -> OrderProcessorCommand
type OrderProcessorEvent =
  | BuyOrderExecuted of portfolioId: string * symbol: obj * quantity: int * price: obj
  | SellOrderExecuted of portfolioId: string * symbol: obj * quantity: int * price: obj

Full name: messagingwithactors.OrderProcessorEvent
union case OrderProcessorEvent.BuyOrderExecuted: portfolioId: string * symbol: obj * quantity: int * price: obj -> OrderProcessorEvent
union case OrderProcessorEvent.SellOrderExecuted: portfolioId: string * symbol: obj * quantity: int * price: obj -> OrderProcessorEvent
val orderProcessor : mailbox:'a -> 'b

Full name: messagingwithactors.orderProcessor
val orderAcceptanceEndpoint : nextFilter:'a -> mailbox:'b -> 'c

Full name: messagingwithactors.orderAcceptanceEndpoint
val nextFilter : 'a
val decrypter : nextFilter:'a -> mailbox:'b -> 'c

Full name: messagingwithactors.decrypter
module String

from Microsoft.FSharp.Core
val authenticator : nextFilter:'a -> mailbox:'b -> 'c

Full name: messagingwithactors.authenticator
val deduplicator : nextFilter:'a -> mailbox:'b -> 'c

Full name: messagingwithactors.deduplicator
val orderIdFrom : (string -> string)
val orderText : string
val orderIdIndex : int
System.String.IndexOf(value: string) : int
System.String.IndexOf(value: char) : int
System.String.IndexOf(value: string, comparisonType: System.StringComparison) : int
System.String.IndexOf(value: string, startIndex: int) : int
System.String.IndexOf(value: char, startIndex: int) : int
System.String.IndexOf(value: string, startIndex: int, comparisonType: System.StringComparison) : int
System.String.IndexOf(value: string, startIndex: int, count: int) : int
System.String.IndexOf(value: char, startIndex: int, count: int) : int
System.String.IndexOf(value: string, startIndex: int, count: int, comparisonType: System.StringComparison) : int
val orderIdLastIndex : int
System.String.Substring(startIndex: int) : string
System.String.Substring(startIndex: int, length: int) : string
val loop : (Set<string> -> 'a)
val processedOrderIds : Set<string>
Multiple items
module Set

from Microsoft.FSharp.Collections

--------------------
type Set<'T (requires comparison)> =
  interface IComparable
  interface IEnumerable
  interface IEnumerable<'T>
  interface ICollection<'T>
  new : elements:seq<'T> -> Set<'T>
  member Add : value:'T -> Set<'T>
  member Contains : value:'T -> bool
  override Equals : obj -> bool
  member IsProperSubsetOf : otherSet:Set<'T> -> bool
  member IsProperSupersetOf : otherSet:Set<'T> -> bool
  ...

Full name: Microsoft.FSharp.Collections.Set<_>

--------------------
new : elements:seq<'T> -> Set<'T>
val not : value:bool -> bool

Full name: Microsoft.FSharp.Core.Operators.not
val contains : element:'T -> set:Set<'T> -> bool (requires comparison)

Full name: Microsoft.FSharp.Collections.Set.contains
val add : value:'T -> set:Set<'T> -> Set<'T> (requires comparison)

Full name: Microsoft.FSharp.Collections.Set.add
val empty<'T (requires comparison)> : Set<'T> (requires comparison)

Full name: Microsoft.FSharp.Collections.Set.empty
val orderManagerSystem : mailbox:'a -> 'b

Full name: messagingwithactors.orderManagerSystem
val orderText : string

Full name: messagingwithactors.orderText
val rawOrderBytes : obj

Full name: messagingwithactors.rawOrderBytes
val filter5 : 'a

Full name: messagingwithactors.filter5
val filter4 : 'a

Full name: messagingwithactors.filter4
val filter3 : 'a

Full name: messagingwithactors.filter3
val filter2 : 'a

Full name: messagingwithactors.filter2
val filter1 : obj

Full name: messagingwithactors.filter1
val alternatingRouter : processor1:'a -> processor2:'b -> mailbox:'c -> 'd

Full name: messagingwithactors.alternatingRouter
val processor1 : 'a
val processor2 : 'a
val loop : ('a -> 'b)
val alternate : 'a
val highSierraPriceQuotes : discounter:'a -> mailbox:'b -> 'c

Full name: messagingwithactors.highSierraPriceQuotes
val discounter : 'a
val quoterId : 'a
val discounter : mailbox:'a -> 'b

Full name: messagingwithactors.discounter
val requester : quotes:'a -> mailbox:'c -> 'd (requires member ( <! ))

Full name: messagingwithactors.requester
val quotes : 'a (requires member ( <! ))
val discounterRef : 'a

Full name: messagingwithactors.discounterRef
val highSierraPriceQuotesRef : 'a (requires member ( <! ))

Full name: messagingwithactors.highSierraPriceQuotesRef
val requesterRef : 'a

Full name: messagingwithactors.requesterRef
F# Project
Fork me on GitHub