corva-sdk

Documentation for version 1.11.4.

corva-sdk is a framework for building Corva DevCenter Python apps.

1. Install

corva-sdk requires Python 3.8.

Installation is as simple as:

pip install corva-sdk

2. App Types

There are three app types that you can build:

  1. stream - works with real-time data.

  2. scheduled - works with data at defined schedules/intervals (e.g., once a minute, once every 3 ft.).

  3. task - works with data on-demand.

Each app type can have an optional handler to process partial rerun merge events.

TIP: Use type hints, as in the examples below, for better support from editors and tools.

2.1. Stream

Stream apps can be time or depth based.

2.1.1. Time

from corva import Api, Cache, StreamTimeEvent, stream  (1)


@stream  (3)
def stream_time_app(event: StreamTimeEvent, api: Api, cache: Cache):  (2)
    return "Hello, World!"  (4)
1 Import required functionality.
2 Define your function. It must receive three arguments: event, api and cache. The arguments serve as building blocks for your app.
3 Decorate your function using stream.
4 Add app logic here.

2.1.2. Depth

from corva import Api, Cache, StreamDepthEvent, stream  (1)


@stream  (3)
def stream_depth_app(event: StreamDepthEvent, api: Api, cache: Cache):  (2)
    return 'Hello, World!'  (4)
1 Import required functionality.
2 Define your function. It must receive three arguments: event, api and cache. The arguments serve as building blocks for your app.
3 Decorate your function using stream.
4 Add app logic here.

2.2. Scheduled

Scheduled apps can be data time, depth or natural time based.

2.2.1. Data Time

from corva import Api, Cache, ScheduledDataTimeEvent, scheduled  (1)


@scheduled  (3)
def scheduled_app(event: ScheduledDataTimeEvent, api: Api, cache: Cache):  (2)
    return 'Hello, World!'  (4)
1 Import required functionality.
2 Define your function. It must receive three arguments: event, api and cache. The arguments serve as building blocks for your app.
3 Decorate your function using scheduled.
4 Add app logic here.

2.2.2. Depth

from corva import Api, Cache, ScheduledDepthEvent, scheduled  (1)


@scheduled  (3)
def scheduled_app(event: ScheduledDepthEvent, api: Api, cache: Cache):  (2)
    return 'Hello, World!'  (4)
1 Import required functionality.
2 Define your function. It must receive three arguments: event, api and cache. The arguments serve as building blocks for your app.
3 Decorate your function using scheduled.
4 Add app logic here.

2.2.3. Natural Time

from corva import Api, Cache, ScheduledNaturalTimeEvent, scheduled  (1)


@scheduled  (3)
def scheduled_app(event: ScheduledNaturalTimeEvent, api: Api, cache: Cache):  (2)
    return 'Hello, World!'  (4)
1 Import required functionality.
2 Define your function. It must receive three arguments: event, api and cache. The arguments serve as building blocks for your app.
3 Decorate your function using scheduled.
4 Add app logic here.

2.3. Task

from corva import Api, TaskEvent, task  (1)


@task  (3)
def task_app(event: TaskEvent, api: Api):  (2)
    return 'Hello, World!'  (4)
1 Import required functionality.
2 Define your function. It must receive two arguments: event and api. The arguments serve as building blocks for your app.
3 Decorate your function using task.
4 Add app logic here.

2.4. Partial Rerun Merge Handler (Optional)

from corva import (  (1)
    Api,
    Cache,
    PartialRerunMergeEvent,
    StreamTimeEvent,
    partial_rerun_merge,
    stream,
)


@stream
def stream_app(event: StreamTimeEvent, api: Api, cache: Cache):
    return "Handling stream event..."


@partial_rerun_merge  (3)
def partial_rerun_app(
    event: PartialRerunMergeEvent,
    api: Api,
    asset_cache: Cache,
    rerun_asset_cache: Cache,
):  (2)
    return "Hello, World!"  (4)
1 Import required functionality.
2 Define your function. It must receive four arguments: event, api, asset_cache and rerun_asset_cache. The arguments serve as building blocks for your app.
3 Decorate your function using partial_rerun_merge.
4 Add app logic here.

3. Event

Event is an object that contains essential data for the app. Each app type has its corresponding event type, which only contains fields relevant to that app.

4. Api

Apps need to communicate with the Corva Platform API and the Corva Data API. For this, corva-sdk provides an Api object, which wraps the Python requests library and adds automatic authorization, convenient URL usage and reasonable timeouts to API requests. Api methods return requests.Response objects.

4.1. Url usage

from corva import Api, TaskEvent, task


@task
def task_app(event: TaskEvent, api: Api):
    api.get('/v2/pads')  (1)
    api.get('/api/v1/data/provider/dataset/')  (2)
    api.get('https://api.corva.ai/v2/pads')  (3)
1 Use Corva Platform API URL suffix to make a Platform API call.
2 Use Corva Data API URL suffix to make a Data API call.
3 You can also provide full URL.
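
One way to picture the suffix handling is a small resolver that picks a base URL from the path prefix. This is only a sketch, not the SDK's implementation: resolve_url is a hypothetical helper, the Platform API host is taken from the example above, the Data API host is a placeholder, and the /api/v1/ prefix rule is an assumption based on the endpoints shown in this document.

```python
# Platform host comes from the example above; the Data API host is a placeholder.
PLATFORM_API_URL = 'https://api.corva.ai'
DATA_API_URL = 'https://data.example.com'


def resolve_url(path: str) -> str:
    """Hypothetical helper: turn a URL suffix into a full URL."""
    if path.startswith(('http://', 'https://')):
        return path  # full URLs are used as-is

    suffix = path.lstrip('/')
    if suffix.startswith('api/v1/'):
        base = DATA_API_URL  # assumed rule: /api/v1/... belongs to the Data API
    else:
        base = PLATFORM_API_URL  # everything else goes to the Platform API
    return f'{base}/{suffix}'


platform_url = resolve_url('/v2/pads')
data_url = resolve_url('/api/v1/data/provider/dataset/')
full_url = resolve_url('https://api.corva.ai/v2/pads')
```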

4.2. HTTP GET

from corva import Api, TaskEvent, task


@task
def task_app(event: TaskEvent, api: Api):
    response = api.get('/v2/pads')  (1)
    api.get('/v2/pads', params={'company': 1})  (2)

    response.json()  (3)
1 Simplest GET example.
2 Use optional params parameter to provide URL query string params.
3 You can unpack received data like this, as all Api methods return requests.Response objects.

4.3. HTTP POST, DELETE, PUT and PATCH

from corva import Api, TaskEvent, task


@task
def task_app(event: TaskEvent, api: Api):
    api.post('/v2/pads', data={'key': 'val'})  (1) (5)
    api.delete('/v2/pads/123')  (2)
    api.put('/api/v1/data/provider/dataset/', data={'key': 'val'})  (3) (5)
    api.patch('/v2/pads/123', data={'key': 'val'})  (4) (5)
1 Simplest POST example.
2 Simplest DELETE example.
3 Simplest PUT example.
4 Simplest PATCH example.
5 Use data parameter to provide a request body, which will be serialized to JSON.

4.4. Custom headers and timeouts

from corva import Api, TaskEvent, task


@task
def task_app(event: TaskEvent, api: Api):
    api.get('/v2/pads', headers={'header': 'header-value'})  (1)
    api.get('/v2/pads', timeout=5)  (2)
1 Use headers parameter to add custom headers to the request.
2 Use timeout parameter to override default timeout value.

4.5. Convenience methods

Api provides some convenience methods for frequently used endpoints.

4.5.1. Get dataset

Fetch the data from the /api/v1/data/provider/dataset/ endpoint using Api.get_dataset method.

from corva import Api, TaskEvent, task


@task
def task_app(event: TaskEvent, api: Api):
    api.get_dataset(
        provider='corva',
        dataset='wits',
        query={
            'asset_id': event.asset_id,
        },
        sort={'timestamp': 1},
        limit=1,
        fields='data,metadata',
    )

4.5.2. Produce messages

Post data to the /api/v1/message_producer/ endpoint using Api.produce_messages method. The method will work for both stream and scheduled types of apps.

from corva import Api, Cache, ScheduledDataTimeEvent, ScheduledDepthEvent, scheduled


@scheduled
def scheduled_time_app(event: ScheduledDataTimeEvent, api: Api, cache: Cache):
    api.produce_messages(data=[{'timestamp': 1}, {'timestamp': 2}])  (1)


@scheduled
def scheduled_depth_app(event: ScheduledDepthEvent, api: Api, cache: Cache):
    api.produce_messages(data=[{'measured_depth': 1.0}, {'measured_depth': 2.0}])  (2)
1 Example of producing time messages.
2 Example of producing depth messages.

4.5.3. Insert data

Post data to the /api/v1/data/provider/dataset/ endpoint.

from corva import Api, Cache, ScheduledDataTimeEvent, scheduled


@scheduled
def scheduled_app(event: ScheduledDataTimeEvent, api: Api, cache: Cache):
    api.insert_data(
        provider='my-company-name',
        dataset='my-dataset-name',
        data=[
            {
                'asset_id': event.asset_id,
                'version': 1,
                'timestamp': 0,
                'data': {'result': 'very important result'},
            },
            {
                'asset_id': event.asset_id,
                'version': 1,
                'timestamp': 1,
                'data': {'result': 'very important result'},
            },
        ],  (1)
        produce=True,  (2)
    )
1 Save two documents to dataset.
2 You can enable this flag to save and produce the data at once.

4.6. Enabling re-tries

When a request fails due to an HTTP error, it can be retried automatically. With retries enabled, the same request is re-sent with exponential back-off for the HTTP status codes below:

  • 428

  • 500

  • 502

  • 503

  • 504

from corva import Api, Cache, ScheduledDataTimeEvent, scheduled


@scheduled
def scheduled_app(event: ScheduledDataTimeEvent, api: Api, cache: Cache):
    api.max_retries = 5  # Enabling up to 5 retries when HTTP error happens.
    ...
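
To make the back-off behaviour concrete, here is a stdlib-only sketch of a retry loop. It is not the SDK's implementation: backoff_delays and call_with_retries are hypothetical names, the status codes are the ones listed above, and the base delay and growth factor are assumptions, since this document does not state the actual delays.

```python
from typing import Callable, List

RETRYABLE_STATUS_CODES = {428, 500, 502, 503, 504}  # the list from this section


def backoff_delays(max_retries: int, base: float = 0.5, factor: float = 2.0) -> List[float]:
    """Delay before each retry: base, base * factor, base * factor**2, ..."""
    return [base * factor**attempt for attempt in range(max_retries)]


def call_with_retries(
    send: Callable[[], int], max_retries: int, sleep: Callable[[float], None]
) -> int:
    """Call send() until it returns a non-retryable status or retries run out."""
    status = send()
    for delay in backoff_delays(max_retries):
        if status not in RETRYABLE_STATUS_CODES:
            break
        sleep(delay)  # wait longer before every subsequent attempt
        status = send()
    return status


# Simulated server: two retryable failures, then success.
statuses = iter([503, 502, 200])
slept: List[float] = []  # record delays instead of really sleeping
final_status = call_with_retries(lambda: next(statuses), max_retries=5, sleep=slept.append)
```

The sleep function is injected so the loop can be exercised without waiting; passing time.sleep would give real delays.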

5. Cache

Apps might need to share some data between invocations. For this, corva-sdk provides a Cache object. Cache uses a dict-like database, so the data is stored as key:value pairs.

A typical example of Cache usage is:

  1. Store some data during app invocation 1.

  2. Retrieve and use the data during app invocation 2.

To get the most out of Cache:

  1. Store as little data as possible.

  2. Try to stay below 100 KB.

Task apps don’t get a Cache parameter as they aren’t meant to share data between invocations.

5.1. Get and set

Cache can store only string data.

Cast your data to str before saving.

5.1.1. Storing str

from corva import Api, Cache, ScheduledDataTimeEvent, scheduled


@scheduled
def scheduled_app(event: ScheduledDataTimeEvent, api: Api, cache: Cache):
    cache.set(key='str', value='text')  (1)
    assert cache.get(key='str') == 'text'  (2)
1 Save str data to Cache.
2 Load the value using its key.

5.1.2. Storing int

from corva import Api, Cache, ScheduledDataTimeEvent, scheduled


@scheduled
def scheduled_app(event: ScheduledDataTimeEvent, api: Api, cache: Cache):
    cache.set(key='int', value=str(0))  (1)
    assert cache.get('int') == '0'  (2)
    assert int(cache.get('int')) == 0  (3)
1 Cast int to str before saving data to Cache.
2 Load the value using its key. Notice that returned value has str type.
3 Cast the value back to int as needed.
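
Since values come back as str and a missing key yields None, a small typed getter can keep the casting in one place. This is a sketch under assumptions: get_int is a hypothetical helper, and DictCache is an in-memory stand-in that only mimics the get and set calls shown above.

```python
from typing import Dict, Optional


class DictCache:
    """In-memory stand-in for Cache with the same get/set call shape."""

    def __init__(self) -> None:
        self._data: Dict[str, str] = {}

    def set(self, key: str, value: str) -> None:
        self._data[key] = value

    def get(self, key: str) -> Optional[str]:
        return self._data.get(key)


def get_int(cache, key: str, default: int = 0) -> int:
    """Hypothetical helper: read an int stored as str, with a default for missing keys."""
    raw = cache.get(key)
    return default if raw is None else int(raw)


demo_cache = DictCache()
demo_cache.set(key='counter', value=str(41))  # cast to str before saving
next_value = get_int(demo_cache, 'counter') + 1
fallback = get_int(demo_cache, 'missing', default=7)
```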

5.1.3. Storing dict

import json

from corva import Api, Cache, ScheduledDataTimeEvent, scheduled


@scheduled
def scheduled_app(event: ScheduledDataTimeEvent, api: Api, cache: Cache):
    cache.set(key='json', value=json.dumps({'int': 0, 'str': 'text'}))  (1)
    assert isinstance(cache.get('json'), str)  (2)
    assert json.loads(cache.get('json')) == {'int': 0, 'str': 'text'}  (3)
1 Cast dict to JSON str before saving data to Cache.
2 Load the value using its key. Notice that returned value has str type.
3 Parse JSON str and convert it back into a dict.
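
JSON encoding combines naturally with the size guidance from the Cache introduction. The sketch below uses a hypothetical dumps_for_cache helper that serializes a value compactly and refuses payloads above a limit; reading the guideline as 100,000 bytes is an assumption.

```python
import json

MAX_CACHE_VALUE_BYTES = 100_000  # assumed reading of the size guideline


def dumps_for_cache(value, max_bytes: int = MAX_CACHE_VALUE_BYTES) -> str:
    """Hypothetical helper: JSON-encode a value and refuse oversized payloads."""
    text = json.dumps(value, separators=(',', ':'))  # compact form, no extra spaces
    size = len(text.encode('utf-8'))
    if size > max_bytes:
        raise ValueError(f'cache value is {size} bytes, limit is {max_bytes}')
    return text


encoded = dumps_for_cache({'int': 0, 'str': 'text'})
decoded = json.loads(encoded)
```

The returned string can then be passed as the value argument of cache.set.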

5.2. Delete

from corva import Api, Cache, ScheduledDataTimeEvent, scheduled


@scheduled
def scheduled_app(event: ScheduledDataTimeEvent, api: Api, cache: Cache):
    cache.set(key='str', value='text')  (1)
    assert cache.get(key='str') == 'text'  (2)

    cache.delete(key='str')  (3)
    assert cache.get(key='str') is None  (4)
1 Save some data to Cache.
2 Load the value using its key.
3 Delete the data, when it is no longer needed.
4 The data is not present anymore.

5.3. Key expiry

By default, keys expire in 60 days.

import time

from corva import Api, Cache, ScheduledDataTimeEvent, scheduled


@scheduled
def scheduled_app(event: ScheduledDataTimeEvent, api: Api, cache: Cache):
    cache.set(key='key', value='value', ttl=1)  (1)
    assert cache.get('key') == 'value'

    time.sleep(1)  (2)

    assert cache.get('key') is None  (3)
1 Specify how many seconds you want your key to live using ttl parameter.
2 Wait for key to expire.
3 The data is not present anymore.
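
The expiry semantics can be modelled with a toy in-memory store and an injectable clock, which also avoids real waiting when experimenting. TtlStore is a hypothetical class written for illustration, not the SDK's implementation; the 60-day default is the one stated above.

```python
import time
from typing import Callable, Dict, Optional, Tuple

DEFAULT_TTL_SECONDS = 60 * 24 * 60 * 60  # 60 days, the default stated above


class TtlStore:
    """Toy key-value store that mimics per-key expiry."""

    def __init__(self, clock: Callable[[], float] = time.monotonic) -> None:
        self._clock = clock
        self._data: Dict[str, Tuple[str, float]] = {}

    def set(self, key: str, value: str, ttl: int = DEFAULT_TTL_SECONDS) -> None:
        self._data[key] = (value, self._clock() + ttl)  # remember the deadline

    def get(self, key: str) -> Optional[str]:
        item = self._data.get(key)
        if item is None:
            return None
        value, expires_at = item
        if self._clock() >= expires_at:
            del self._data[key]  # drop expired keys lazily on read
            return None
        return value


now = [0.0]  # fake clock so the example needs no real waiting
store = TtlStore(clock=lambda: now[0])
store.set(key='key', value='value', ttl=1)
before_expiry = store.get('key')
now[0] += 1  # advance the fake clock by one second
after_expiry = store.get('key')
```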

5.4. Bulk methods

Cache provides some bulk methods which make it easy to work with multiple keys at once.

5.4.1. Get many, get all and set many

import time

from corva import Api, Cache, ScheduledDataTimeEvent, scheduled


@scheduled
def scheduled_app(event: ScheduledDataTimeEvent, api: Api, cache: Cache):
    assert cache.get_all() == {}  (1)

    cache.set_many(  (2)
        data=[
            ('key', 'value_1'),
            ('key_with_custom_expiry', 'value_2', 1),  (3)
        ]
    )

    assert cache.get_many(  (4)
        keys=[
            'key',
            'non-existent-key',  (5)
        ]
    ) == {'key': 'value_1', 'non-existent-key': None}

    assert cache.get_all() == {  (6)
        'key': 'value_1',
        'key_with_custom_expiry': 'value_2',
    }

    time.sleep(1)  (7)

    assert cache.get_all() == {'key': 'value_1'}  (8)
1 Get all the data from the hash. It is empty as we have not stored anything yet.
2 Store multiple key-value pairs at once.
3 You can set custom key expiry in seconds by providing additional tuple element.
4 Get multiple keys at once.
5 If you request a non-existent key it will be assigned a value of None.
6 Get all the data from the hash.
7 Wait for key with custom expiry to expire.
8 The expired key is not present anymore.
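
The two tuple shapes accepted by set_many in the example above can be normalized into a single form before further processing. This sketch uses a hypothetical normalize_entries helper and assumes that two-element tuples fall back to the default expiry of 60 days.

```python
from typing import List, Sequence, Tuple

DEFAULT_TTL_SECONDS = 60 * 24 * 60 * 60  # assumed default for entries without a ttl


def normalize_entries(
    data: Sequence[tuple], default_ttl: int = DEFAULT_TTL_SECONDS
) -> List[Tuple[str, str, int]]:
    """Hypothetical helper: expand (key, value) and (key, value, ttl) to 3-tuples."""
    normalized = []
    for entry in data:
        if len(entry) == 2:
            key, value = entry
            ttl = default_ttl  # no custom expiry given
        elif len(entry) == 3:
            key, value, ttl = entry
        else:
            raise ValueError(f'expected 2 or 3 elements, got {len(entry)}')
        normalized.append((key, value, ttl))
    return normalized


entries = normalize_entries(
    [
        ('key', 'value_1'),
        ('key_with_custom_expiry', 'value_2', 1),
    ]
)
```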

5.4.2. Delete many and delete all

from corva import Api, Cache, ScheduledDataTimeEvent, scheduled


@scheduled
def scheduled_app(event: ScheduledDataTimeEvent, api: Api, cache: Cache):
    assert cache.get_all() == {}  (1)

    cache.set_many(data=[('k1', 'v1'), ('k2', 'v2'), ('k3', 'v3'), ('k4', 'v4')])  (2)

    cache.delete_many(keys=['k1', 'k2'])  (3)
    assert cache.get_all() == {'k3': 'v3', 'k4': 'v4'}  (4)

    cache.delete_all()  (5)
    assert cache.get_all() == {}  (6)
1 Cache is empty as we have not stored anything yet.
2 Store some data.
3 Delete multiple keys at once.
4 Deleted keys no longer exist.
5 Delete all the data.
6 Cache is empty.

6. Logging

As apps are executed very frequently (once a second or so), unlimited logging can lead to huge amounts of data. corva-sdk provides a Logger object, which is a safe way to log from apps.

The Logger is a logging.Logger instance and should be used like every other Python logger.

The Logger has the following features:

  • Log messages are injected with contextual information, which makes it easy to filter through logs while debugging issues.

  • Log messages have limited length. Messages that are too long are truncated to fit the limit. The maximum message size is controlled by the LOG_THRESHOLD_MESSAGE_SIZE env variable. Default value is 1000 symbols or bytes.

  • The number of log messages is limited. After the limit is reached, logging is disabled. The maximum number of log messages is controlled by the LOG_THRESHOLD_MESSAGE_COUNT env variable. Default value is 15 messages.

  • Logging level can be set using LOG_LEVEL env variable. Default value is INFO, see Python log levels for other available options.

from corva import Api, Logger, TaskEvent, task  (1)


@task
def task_app(event: TaskEvent, api: Api):
    (2)
    Logger.debug('Debug message!')
    Logger.info('Info message!')
    Logger.warning('Warning message!')
    Logger.error('Error message!')
    try:
        0 / 0
    except ZeroDivisionError:
        Logger.exception('Exception message!')
1 Import Logger object.
2 Use Logger as every other Python logger.
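
The size and count limits described above can be illustrated with a plain logging.Filter from the standard library. This is a sketch, not how corva-sdk implements its Logger: ThresholdFilter and ListHandler are hypothetical classes, and only the default values (1000 and 15) come from this section.

```python
import logging


class ThresholdFilter(logging.Filter):
    """Toy filter: truncate long messages and stop after a fixed number of records."""

    def __init__(self, max_size: int = 1000, max_count: int = 15) -> None:
        super().__init__()
        self.max_size = max_size
        self.max_count = max_count
        self.seen = 0

    def filter(self, record: logging.LogRecord) -> bool:
        if self.seen >= self.max_count:
            return False  # limit reached: drop the record
        self.seen += 1
        record.msg = record.getMessage()[: self.max_size]  # format, then truncate
        record.args = None  # the message is already formatted
        return True


class ListHandler(logging.Handler):
    """Collects messages in a list so the effect is easy to inspect."""

    def __init__(self) -> None:
        super().__init__()
        self.messages = []

    def emit(self, record: logging.LogRecord) -> None:
        self.messages.append(record.getMessage())


logger = logging.getLogger('threshold-demo')
logger.setLevel(logging.INFO)
logger.propagate = False  # keep the demo output out of the root logger
handler = ListHandler()
logger.addHandler(handler)
logger.addFilter(ThresholdFilter(max_size=5, max_count=2))

logger.info('Hello, %s!', 'World')  # truncated to 5 characters
logger.info('second')  # truncated to 5 characters
logger.info('third')  # dropped: the count limit was reached
```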

6.1. Customizations

You might want to send logs to other places (e.g., to error reporting systems like Sentry or Rollbar). This can be achieved by passing a logging handler instance as an argument to the app decorator. The custom handler will be used alongside corva-sdk's default one.

import logging  (1)

from corva import Api, Logger, TaskEvent, task

stream_handler = logging.StreamHandler()  (2)


@task(handler=stream_handler)  (3)
def task_app(event: TaskEvent, api: Api):
    Logger.info('Info message!')  (4)
1 Import the module which contains the handler that we want to use.
2 Initialize the handler.
3 Pass the handler as a keyword argument to the app decorator.
4 Logs will be sent to both stream_handler and corva-sdk's default one.

6.1.1. Sentry

pip install sentry-sdk (1)
1 Install the library.

import sentry_sdk  (1)

from corva import Api, TaskEvent, task

sentry_sdk.init("YOUR_SENTRY_DSN")  (2)


@task
def app(event: TaskEvent, api: Api) -> None:
    1 / 0  (3)
1 Import Sentry SDK.
2 Initialize the library.
3 All errors will be reported to Sentry now.

6.1.2. Rollbar

pip install rollbar (1)
1 Install the library.

import rollbar.logger  (1)

from corva import Api, TaskEvent, task

rollbar_handler = rollbar.logger.RollbarHandler('YOUR_ROLLBAR_ACCESS_TOKEN')  (2)


@task(handler=rollbar_handler)  (3)
def app(event: TaskEvent, api: Api) -> None:
    1 / 0  (4)
1 Import Rollbar SDK.
2 Initialize Rollbar handler.
3 Pass the handler as a keyword argument to the app decorator.
4 All errors will be reported to Rollbar now.

6.1.3. Raygun

pip install raygun4py (1)
1 Install the library.

import raygun4py.raygunprovider  (1)

from corva import Api, TaskEvent, task

raygun_handler = raygun4py.raygunprovider.RaygunHandler('YOUR_RAYGUN_API_KEY')  (2)


@task(handler=raygun_handler)  (3)
def app(event: TaskEvent, api: Api) -> None:
    1 / 0  (4)
1 Import Raygun SDK.
2 Initialize Raygun handler.
3 Pass the handler as a keyword argument to the app decorator.
4 All errors will be reported to Raygun now.

6.1.4. Other libraries

You can use any other error logging library. Just initialize the corresponding logging handler and pass it as a keyword argument to the app decorator. Use the code samples above as examples.
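
If a library ships no ready-made handler, a custom logging.Handler subclass can fill the gap. The sketch below uses a hypothetical ErrorCollector that keeps ERROR-level records in memory; a real integration would forward them to the reporting service instead. It is exercised with a plain standard-library logger so that it runs on its own.

```python
import logging
from typing import List


class ErrorCollector(logging.Handler):
    """Hypothetical handler that keeps ERROR-and-above records in memory."""

    def __init__(self) -> None:
        super().__init__(level=logging.ERROR)  # ignore anything below ERROR
        self.records: List[logging.LogRecord] = []

    def emit(self, record: logging.LogRecord) -> None:
        # A real integration would send the record to the reporting service here.
        self.records.append(record)


collector = ErrorCollector()

# Exercised with a stdlib logger; in an app the instance would be passed as
# @task(handler=collector), following the samples above.
demo_logger = logging.getLogger('handler-demo')
demo_logger.setLevel(logging.DEBUG)
demo_logger.propagate = False
demo_logger.addHandler(collector)

demo_logger.info('ignored by the collector')
demo_logger.error('kept by the collector')
```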

7. Merging incoming events

Only stream and scheduled apps can use this feature.

Sometimes Corva can send more than one event to scheduled and stream apps. You can optionally merge them into a single event by passing the merge_events=True parameter.

from corva import Api, Cache, StreamTimeEvent, stream


# imagine we actually have 3 incoming events with 3 records each
@stream(merge_events=True)
def app(event: StreamTimeEvent, api: Api, cache: Cache):
    # since we passed merge_events=True all 3 incoming events
    # and their records will be merged into a single event with 9 records
    assert len(event.records) == 9  # this will not fail
    return event

from corva import Api, Cache, ScheduledNaturalTimeEvent, scheduled


@scheduled(merge_events=True)
def app(event: ScheduledNaturalTimeEvent, api: Api, cache: Cache):
    return event

Usually this is needed to save some I/O operations by processing data in bigger batches. Use this parameter with care: in a pessimistic scenario you can receive too much data, try to process it in one go and fail with a timeout. In that case your app will be automatically restarted, start from the beginning and fail again. Without this parameter, corva-sdk remembers each event once it has been processed. So, for example, if your app fails at event #5 and is restarted, it will resume processing from event #5 (and not from #1, as it would with merge_events=True).
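
The trade-off can be simulated without the SDK. In this sketch an event is just a list of record values, and merge_events and run_one_by_one are hypothetical functions that model the two modes; the checkpointing is a simplification of whatever corva-sdk does internally.

```python
from typing import Callable, Dict, List

Event = List[int]  # toy event: just a list of record values


def merge_events(events: List[Event]) -> Event:
    """Concatenate the records of all incoming events into one event."""
    return [record for event in events for record in event]


def run_one_by_one(
    events: List[Event], handler: Callable[[Event], None], start: int = 0
) -> int:
    """Process events individually; return the index of the first unprocessed event."""
    for index in range(start, len(events)):
        try:
            handler(events[index])
        except TimeoutError:
            return index  # earlier events stay processed; resume from here
    return len(events)


events = [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
merged = merge_events(events)  # one event holding all 9 records

attempts: Dict[str, int] = {'count': 0}


def flaky_handler(event: Event) -> None:
    """Fails once, on the third call, to simulate a timeout."""
    attempts['count'] += 1
    if attempts['count'] == 3:
        raise TimeoutError('simulated timeout')


resume_from = run_one_by_one(events, flaky_handler)  # stops at the third event
finished_at = run_one_by_one(events, flaky_handler, start=resume_from)
```

After the simulated restart only the failed event is processed again, whereas a merged event would have to be reprocessed as a whole.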

8. Followable apps

Only stream and scheduled apps can be followed.

An app is called followable if it can be followed by other apps. Followable apps must produce data to trigger a chain reaction of following app runs. See the Produce messages and Insert data sections for instructions on how to produce messages.

from corva import Api, Cache, ScheduledDataTimeEvent, StreamTimeEvent, scheduled, stream


@scheduled  (1)
def followable_scheduled_app(event: ScheduledDataTimeEvent, api: Api, cache: Cache):
    data = [
        {
            'asset_id': event.asset_id,
            'version': 1,
            'timestamp': 0,
            'data': {'answer': 10},
        },
        {
            'asset_id': event.asset_id,
            'version': 1,
            'timestamp': 60,
            'data': {'answer': 11},
        },
    ]  (2)

    (3)
    api.insert_data(
        provider='my-provider',
        dataset='quiz-answers',
        data=data,
    )  (4)
    api.produce_messages(data=data)  (5)

    (6)
    api.insert_data(
        provider='my-provider',
        dataset='quiz-answers',
        data=data,
        produce=True,  (7)
    )


@scheduled  (8)
def following_scheduled_app(event: ScheduledDataTimeEvent, api: Api, cache: Cache):
    data = api.get_dataset(
        provider='my-provider',
        dataset='quiz-answers',
        query={'asset_id': event.asset_id, 'company_id': event.company_id},
        sort={'timestamp': 1},
        limit=2,
    )  (9)

    assert [datum['data'] for datum in data] == [
        {'answer': 10},
        {'answer': 11},
    ]


@stream  (10)
def following_stream_app(event: StreamTimeEvent, api: Api, cache: Cache):
    assert [record.data for record in event.records] == [
        {'answer': 10},
        {'answer': 11},
    ]  (11)
1 The followable app.
2 Data documents that should be saved and produced.
3 Slow method which uses two api calls.
4 First save the data to the dataset. See Insert data for details.
5 Then produce the messages. See Produce messages for details.
6 Recommended method which uses single api call.
7 Enable this flag to save and produce messages at once. See Insert data for details.
8 Example scheduled following app. It will be invoked as soon as there is some time interval worth of data.
9 Scheduled apps should query the data because, unlike stream apps, they don’t receive it in the event.
10 Example stream following app. It will be invoked as soon as new messages are produced.
11 Stream apps receive data straight from the event. No api calls needed.

9. Secrets

You can store sensitive data such as passwords, tokens, confidential configuration data, or any other data that you want to protect as application secrets. To access such data inside the app, corva-sdk provides a secrets object.

See the testing secrets section for how to test an app that uses secrets.

from corva import Api, TaskEvent, secrets, task  (1)


@task
def task_app(event: TaskEvent, api: Api):
    secrets['api_token']  (2)
    int(secrets['integer'])  (3)
1 Import secrets object.
2 secrets is a dictionary, so use a key to retrieve the value.
3 Values are stored as strings; cast the value to the required type as needed. The example shows how to get an integer.
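
Lookup and casting can be wrapped in one place so that a missing secret fails with a clear message. This is a sketch: read_secret is a hypothetical helper, and a plain dict stands in for the secrets object so that the example runs without the SDK.

```python
from typing import Any, Callable, Mapping


def read_secret(
    secrets: Mapping[str, str], name: str, cast: Callable[[str], Any] = str
) -> Any:
    """Hypothetical helper: fetch a secret and cast it to the required type."""
    try:
        raw = secrets[name]
    except KeyError:
        raise KeyError(f'secret {name!r} is not configured') from None
    return cast(raw)  # values are stored as strings, so cast as needed


# A plain dict stands in for the secrets object from corva.
demo_secrets = {'api_token': '12345', 'integer': '42'}
token = read_secret(demo_secrets, 'api_token')
number = read_secret(demo_secrets, 'integer', cast=int)
```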

10. Testing

Testing apps is easy and enjoyable.

corva-sdk provides convenient testing tools through a pytest plugin.

Write your tests using pytest to get access to the plugin.

To install the library run:

pip install pytest

10.1. Stream

10.1.1. Time

from corva import Api, Cache, StreamTimeEvent, StreamTimeRecord, stream


@stream
def stream_app(event: StreamTimeEvent, api: Api, cache: Cache):  (1)
    return 'Hello, World!'


def test_stream_time_app(app_runner):  (2)
    event = StreamTimeEvent(  (3)
        asset_id=0, company_id=0, records=[StreamTimeRecord(timestamp=0)]
    )

    result = app_runner(stream_app, event=event)  (4)

    assert result == 'Hello, World!'  (5)
1 Sample app that we want to test.
2 Add app_runner argument to your test function.
3 Define the event that will be passed to the app.
4 Use app_runner fixture to run the app.
5 Verify the result.

10.1.2. Depth

from corva import Api, Cache, StreamDepthEvent, StreamDepthRecord, stream


@stream
def stream_app(event: StreamDepthEvent, api: Api, cache: Cache):  (1)
    return 'Hello, World!'


def test_stream_depth_app(app_runner):  (2)
    event = StreamDepthEvent(  (3)
        asset_id=0, company_id=0, records=[StreamDepthRecord(measured_depth=0)]
    )

    result = app_runner(stream_app, event=event)  (4)

    assert result == 'Hello, World!'  (5)
1 Sample app that we want to test.
2 Add app_runner argument to your test function.
3 Define the event that will be passed to the app.
4 Use app_runner fixture to run the app.
5 Verify the result.

10.2. Scheduled

10.2.1. Data Time

from corva import Api, Cache, ScheduledDataTimeEvent, scheduled


@scheduled
def scheduled_app(event: ScheduledDataTimeEvent, api: Api, cache: Cache):  (1)
    return 'Hello, World!'


def test_scheduled_app(app_runner):  (2)
    event = ScheduledDataTimeEvent(
        asset_id=0, start_time=0, end_time=0, company_id=0
    )  (3)

    result = app_runner(scheduled_app, event=event)  (4)

    assert result == 'Hello, World!'  (5)
1 Sample app that we want to test.
2 Add app_runner argument to your test function.
3 Define the event that will be passed to the app.
4 Use app_runner fixture to run the app.
5 Verify the result.

10.2.2. Depth

from corva import Api, Cache, ScheduledDepthEvent, scheduled


@scheduled
def scheduled_app(event: ScheduledDepthEvent, api: Api, cache: Cache):  (1)
    return 'Hello, World!'


def test_scheduled_app(app_runner):  (2)
    event = ScheduledDepthEvent(
        asset_id=0,
        company_id=0,
        top_depth=0.0,
        bottom_depth=1.0,
        log_identifier='',
        interval=1.0,
    )  (3)

    result = app_runner(scheduled_app, event=event)  (4)

    assert result == 'Hello, World!'  (5)
1 Sample app that we want to test.
2 Add app_runner argument to your test function.
3 Define the event that will be passed to the app.
4 Use app_runner fixture to run the app.
5 Verify the result.

10.2.3. Natural Time

from corva import Api, Cache, ScheduledNaturalTimeEvent, scheduled


@scheduled
def scheduled_app(event: ScheduledNaturalTimeEvent, api: Api, cache: Cache):  (1)
    return 'Hello, World!'


def test_scheduled_app(app_runner):  (2)
    event = ScheduledNaturalTimeEvent(
        asset_id=0, company_id=0, schedule_start=0, interval=1
    )  (3)

    result = app_runner(scheduled_app, event=event)  (4)

    assert result == 'Hello, World!'  (5)
1 Sample app that we want to test.
2 Add app_runner argument to your test function.
3 Define the event that will be passed to the app.
4 Use app_runner fixture to run the app.
5 Verify the result.

10.3. Task

from corva import Api, TaskEvent, task


@task
def task_app(event: TaskEvent, api: Api):  (1)
    return 'Hello, World!'


def test_task_app(app_runner):  (2)
    event = TaskEvent(asset_id=0, company_id=0)  (3)

    result = app_runner(task_app, event=event)  (4)

    assert result == 'Hello, World!'  (5)
1 Sample app that we want to test.
2 Add app_runner argument to your test function.
3 Define the event that will be passed to the app.
4 Use app_runner fixture to run the app.
5 Verify the result.

10.4. Secrets

This section shows how to test an app that uses secrets.

from corva import Api, TaskEvent, secrets, task


@task
def task_app(event: TaskEvent, api: Api):  (1)
    api_token = secrets['api_token']

    return api_token


def test_task_app(app_runner):  (2)
    event = TaskEvent(asset_id=0, company_id=0)  (3)

    api_token = app_runner(task_app, event, secrets={'api_token': '12345'})  (4)

    assert api_token == '12345'  (5)
1 Sample app that we want to test.
2 Add app_runner argument to your test function.
3 Define the event that will be passed to the app.
4 Use app_runner fixture to run the app. Pass dictionary with required secrets as secrets parameter.
5 Verify the result.

10.5. Cache

This section shows how to test an app that uses cache, either resetting the cache on every run or reusing the same cache between runs.

from corva import ScheduledDataTimeEvent, scheduled
from corva.service.cache_sdk import UserRedisSdk


@scheduled
def scheduled_fibonacci(event, api, cache):  (1)
    number1 = int(cache.get('number1') or 1)
    number2 = int(cache.get('number2') or 1)
    number3 = number1 + number2
    cache.set('number1', str(number2))
    cache.set('number2', str(number3))

    return number3


def test_reset_cache(app_runner):  (2)
    event = ScheduledDataTimeEvent(
        asset_id=0, company_id=0, start_time=0, end_time=0
    )  (3)

    for _ in range(5):
        result = app_runner(scheduled_fibonacci, event)  (4)
        assert result == 2  (5)


def test_reuse_cache(app_runner):  (6)
    event = ScheduledDataTimeEvent(
        asset_id=0, company_id=0, start_time=0, end_time=0
    )  (7)

    cache = UserRedisSdk(
        hash_name='hash_name', redis_dsn='redis://localhost', use_fakes=True
    )  (8)

    expected_results = [2, 3, 5, 8, 13, 21, 34, 55]
    for expected_result in expected_results:
        result = app_runner(scheduled_fibonacci, event, cache=cache)  (9)
        assert result == expected_result  (10)
1 Sample app that we want to test. It is a sample Fibonacci function that relies on cache data: it reads two numbers from the cache, computes their sum and stores the last two numbers in the cache.
2 Add app_runner argument to your test function. In this test case, the cache is reset for each app_runner call.
3 Define the event that will be passed to the app.
4 Use app_runner fixture to run the app. When the cache keyword argument is not passed to app_runner, the cache is reset every time app_runner is called.
5 Verify the result. Since the cache is reset for each app_runner call, the results are the same.
6 Add app_runner argument to your test function. In this test case, the cache is reused for each app_runner call.
7 Define the event that will be passed to the app.
8 Define a cache object.
9 Use app_runner fixture to run the app. When a cache is passed to app_runner, it is used to run the app.
10 Verify the result. Since the same cache object is used, the results are changing.

11. App rerun

You might want to implement custom logic for app reruns. For this, events include a rerun field that stores rerun metadata.

Example usage scenario of rerun field:

  1. Well start 22/01/31 - end 22/03/31.

  2. Rerun start 22/02/02 - end 22/02/04.

  3. By default, the app uses the latest calculated record as the base to catch up to real-time.

  4. During a rerun, the app must use the latest record before 22/02/02 (using rerun.start) as the base for the initial calculation.
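
The scenario above can be sketched as a base-record selection. pick_base_record is a hypothetical function, records are modelled as (timestamp, value) tuples, and representing the rerun start as a datetime is an assumption made for readability; the dates are read as year/month/day.

```python
from datetime import datetime
from typing import List, Optional, Tuple

Record = Tuple[datetime, float]  # (timestamp, calculated value)


def pick_base_record(
    records: List[Record], rerun_start: Optional[datetime] = None
) -> Optional[Record]:
    """Latest record overall, or the latest one strictly before rerun_start."""
    if rerun_start is None:
        candidates = records  # normal run: any record may serve as the base
    else:
        candidates = [record for record in records if record[0] < rerun_start]
    return max(candidates, key=lambda record: record[0], default=None)


records = [
    (datetime(2022, 1, 31), 1.0),
    (datetime(2022, 2, 1), 2.0),
    (datetime(2022, 2, 3), 3.0),
    (datetime(2022, 3, 31), 4.0),
]
normal_base = pick_base_record(records)
rerun_base = pick_base_record(records, rerun_start=datetime(2022, 2, 2))
```

In a real app the rerun start would come from the event's rerun field rather than from a hard-coded date.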

12. Development - Contributing

Here are some guidelines to set up your environment.

12.1. Create and activate virtual environment

What’s needed:

  • Python 3.8.

  • A terminal opened inside the cloned corva-sdk repository.

python -m venv env (1)
source ./env/bin/activate (2)
1 Create a directory ./env/ with isolated Python environment inside. You will be able to install needed packages there.
2 Activate the new environment.

From now on, always keep the virtual environment activated when working with the project.

12.2. Install dependencies and run tests

What’s needed:

  • Installed make.

make install (1)
make all  (2)
1 Install all development requirements.
2 Run tests and linter to verify that the project was set up properly.

12.3. What’s next?

After completing the steps above, you can explore the project and make a contribution.

make help (1)
1 List available make targets - a good starting point for exploration.