Skip to content

evolution-gaming/pekko-extension

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

Build Status Coverage Status Codacy Badge Version License: MIT

Pekko Extension libraries

Set of extension libraries for pekko.

Getting Started

All libraries require the same initial setup, like:

addSbtPlugin("com.evolution" % "sbt-artifactory-plugin" % "0.0.2")

Setting dependency:

libraryDependencies += "com.evolution" %% "pekko-extension-<name>" % "<version>"

Extensions

pekko-extension-serialization

TODO add description!

pekko-extension-pubsub

Typesafe layer for DistributedPubSubMediator.

trait PubSub[F[_]] {

  def publish[A: Topic: ToBytes](
    msg: A,
    sender: Option[ActorRef] = None,
    sendToEachGroup: Boolean = false
  ): F[Unit]

  def subscribe[A: Topic: FromBytes: ClassTag](
    group: Option[String] = None)(
    onMsg: OnMsg[F, A]
  ): Resource[F, Unit]

  def topics(timeout: FiniteDuration = 3.seconds): F[Set[String]]
}

For an ability to serialize/deserialize messages to offload pekko remoting and improve throughput, check DistributedPubSubMediatorSerializing.scala.

set of pekko-extension-test libraries

These two libraries were created to provide a set of tests to be used in projects dependent on Pekko libraries. For instance, to prevent the following "surprise" at runtime:

java.lang.IllegalStateException: You are using version 1.2.0 of Pekko HTTP, but it appears you (perhaps indirectly) also depend on older versions of related artifacts. You can solve this by adding an explicit dependency on version 1.2.0 of the [pekko-http, pekko-http-testkit] artifacts to your project. Here's a complete collection of detected artifacts: (1.1.0, [pekko-http, pekko-http-testkit]), (1.2.0, [pekko-http-core, pekko-parsing]). See also: https://pekko.apache.org/docs/pekko/current/common/binary-compatibility-rules.html#mixed-versioning-is-not-allowed
	at org.apache.pekko.util.ManifestInfo.checkSameVersion(ManifestInfo.scala:188)
	at org.apache.pekko.util.ManifestInfo.checkSameVersion(ManifestInfo.scala:166)
	at org.apache.pekko.http.scaladsl.HttpExt.<init>(Http.scala:89)
	at org.apache.pekko.http.scaladsl.Http$.createExtension(Http.scala:1140)
	at org.apache.pekko.http.scaladsl.Http$.createExtension(Http.scala:871)
	at org.apache.pekko.actor.ActorSystemImpl.registerExtension(ActorSystem.scala:1175)
	at org.apache.pekko.actor.ExtensionId.apply(Extension.scala:87)
	at org.apache.pekko.actor.ExtensionId.apply$(Extension.scala:86)

pekko-extension-test-actor

For pekko-actor tests that all pekko modules are of the same version.

Set up the dependency in Test scope:

libraryDependencies += "com.evolution" %% "pekko-extension-test-actor" % "<version>" % Test

And add the following test into your project:

import com.evolution.pekkotest.PekkoActorSuite

class PekkoActorTest extends PekkoActorSuite

pekko-extension-test-http

For pekko-http tests that all pekko-http modules are of the same version.

Set up the dependency in Test scope:

libraryDependencies += "com.evolution" %% "pekko-extension-test-http" % "<version>" % Test

And add the following test into your project.

import com.evolution.pekkotest.PekkoHttpSuite

class PekkoHttpTest extends PekkoHttpSuite

pekko-extension-distributed-data-tools

SafeReplicator is a typesafe api for Distributed Data replicator

trait SafeReplicator[F[_], A <: ReplicatedData] {

  def get(implicit consistency: ReadConsistency): F[Option[A]]

  def update(modify: Option[A] => A)(implicit consistency: WriteConsistency): F[Unit]

  def delete(implicit consistency: WriteConsistency): F[Boolean]

