App

Overview

The Slate Jobs component is a background job/persistant task queue system for Kotlin and is similar to Ruby SideKiq, NodeJS Bull. It leverages Kotlin Coroutines and Channels for concurrency based operations to gracefully start, stop, pause, resume jobs.

    
    slatekit new job -name="SampleJob" -package="mycompany.apps"
    


Diagram

A high-level diagram of the concepts in this component



Goals

Goal Description
1. Job types Different types of jobs OneTime, Paged, Queued job types.
2. Diagnostics Multiple types of diagnostics and tracking of jobs statistics available
3. Policies Policies as middleware and hooks to restrict, customize, or enhance jobs


Status

This component is currently stable and there is a project generator for it

Feature Status Description
Periodic Upcoming Support for periodic and scheduled jobs
Flow Upcoming Integration with Kotlin Flows

Back to top



Install

    repositories {
        // other repositories
        maven { url  "http://dl.bintray.com/codehelixinc/slatekit" }
    }

    dependencies {
        // other dependencies ...

        compile 'com.slatekit:slatekit-jobs:1.0.0'
    }

Back to top



Sources

Jar slatekit.jobs.jar
Package slatekit.jobs
Sources slatekit-jobs
Example Example_Jobs.kt
Version
License
Requires See build.gradle for more info.

Back to top



Example

This is a simple example of a Queued worker ( which takes it work from a Task which a unit-of-work stored in-memory or in persistent queue ).

     
    suspend fun sendNewsLetterFromQueue(task: Task):WorkResult {

        // Get data out of the task
        val userId = task.data.toInt()

        // Simulate getting the user and sending the newsletter
        val user = getUser(userId)
        send(task.job, "Latest News Letter!", user)

        // Acknowledge the task or abandon via task.fail()
        task.done()

        // Indicate that this can now handle more
        return WorkResult(WorkState.More)
    }


Back to top



Concepts

Component Description
Jobs A registry of all the jobs being managed.
Job The main component managing work between workers, queues, tasks, and policies.
Coordinator Responsible for coordinating requests on Workers ( by default using Channels )
Workers A collection of 1 or more workers ( linked to job )
Worker Performs the actual work (on a possible Task)
WorkState Represents the state of a single iteration of work done by a worker
Task A single work item that can be performed by a worker
Queue Represents a queue of Tasks either in-memory or persisted ( AWS SQS )
Action The Action(s) that can be performed on a Job/Worker: Start, Stop, Pause, Resume, Control, Process, Delay
Priority The priority level of a Queue
Policy A form of middleware to inject into a Job to adjust behaviour
Status The status ( idle, running, paused, stopped, completed ) of a worker or job.

Back to top



Guide

Name Description More
1. Setup How to setup a Worker more
2. Register How to set up a job and register it with Jobs system more
3. Usage How to start, stop, pause resume jobs and check status. more
4. Cycle The life-cycle of a worker more
5. Types OneTime, Paged, Queued Job types more
6. Events Subscribe to job events more
7. Workers Access to the jobs workers more
8. Stats Capturing job and worker statistics more
9. Load Performance and load more

Back to top



Identity

Every job must be set up with an Identity. This identity is transfered down to all workers ( given a unique uuid per worker ) and is used to uniquely identity jobs and workers.

      
    // slatekit.common.Identity is used to identify the job.  
    val id = SimpleIdentity("demo", "newsletter", Agent.Job, "dev")

    // id.id   : {AREA}.{SERVICE}.{AGENT}.{ENV}.{INSTANCE}
    // id.id   : demo.newsletter.job.dev.09cd7588-24cb-425b-8cec-2494c4460387
    // id.name : demo.newsletter
    // id.full : demo.newsletter.job.dev
    // id.area : demo
    // id.svc  : newsletter
    // id.agent: Job
    // id.env  : dev
    // id.inst : 09cd7588-24cb-425b-8cec-2494c4460387
    // id.tags : []


Setup

