Documentation for version v1.4.0.

corva-sdk is a framework for building Corva DevCenter Python apps.

1. Install

corva-sdk requires Python 3.8.

Installation is as simple as:

pip install corva-sdk

2. App Types

There are three app types that you can build:

  1. stream - works with real-time data.

  2. scheduled - works with data at defined schedules/intervals (e.g., once a minute, once every 3 ft.).

  3. task - works with data on-demand.

Use type hints like in examples below for better support from editors and tools.

2.1. Stream

Stream apps can be time or depth based.

2.1.1. Time

from corva import Api, Cache, StreamTimeEvent, stream  (1)


@stream  (3)
def stream_time_app(event: StreamTimeEvent, api: Api, cache: Cache):  (2)
    return "Hello, World!"  (4)
1 Import required functionality.
2 Define your function. It must receive three argumets: event, api and cache. The arguments serve as building blocks for your app.
3 Decorate your function using stream.
4 Add app logic here.

2.1.2. Depth

from corva import Api, Cache, StreamDepthEvent, stream  (1)


@stream  (3)
def stream_depth_app(event: StreamDepthEvent, api: Api, cache: Cache):  (2)
    return 'Hello, World!'  (4)
1 Import required functionality.
2 Define your function. It must receive three argumets: event, api and cache. The arguments serve as building blocks for your app.
3 Decorate your function using stream.
4 Add app logic here.

2.2. Scheduled

Scheduled apps can be data time, depth or natural time based.

2.2.1. Data Time

from corva import Api, Cache, ScheduledDataTimeEvent, scheduled  (1)


@scheduled  (3)
def scheduled_app(event: ScheduledDataTimeEvent, api: Api, cache: Cache):  (2)
    return 'Hello, World!'  (4)
1 Import required functionality.
2 Define your function. It must receive three argumets: event, api and cache. The arguments serve as building blocks for your app.
3 Decorate your function using scheduled.
4 Add app logic here.

2.2.2. Depth

from corva import Api, Cache, ScheduledDepthEvent, scheduled  (1)


@scheduled  (3)
def scheduled_app(event: ScheduledDepthEvent, api: Api, cache: Cache):  (2)
    return 'Hello, World!'  (4)
1 Import required functionality.
2 Define your function. It must receive three argumets: event, api and cache. The arguments serve as building blocks for your app.
3 Decorate your function using scheduled.
4 Add app logic here.

2.2.3. Natural Time

from corva import Api  (1)
from corva import Cache, ScheduledNaturalTimeEvent, scheduled


@scheduled  (3)
def scheduled_app(event: ScheduledNaturalTimeEvent, api: Api, cache: Cache):  (2)
    return 'Hello, World!'  (4)
1 Import required functionality.
2 Define your function. It must receive three argumets: event, api and cache. The arguments serve as building blocks for your app.
3 Decorate your function using scheduled.
4 Add app logic here.

2.3. Task

from corva import Api, TaskEvent, task  (1)


@task  (3)
def task_app(event: TaskEvent, api: Api):  (2)
    return 'Hello, World!'  (4)
1 Import required functionality.
2 Define your function. It must receive two argumets: event and api. The arguments serve as building blocks for your app.
3 Decorate your function using task.
4 Add app logic here.

3. Event

Event is an object that contains essential data for the app. Each app type has its corresponding event type, which only contains fields relevant to that app.

4. Api

Apps should be able to communicate with Corva Platform API and Corva Data API. For this corva-sdk provides an Api object, which wraps Python requests library and adds automatic authorization, convenient URL usage and reasonable timeouts to API requests. Api methods return requests.Response objects.

4.1. Url usage

from corva import Api, TaskEvent, task


@task
def task_app(event: TaskEvent, api: Api):
    api.get('/v2/pads')  (1)
    api.get('/api/v1/data/provider/dataset/')  (2)
    api.get('https://api.corva.ai/v2/pads')  (3)
1 Use Corva Platform API URL suffix to make a Platform API call.
2 Use Corva Data API URL suffix to make a Data API call.
3 You can also provide full URL.

4.2. HTTP GET

from corva import Api, TaskEvent, task


@task
def task_app(event: TaskEvent, api: Api):
    response = api.get('/v2/pads')  (1)
    api.get('/v2/pads', params={'company': 1})  (2)

    response.json()  (3)