  def subscribe(
    onStop: F[Unit],
    onChanged: A => F[Unit])(implicit
    factory: ActorRefFactory,
    executor: ExecutionContext
  ): Resource[F, Unit]

  def flushChanges: F[Unit]
}

pekko-extension-sharding-strategy

Alternative to org.apache.pekko.cluster.sharding.ShardCoordinator.ShardAllocationStrategy.

Api

trait ShardingStrategy[F[_]] {

  def allocate(requester: Region, shard: Shard, current: Allocation): F[Option[Region]]

  def rebalance(current: Allocation, inProgress: Set[Shard]): F[List[Shard]]
}

Syntax

val strategy = LeastShardsStrategy()
  .filterShards(...)
  .filterRegions(...)
  .rebalanceThreshold(10)
  .takeShards(10) 
  .shardRebalanceCooldown(1.minute)
  .logging(...)
  .toAllocationStrategy()

set of pekko-extension-tools libraries

pekko-extension-tools-test

TODO add description!

pekko-extension-tools-util

TODO add description!

pekko-extension-tools-serialization

TODO add description!

pekko-extension-tools-persistence

TODO add description!

pekko-extension-tools-cluster

TODO add description!

pekko-extension-tools-instrumentation

TODO add description!

TODO do we need umbrella lib pekko-extension-tools?

pekko-extension-conhub

ConHub is a distributed registry used to manage websocket connections on the different nodes of an application. It enables you to send a serializable message to one or many connections hiding away the complexity of distributed system. In short: user provides lookup criteria and a message and conHub does the job routing message to physical instances of a matched connections

Usage example:

type Connection = ??? // type representing physical connection
final case class Msg(bytes: Array[Byte]) // serializable
final case class Envelope(lookup: LookupById, msg: Msg)
final case class LookupById(id: String)
val conHub: ConHub[String, LookupById, Connection, Envelope] = ???
conHub ! Envelope(LookupById("testId"), Msg(Array(…)))

set of pekko-extension-effect libraries

This project aims to build a bridge between Pekko and pure functional code based on cats-effect.

pekko-extension-effect-actor

Covered ("classic", not the "typed" kind of actors!):

Represents ActorRef.tell:

trait Tell[F[_], -A] {
  def apply(a: A, sender: Option[ActorRef] = None): F[Unit]
}

Represents ActorRef.ask pattern:

trait Ask[F[_], -A, B] {
  def apply(msg: A, timeout: FiniteDuration, sender: Option[ActorRef]): F[B]
}

Represents a reply pattern: sender() ! reply:

trait Reply[F[_], -A] {
  def apply(msg: A): F[Unit]
}

This is what you need to implement instead of familiar new Actor { ... }:

trait Receive[F[_], -A, B] {
  def apply(msg: A): F[B]
  def timeout:  F[B]
}

Constructs Actor.scala out of receive: ActorCtx[F] => Resource[F, Receive[F, Any]].

Wraps ActorContext:

trait ActorCtx[F[_]] {
  def self: ActorRef
  def parent: ActorRef
  def executor: ExecutionContextExecutor
  def setReceiveTimeout(timeout: Duration): F[Unit]
  def child(name: String): F[Option[ActorRef]]
  def children: F[List[ActorRef]]
  def actorRefFactory: ActorRefFactory
  def watch[A](actorRef: ActorRef, msg: A): F[Unit]
  def unwatch(actorRef: ActorRef): F[Unit]
  def stop: F[Unit]
}

pekko-extension-effect-persistence

Constructs PersistentActor.scala out of eventSourcedOf: ActorCtx[F] => F[EventSourced[F, S, E, C]]

Describes a lifecycle of entity with regard to event sourcing, phases are: Started, Recovering, Receiving and Termination

trait EventSourced[F[_], S, E, C] {
  def eventSourcedId: EventSourcedId
  def recovery: Recovery
  def pluginIds: PluginIds
  def start: Resource[F, RecoveryStarted[F, S, E, C]]
}