You can set up workers in various ways using either simple functions ( which get wrapped inside a Worker) or extending from the Worker class itself and implementing the work method. For full example, refer to Example_Jobs.kt

1. Function

You can declare a simple 0 parameter function to perform some work

     
    suspend fun sendNewsLetter():WorkResult {
        // Perform the work ...
        return WorkResult(WorkState.Done)
    }


2. Function with Task

You can declare a simple 1 parameter that takes in a Task ( representing a unit of work ). This assumes you are sourcing the work from a Queue.

     
    suspend fun sendNewsLetter(task: Task):WorkResult {
        println(task.id)    // abc123 
        println(task.from)  // queue://notification
        println(task.job)   // job1
        println(task.name)  // users.sendNewsletter
        println(task.data)  // { ... } json payload
        println(task.xid)   // 12345   correlation id
        // Perform the work ...
        return WorkResult(WorkState.Done)
    }


3. Worker class

Finally, you can extend from the Worker class. This is the most comprehensive option as it gives you the ability to customize various life-cycel events ( see later section ). The life-cycle events go from init -> work -> done.

      
    // slatekit.common.Identity is used to identify the job.  
    val id = SimpleIdentity("samples", "newsletter", Agent.Job, "dev")

    // Worker
    class NewsLetterWorker : Worker<String>(id) {

        // Initialization hook ( for setup / logs / alerts )
        override suspend fun init() {
            notify("initializing", listOf(("id" to this.id.name)))
        }

        // Implement your work here.
        // NOTE: If you are not using a queue, this task will be empty e.g. Task.empty
        override suspend fun work(task:Task): WorkResult {
            // Perform the work ...
            return WorkResult(WorkState.Done)
        }

        // Transition hook for when the status is changed ( e.g. from Status.Running -> Status.Paused )
        override suspend fun move(state: Status) {
            notify("move", listOf("status" to state.name))
        }

        // Completion hook ( for logic / logs / alerts )
        override suspend fun done() {
            notify("done", listOf(("id" to this.id.name)))
        }

        // Failure hook ( for logic / logs / alerts )
        override suspend fun fail(err:Throwable?) {
            notify("failure", listOf(("id" to this.id.name), ("err" to (err?.message ?: ""))))
        }

        // Initialization hook ( for setup / logs / alerts )
        override fun notify(desc: String?, extra: List<Pair<String, String>>?) {
            val detail = extra?.joinToString(",") { it.first + "=" + it.second }
            // Simulate notification to email/alerts/etc
            println(desc + detail)
        }
    }
Back to features Back to top


Register

Once you have a work function, you can register it as a new Job.

     

    // Identity
    val id = SimpleIdentity("samples", "newsletter", Agent.Job, "dev")
        
    // Queues
    val queue1 = Queue("queue1", Priority.Mid, QueueSourceInMemory.stringQueue(5))
    val queue2 = Queue("queue2", Priority.Mid, QueueSourceInMemory.stringQueue(5))

    // Registry
    // NOTE: Create an identity for each job using the id template above.
    val jobs = Jobs(
            listOf(queue1, queue2),
            listOf(
                    slatekit.jobs.Job(id.copy(service = "job1"), ::sendNewsLetter),
                    slatekit.jobs.Job(id.copy(service = "job2"), listOf(::sendNewsLetter, ::sendNewsLetterWithPaging)),
                    slatekit.jobs.Job(id.copy(service = "job3"), listOf(::sendNewsLetterWithPaging)),
                    slatekit.jobs.Job(id.copy(service = "job4"), listOf(::sendNewsLetterWithPaging)),
                    slatekit.jobs.Job(id.copy(service = "job5"), listOf(::sendNewsLetterFromQueue), queue1),
                    slatekit.jobs.Job(id.copy(service = "job6"), listOf(NewsLetterWorker()), queue2)
            )
    )

    // Run job named "samples.job1"
    jobs.run("samples.job1")
Back to features Back to top


Usage