1 Simplest GET example.
2 Use optional params parameter to provide URL query string params.
3 You can unpack received data like this, as all Api methods return requests.Response objects.

4.3. HTTP POST, DELETE, PUT and PATCH

from corva import Api, TaskEvent, task


@task
def task_app(event: TaskEvent, api: Api):
    api.post('/v2/pads', data={'key': 'val'})   (1) (5)
    api.delete('/v2/pads/123')  (2)
    api.put('/api/v1/data/provider/dataset/', data={'key': 'val'})   (3) (5)
    api.patch('/v2/pads/123', data={'key': 'val'})   (4) (5)
1 Simplest POST example.
2 Simplest DELETE example.
3 Simplest PUT example.
4 Simplest PATCH example.
5 Use data parameter to provide a request body, that will be casted to json.

4.4. Custom headers and timeouts

from corva import Api, TaskEvent, task


@task
def task_app(event: TaskEvent, api: Api):
    api.get('/v2/pads', headers={'header': 'header-value'})  (1)
    api.get('/v2/pads', timeout=5)  (2)
1 Use headers parameter to add custom headers to the request.
2 Use timeout parameter to override default timeout value.

4.5. Convenience methods

Api provides some convenience methods for frequently used endpoints.

4.5.1. Get dataset

Fetch the data from the /api/v1/data/provider/dataset/ endpoint using Api.get_dataset method.

from corva import Api, TaskEvent, task


@task
def task_app(event: TaskEvent, api: Api):
    api.get_dataset(
        provider='corva',
        dataset='wits',
        query={
            'asset_id': event.asset_id,
        },
        sort={'timestamp': 1},
        limit=1,
        fields='data,metadata',
    )

5. Cache

Apps might need to share some data between invokes. For this corva-sdk provides a Cache object. Cache uses a dict-like database, so the data is stored as key:value pairs.

A typical example of Cache usage is:

  1. Store some data during app invoke 1.

  2. Retrieve and use the data during app invoke 2.

To get the most out of Cache:

  1. Store as small amounts of data as possible.

  2. Try to stay below 100kb.

Task apps don’t get a Cache parameter as they aren’t meant to share the data between invokes.

5.1. Get and set

Cache can store only string data.

Cast your data to str before saving.

5.1.1. Storing str

from corva import Api, Cache, ScheduledDataTimeEvent, scheduled


@scheduled
def scheduled_app(event: ScheduledDataTimeEvent, api: Api, cache: Cache):
    cache.set(key='str', value='text')  (1)
    assert cache.get(key='str') == 'text'  (2)
1 Save str data to Cache.
2 Load the value using its key.

5.1.2. Storing int

from corva import Api, Cache, ScheduledDataTimeEvent, scheduled


@scheduled
def scheduled_app(event: ScheduledDataTimeEvent, api: Api, cache: Cache):
    cache.set(key='int', value=str(0))  (1)
    assert cache.get('int') == '0'  (2)
    assert int(cache.get('int')) == 0  (3)
1 Cast int to str before saving data to Cache.
2 Load the value using its key. Notice that returned value has str type.
3 Cast the value back to int as needed.

5.1.3. Storing dict

import json

from corva import Api, Cache, ScheduledDataTimeEvent, scheduled


@scheduled
def scheduled_app(event: ScheduledDataTimeEvent, api: Api, cache: Cache):
    cache.set(key='json', value=json.dumps({'int': 0, 'str': 'text'}))  (1)
    assert isinstance(cache.get('json'), str)  (2)
    assert json.loads(cache.get('json')) == {'int': 0, 'str': 'text'}  (3)
1 Cast dict to JSON str before saving data to Cache.
2 Load the value using its key. Notice that returned value has str type.
3 Parse JSON str and convert it back into a dict.

5.2. Delete

from corva import Api, Cache, ScheduledDataTimeEvent, scheduled


@scheduled
def scheduled_app(event: ScheduledDataTimeEvent, api: Api, cache: Cache):
    cache.set(key='str', value='text')  (1)
    assert cache.get(key='str') == 'text'  (2)

    cache.delete(key='str')  (3)
    assert cache.get(key='str') is None  (4)
1 Save some data to Cache.
2 Load the value using its key.
3 Delete the data, when it is no longer needed.
4 The data is not present anymore.

5.3. Key expiry

By default keys expire in 60 days.
import time