Describes the start of the recovery phase

trait RecoveryStarted[F[_], S, E, C] {
  def apply(
    seqNr: SeqNr,
    snapshotOffer: Option[SnapshotOffer[S]]
  ): Resource[F, Recovering[F, S, E, C]]
}

Describes recovery phase

trait Recovering[F[_], S, E, C] {
  def replay: Resource[F, Replay[F, E]]

  def completed(
    seqNr: SeqNr,
    journaller: Journaller[F, E],
    snapshotter: Snapshotter[F, S]
  ): Resource[F, Receive[F, C]]
}

Used during recovery to replay events

trait Replay[F[_], A] {
  def apply(seqNr: SeqNr, event: A): F[Unit]
}

Describes communication with underlying journal

trait Journaller[F[_], -A] {
  def append: Append[F, A]
  def deleteTo: DeleteEventsTo[F]
}

Describes communication with underlying snapshot storage

/**
  * Describes communication with underlying snapshot storage
  *
  * @tparam A - snapshot
  */
trait Snapshotter[F[_], -A] {
  def save(seqNr: SeqNr, snapshot: A): F[F[Instant]]
  def delete(seqNr: SeqNr): F[F[Unit]]
  def delete(criteria: SnapshotSelectionCriteria): F[F[Unit]]
}

pekko-extension-effect-testkit

TODO add description!

pekko-extension-effect-actor-tests

TODO add description!

pekko-extension-effect-persistence-api

TODO add description!

pekko-extension-effect-persistence

TODO add description!

pekko-extension-effect-cluster

TODO add description!

pekko-extension-effect-cluster-sharding

TODO add description!

pekko-extension-effect-eventsourcing

Engine.scala

This is the main runtime/queue where all actions against your state are processed in a desired event-sourcing sequence:

  1. validate and finalize events
  2. append events to journal
  3. publish changed state
  4. execute side effects

It is optimized for maximum throughput, hence different steps of different actions might be executed in parallel as well as events might be stored in batches

trait Engine[F[_], S, E] {
  def state: F[State[S]]

  /**
    * @return Outer F[_] is about `load` being enqueued, this immediately provides order guarantees
    *         Inner F[_] is about a `load` being completed
    */
  def apply[A](load: F[Validate[F, S, E, A]]): F[F[A]]
}

Library mappings pekko to akka

pekko akka migrated from version
pekko-extension-serialization akka-serialization 1.1.0
pekko-extension-pubsub pubsub 10.0.0
pekko-extension-test-actor akka-test 0.3.0
pekko-extension-test-http akka-test 0.3.0
pekko-extension-distributed-data-tools ddata-tools 3.1.0
pekko-extension-sharding-strategy sharding-strategy 3.0.2
pekko-extension-tools-test akka-tools 3.3.13
pekko-extension-tools-util akka-tools 3.3.13
pekko-extension-tools-serialization akka-tools 3.3.13
pekko-extension-tools-persistence akka-tools 3.3.13
pekko-extension-tools-cluster akka-tools 3.3.13
pekko-extension-tools-instrumentation akka-tools 3.3.13
pekko-extension-conhub conhub 3.0.0
pekko-extension-effect-actor akka-effect 4.1.10
pekko-extension-effect-testkit akka-effect 4.1.10
pekko-extension-effect-actor-tests akka-effect 4.1.10
pekko-extension-effect-persistence-api akka-effect 4.1.10
pekko-extension-effect-persistence akka-effect 4.1.10
pekko-extension-effect-cluster akka-effect 4.1.10
pekko-extension-effect-cluster-sharding akka-effect 4.1.10
pekko-extension-effect-eventsourcing akka-effect 4.1.10

About

set of libraries with extensions for pekko

Resources

License

Stars

Watchers

Forks

Packages

No packages published

Contributors 14

Languages