There are various operations on the job from checking the status, to management actions such as starting, stopping, etc.

1. Actions

Jobs that are paged or queued can be gracefully started, stopped, paused, resumed. This is accomplished by sending Job Requests to the Job. These requests impact the status of the job and all its workers. These are methods available on the Job component itself.

Name Status Purpose
start Starting Starts the job and its associated workers
stop Stopped Stops the job and its associated workers
pause Paused Pauses the job and its associated workers
resume Running Resumes the jobs and its associated workers
process Running Issues a single process / work request

        
    // Assume jobs are registered in the Jobs component.
    // ....
    // See registration section

    // Get a job by its name
    val job = jobs.get("samples.job1")

    // Perform operations
    job?.let { job ->

        // NOTES: 
        // 1. This does not immediately perform the action.
        // 2. A Job request is sent to the Job Channel 
        // 3. A Job then delegates the respective action to its workers
        job.start()
        job.stop()
        job.pause()
        job.resume()
        job.process()
    }
     


2. Status

You can check the status of the job. The Status range are available in Status

        
    // Get a job by its name
    val job = jobs.get("samples.job1")

    // Check status
    job?.let { job ->
        // Get current status
        val status = job.status()

        // Check status
        job.isIdle()
        job.isRunning()
        job.isPaused()
        job.isStopped()
        job.isComplete()
        job.isFailed()
        job.isStoppedOrPaused()

        // Check for specific status
        job.isState(Status.Running)
        ""
    }
     
Back to features Back to top


Cycle

Workers have several life-cycle event methods. They are called in order and the status of the worker changes after the event.

Name Status Purpose
init Starting initialization hook for the worker
work Running initialization hook for the worker
fail Failed Called in the event of an error/exception/failure
done Complete Called when the work method has issues Work.Done
Back to features Back to top


Types

There are different types of jobs based on whether they run all their work in 1 go or perform work in batches/pages. This is (currently) not indicated explicitly at the function/worker level, but is based on what the function/work methods returns for the WorkState.

1. OneTime

A one time worker simply performs the long-running operation and returns a WorkResult indicating it’s done.

     
    suspend fun sendNewsLetter():WorkResult {
        // Process the work
        // ...
        return WorkResult(WorkState.Done)
    }


2. Paged

A paged worker runs performs work in batches or pages. After performing a batch of work, this paged worker must return a WorkState.Next.

        
    // This keeps track of the page/offset
    val offset = AtomicInteger(0)
    val batchSize = 4
    suspend fun sendNewsLetterWithPaging():WorkResult {
        // Process the work
        // ...

        // Increment the batch/page offset for next time
        offset.addAndGet(batchSize)

        // Indicate to the system this is paged and there is more to do.
        return WorkResult.next(
            offset = offset.get() + batchSize.toLong(), 
            processed = batchSize
            reference = "newsletters"
        )
    }
     


3. Queued

A queued worker runs work via Tasks from a Queue.

        
    suspend fun sendNewsLetterFromQueue(task: Task):WorkResult {

        // Get data out of the task
        val userId = task.data.toInt()

        // Simulate getting the user and sending the newsletter
        val user = getUser(userId)
        send(task.job, "Latest News Letter!", user)

        // Acknowledge the task or abandon via task.fail()
        task.done()

        // Indicate that this can now handle more
        return WorkResult(WorkState.More)
    }
     

Back to features Back to top


Events

You can subscribe to various job events such as any Status changes or for a specific Status change.

    
    // Subscribe to changes in status
    job.subscribe { println("Job: ${it.id} status changed to ${it.status().name}") }

    // Subscribe to change to Stopped status
    job.subscribe(Status.Stopped) { println("Job: ${it.id} stopped!")  }
     
Back to features Back to top


Policies