from corva import Api, Cache, ScheduledDataTimeEvent, scheduled


@scheduled
def scheduled_app(event: ScheduledDataTimeEvent, api: Api, cache: Cache):
    cache.set(key='key', value='value', ttl=1)  (1)
    assert cache.get('key') == 'value'

    time.sleep(1)  (2)

    assert cache.get('key') is None  (3)
1 Specify how many seconds you want your key to live using ttl parameter.
2 Wait for key to expire.
3 The data is not present anymore.

5.4. Bulk methods

Cache provides some bulk methods which make it easy to work with multiple keys at once.

5.4.1. Get many, get all and set many

import time

from corva import Api, Cache, ScheduledDataTimeEvent, scheduled


@scheduled
def scheduled_app(event: ScheduledDataTimeEvent, api: Api, cache: Cache):
    assert cache.get_all() == {}  (1)

    cache.set_many(  (2)
        data=[
            ('key', 'value_1'),
            ('key_with_custom_expiry', 'value_2', 1),  (3)
        ]
    )

    assert cache.get_many(  (4)
        keys=[
            'key',
            'non-existent-key',  (5)
        ]
    ) == {'key': 'value_1', 'non-existent-key': None}

    assert cache.get_all() == {  (6)
        'key': 'value_1',
        'key_with_custom_expiry': 'value_2',
    }

    time.sleep(1)  (7)

    assert cache.get_all() == {'key': 'value_1'}  (8)
1 Get all the data from the hash. It is empty as we have not stored anything yet.
2 Store multiple key-value pairs at once.
3 You can set custom key expiry in seconds by providing additional tuple element.
4 Get multiple keys at once.
5 If you request a non-existent key it will be assigned a value of None.
6 Get all the data from the hash.
7 Wait for key with custom expiry to expire.
8 The expired key is not present anymore.

5.4.2. Delete many and delete all

from corva import Api, Cache, ScheduledDataTimeEvent, scheduled


@scheduled
def scheduled_app(event: ScheduledDataTimeEvent, api: Api, cache: Cache):
    assert cache.get_all() == {}  (1)

    cache.set_many(data=[('k1', 'v1'), ('k2', 'v2'), ('k3', 'v3'), ('k4', 'v4')])  (2)

    cache.delete_many(keys=['k1', 'k2'])  (3)
    assert cache.get_all() == {'k3': 'v3', 'k4': 'v4'}  (4)

    cache.delete_all()  (5)
    assert cache.get_all() == {}  (6)
1 Cache is empty as we have not stored anything yet.
2 Store some data.
3 Delete multiple keys at once.
4 Deleted keys are non-existent anymore.
5 Delete all the data.
6 Cache is empty.

6. Logging

As apps are executed very frequently (once a second or so), unlimited logging can lead to huge amounts of data. corva-sdk provides a Logger object, which is a safe way for app logging.

The Logger is a logging.Logger instance and should be used like every other Python logger.

The Logger has following features:

  • Log messages are injected with contextual information, which makes it easy to filter through logs while debugging issues.

  • Log messages have limited length. Too long messages are truncated to not exceed the limit. Max message size can be controlled by LOG_THRESHOLD_MESSAGE_SIZE env variable. Default value is 1000 symbols or bytes.

  • Number of log messages is limited. After reaching the limit logging gets disabled. Number of log messages can be controlled by LOG_THRESHOLD_MESSAGE_COUNT env variable. Default value is 15 messages.

  • Logging level can be set using LOG_LEVEL env variable. Default value is INFO, see Python log levels for other available options.

from corva import Api, Logger, TaskEvent, task  (1)


@task
def task_app(event: TaskEvent, api: Api):
    (2)
    Logger.debug('Debug message!')
    Logger.info('Info message!')
    Logger.warning('Warning message!')
    Logger.error('Error message!')
    try:
        0 / 0
    except ZeroDivisionError:
        Logger.exception('Exception message!')
1 Import Logger object.
2 Use Logger as every other Python logger.

6.1. Customizations

You might want to send logs to other places (e.g., to error reporting systems like Sentry or Rollbar). This can be achieved by providing an instance of logging handler as an argument to app decorator. Custom handler will be used alongside corva-sdk's default one.

import logging  (1)

from corva import Api, Logger, TaskEvent, task

stream_handler = logging.StreamHandler()  (2)


@task(handler=stream_handler)  (3)
def task_app(event: TaskEvent, api: Api):
    Logger.info('Info message!')  (4)
1 Import the module which contains the handler that we want to use.
2 Initialize the handler.
3 Pass the handler as a keyword argument to the app decorator.
4 Logs will be sent to both stream_handler and corva-sdk's default one.

6.1.1. Sentry

pip install sentry-sdk (1)
1 Install the library.
import sentry_sdk  (1)

from corva import Api, TaskEvent, task

sentry_sdk.init("YOUR_SENTRY_DSN")  (2)


@task
def app(event: TaskEvent, api: Api) -> None:
    1 / 0  (3)
1 Import Sentry SDK.
2 Initialize the library.
3 All errors will be reported to Sentry now.

6.1.2. Rollbar

pip install rollbar (1)
1 Install the library.
import rollbar.logger  (1)

from corva import Api, TaskEvent, task

rollbar_handler = rollbar.logger.RollbarHandler('YOUR_ROLLBAR_ACCESS_TOKEN')  (2)


@task(handler=rollbar_handler)  (3)
def app(event: TaskEvent, api: Api) -> None:
    1 / 0  (4)
1 Import Rollbar SDK.
2 Initialize Rollbar handler.
3 Pass the handler as a keyword argument to the app decorator.
4 All errors will be reported to Rollbar now.

6.1.3. Raygun

pip install raygun4py (1)
1 Install the library.
import raygun4py.raygunprovider  (1)

from corva import Api, TaskEvent, task

raygun_handler = raygun4py.raygunprovider.RaygunHandler('YOUR_RAYGUN_API_KEY')  (2)


@task(handler=raygun_handler)  (3)
def app(event: TaskEvent, api: Api) -> None:
    1 / 0  (4)
1 Import Raygun SDK.
2 Initialize Raygun handler.
3 Pass the handler as a keyword argument to the app decorator.
4 All errors will be reported to Raygun now.

6.1.4. Other libraries

You can use any other error logging libraries. Just initialize and pass corresponding logging handler as a keyword argument to the app decorator. Use code samples above as the examples.

7. Secrets

You can store sensitive data such as passwords, tokens, confidential configuration data, or any other types of data that you want to protect as application secrets. To access such data inside the app corva-sdk provides secrets object.

See testing secrets section to see how to test an app that uses secrets.

from corva import Api, TaskEvent, secrets, task  (1)


@task
def task_app(event: TaskEvent, api: Api):
    secrets['api_token']  (2)
    int(secrets['integer'])  (3)
1 Import secrets object.
2 secrets is a dictionary so use key to retrieve the value.
3 Values are stored as strings, cast the value to required type as needed. Example shows how to get the integer.

8. Testing

Testing apps is easy and enjoyable.

corva-sdk provides convenient tools for testing through pytest-plugin.

Write your tests using pytest to get the access to the plugin.

To install the library run:

pip install pytest

8.1. Stream

8.1.1. Time

from corva import Api, Cache, StreamTimeEvent, StreamTimeRecord, stream


@stream
def stream_app(event: StreamTimeEvent, api: Api, cache: Cache):  (1)
    return 'Hello, World!'


def test_stream_time_app(app_runner):  (2)
    event = StreamTimeEvent(  (3)
        asset_id=0, company_id=0, records=[StreamTimeRecord(timestamp=0)]
    )

    result = app_runner(stream_app, event=event)  (4)

    assert result == 'Hello, World!'  (5)
1 Sample app that we want to test.
2 Add app_runner argument to your test function.
3 Define the event that will be passed to the app.
4 Use app_runner fixture to run the app.
5 Verify the result.

8.1.2. Depth

from corva import Api, Cache, StreamDepthEvent, StreamDepthRecord, stream


@stream
def stream_app(event: StreamDepthEvent, api: Api, cache: Cache):  (1)
    return 'Hello, World!'


def test_stream_depth_app(app_runner):  (2)
    event = StreamDepthEvent(  (3)
        asset_id=0, company_id=0, records=[StreamDepthRecord(measured_depth=0)]
    )

    result = app_runner(stream_app, event=event)  (4)

    assert result == 'Hello, World!'  (5)
1 Sample app that we want to test.
2 Add app_runner argument to your test function.
3 Define the event that will be passed to the app.
4 Use app_runner fixture to run the app.
5 Verify the result.