You can set up custom policies which essentially represent middleware functions to execute at various times and/or conditions of workers and jobs. Policies are available in the slatekit-functions project ( not yet documented).

    
    // There are 3 policies setup for this job:
    // 1. Every: for every 10 items processed, call the function supplied
    // 2. Limit: limit number of items to process to 12. ( useful for "waves")
    // 3. Ratio: Ensure the job stops if the error (failed) job ratio hits 10%
    val job = slatekit.jobs.Job(id.copy(service = "job7"), listOf(::sendNewsLetterWithPaging), 
        policies = listOf(
            Every(10, { req, res -> println("Paged : " + req.task.id + ":" + res.msg) }),
            Limit(12, true, { req -> req.context.stats.counts }),
            Ratio(.1, slatekit.results.Status.Errored(0, ""), { req -> req.context.stats.counts })
        )
    )
     

Back to features Back to top


Workers

A job internally holds a collection of workers via Workers. Each worker has an identity (based off the job’s identity ) in order to uniquely identify it. You can get the ids of all the workers and get the worker context which holds various components like polices ( middleware ) and diagnostics/stats.

    
    // Get the workers collection ( Workers )
    val workers = job.workers

    // Get all worker ids ( List<Identity> )
    val workerIds = workers.getIds()

    // Get the context for a specific worker ( WorkContext )
    val workerContext = job.workers.get(workerIds.first())

    // The worker performing work on tasks
    workerContext?.let { ctx ->

        // Identity of the worker parent ( Job.id )
        println(ctx.id)

        // Worker itself
        ctx.worker

        // Worker middleware applied
        ctx.policies

        // Worker statistics
        ctx.stats
    }
     

Back to features Back to top


Stats

Statistics are recorded at the worker level. There is ( currently ) no way aggregate statistics across all workers. But this can be done and/or integrated with a 3rd-party metrics library like Micrometer.

    
    // Get the workers collection ( Workers )
    val workers = job.workers

    // Get all worker ids ( List<Identity> )
    val workerIds = workers.getIds()

    // Get the context for a specific worker ( WorkContext )
    val workerContext = job.workers.get(workerIds.first())

    // The worker performing work on tasks
    workerContext?.let { ctx ->

        // Worker statistics
        val stats = ctx.stats

        // Worker statistics
        // Calls: Simple counters to count calls to a worker
        println("calls.totalRuns: " + ctx.stats.calls.totalRuns())
        println("calls.totalPassed: " + ctx.stats.calls.totalPassed())
        println("calls.totalFailed: " + ctx.stats.calls.totalFailed())
        
        // Counts: Counts the result success/failure categories
        // See slatekit.results.Status.kt for more info
        println("counts.totalProcessed : " + ctx.stats.counts.totalProcessed())
        println("counts.totalSucceeded : " + ctx.stats.counts.totalSucceeded())
        println("counts.totalDenied    : " + ctx.stats.counts.totalDenied())
        println("counts.totalInvalid   : " + ctx.stats.counts.totalInvalid())
        println("counts.totalIgnored   : " + ctx.stats.counts.totalIgnored())
        println("counts.totalErrored   : " + ctx.stats.counts.totalErrored())
        println("counts.totalUnexpected: " + ctx.stats.counts.totalUnexpected())

        // Lasts: Stores the last request/result of an call to work
        println("lasts.totalProcessed : " + ctx.stats.lasts?.lastProcessed())
        println("lasts.totalSucceeded : " + ctx.stats.lasts?.lastSuccess())
        println("lasts.totalDenied    : " + ctx.stats.lasts?.lastDenied())
        println("lasts.totalInvalid   : " + ctx.stats.lasts?.lastInvalid())
        println("lasts.totalIgnored   : " + ctx.stats.lasts?.lastIgnored())
        println("lasts.totalErrored   : " + ctx.stats.lasts?.lastErrored())
        println("lasts.totalUnexpected: " + ctx.stats.lasts?.lastUnexpected())
        
        // You can also hook up loggers and events
        ctx.stats.logger
        ctx.stats.events
    }
     

Back to top



Load

Performance and load docs coming soon


Back to top