8.2. Scheduled

8.2.1. Data Time

from corva import Api, Cache, ScheduledDataTimeEvent, scheduled


@scheduled
def scheduled_app(event: ScheduledDataTimeEvent, api: Api, cache: Cache):  (1)
    return 'Hello, World!'


def test_scheduled_app(app_runner):  (2)
    event = ScheduledDataTimeEvent(
        asset_id=0, start_time=0, end_time=0, company_id=0
    )  (3)

    result = app_runner(scheduled_app, event=event)  (4)

    assert result == 'Hello, World!'  (5)
1 Sample app that we want to test.
2 Add app_runner argument to your test function.
3 Define the event that will be passed to the app.
4 Use app_runner fixture to run the app.
5 Verify the result.

8.2.2. Depth

from corva import Api, Cache, ScheduledDepthEvent, scheduled


@scheduled
def scheduled_app(event: ScheduledDepthEvent, api: Api, cache: Cache):  (1)
    return 'Hello, World!'


def test_scheduled_app(app_runner):  (2)
    event = ScheduledDepthEvent(
        asset_id=0,
        company_id=0,
        top_depth=0.0,
        bottom_depth=1.0,
        log_identifier='',
        interval=1.0,
    )  (3)

    result = app_runner(scheduled_app, event=event)  (4)

    assert result == 'Hello, World!'  (5)
1 Sample app that we want to test.
2 Add app_runner argument to your test function.
3 Define the event that will be passed to the app.
4 Use app_runner fixture to run the app.
5 Verify the result.

8.2.3. Natural Time

from corva import Api, Cache, ScheduledNaturalTimeEvent, scheduled


@scheduled
def scheduled_app(event: ScheduledNaturalTimeEvent, api: Api, cache: Cache):  (1)
    return 'Hello, World!'


def test_scheduled_app(app_runner):  (2)
    event = ScheduledNaturalTimeEvent(
        asset_id=0, company_id=0, schedule_start=0, interval=1
    )  (3)

    result = app_runner(scheduled_app, event=event)  (4)

    assert result == 'Hello, World!'  (5)
1 Sample app that we want to test.
2 Add app_runner argument to your test function.
3 Define the event that will be passed to the app.
4 Use app_runner fixture to run the app.
5 Verify the result.

8.3. Task

from corva import Api, TaskEvent, task


@task
def task_app(event: TaskEvent, api: Api):  (1)
    return 'Hello, World!'


def test_task_app(app_runner):  (2)
    event = TaskEvent(asset_id=0, company_id=0)  (3)

    result = app_runner(task_app, event=event)  (4)

    assert result == 'Hello, World!'  (5)
1 Sample app that we want to test.
2 Add app_runner argument to your test function.
3 Define the event that will be passed to the app.
4 Use app_runner fixture to run the app.
5 Verify the result.

8.4. Secrets

This section shows how to test an app that uses secrets.

from corva import Api, TaskEvent, secrets, task


@task
def task_app(event: TaskEvent, api: Api):  (1)
    api_token = secrets['api_token']

    return api_token


def test_task_app(app_runner):  (2)
    event = TaskEvent(asset_id=0, company_id=0)  (3)

    api_token = app_runner(task_app, event, secrets={'api_token': '12345'})  (4)

    assert api_token == '12345'  (5)
1 Sample app that we want to test.
2 Add app_runner argument to your test function.
3 Define the event that will be passed to the app.
4 Use app_runner fixture to run the app. Pass dictionary with required secrets as secrets parameter.
5 Verify the result.

9. Development - Contributing

Here are some guidelines to set up your environment.

9.1. Create and activate virtual environment

What’s needed:

  • Python 3.8.

  • Opened terminal inside cloned corva-sdk repository.

python -m venv env (1)
source ./env/bin/activate (2)
1 Create a directory ./env/ with isolated Python environment inside. You will be able to install needed packages there.
2 Activate the new environment.

From now on you must always have activated virtual environment when working with the project.

9.2. Install dependencies and run tests

What’s needed:

  • Installed make.

make install (1)
make all  (2)
1 Install all development requirements.
2 Run tests and linter to verify that the project was set up properly.

9.3. What’s next?

After completing steps above you can explore the project and make a contribution.

make help (1)
1 List available make targets - a good starting point for exploration.