Documentation for version v1.2.0.

corva-sdk is a framework for building Corva DevCenter Python apps.

1. Install

corva-sdk requires Python 3.8.

Installation is as simple as:

pip install corva-sdk

2. App Types

There are three app types that you can build:

  1. stream - works with real-time data.

  2. scheduled - works with data at defined schedules/intervals (e.g., once a minute, once every 3 ft.).

  3. task - works with data on-demand.

Use type hints, as in the examples below, for better support from editors and tools.

2.1. Stream

Stream apps can be time or depth based.

2.1.1. Time

from corva import Api, Cache, StreamTimeEvent, stream  (1)


@stream  (3)
def stream_time_app(event: StreamTimeEvent, api: Api, cache: Cache):  (2)
    return 'Hello, World!'  (4)
1 Import required functionality.
2 Define your function. It must receive three arguments: event, api and cache. The arguments serve as building blocks for your app.
3 Decorate your function using stream.
4 Add app logic here.

2.1.2. Depth

from corva import Api, Cache, StreamDepthEvent, stream  (1)


@stream  (3)
def stream_depth_app(event: StreamDepthEvent, api: Api, cache: Cache):  (2)
    return 'Hello, World!'  (4)
1 Import required functionality.
2 Define your function. It must receive three arguments: event, api and cache. The arguments serve as building blocks for your app.
3 Decorate your function using stream.
4 Add app logic here.

2.2. Scheduled

Scheduled apps can be data time, depth or natural time based.

2.2.1. Data Time

from corva import Api, Cache, ScheduledDataTimeEvent, scheduled  (1)


@scheduled  (3)
def scheduled_app(event: ScheduledDataTimeEvent, api: Api, cache: Cache):  (2)
    return 'Hello, World!'  (4)
1 Import required functionality.
2 Define your function. It must receive three arguments: event, api and cache. The arguments serve as building blocks for your app.
3 Decorate your function using scheduled.
4 Add app logic here.

2.2.2. Depth

from corva import Api, Cache, ScheduledDepthEvent, scheduled  (1)


@scheduled  (3)
def scheduled_app(event: ScheduledDepthEvent, api: Api, cache: Cache):  (2)
    return 'Hello, World!'  (4)
1 Import required functionality.
2 Define your function. It must receive three arguments: event, api and cache. The arguments serve as building blocks for your app.
3 Decorate your function using scheduled.
4 Add app logic here.

2.2.3. Natural Time

from corva import Api, Cache, ScheduledNaturalTimeEvent, scheduled  (1)


@scheduled  (3)
def scheduled_app(event: ScheduledNaturalTimeEvent, api: Api, cache: Cache):  (2)
    return 'Hello, World!'  (4)
1 Import required functionality.
2 Define your function. It must receive three arguments: event, api and cache. The arguments serve as building blocks for your app.
3 Decorate your function using scheduled.
4 Add app logic here.

2.3. Task

from corva import Api, TaskEvent, task  (1)


@task  (3)
def task_app(event: TaskEvent, api: Api):  (2)
    return 'Hello, World!'  (4)
1 Import required functionality.
2 Define your function. It must receive two arguments: event and api. The arguments serve as building blocks for your app.
3 Decorate your function using task.
4 Add app logic here.

3. Event

Event is an object that contains essential data for the app. Each app type has its corresponding event type, which only contains fields relevant to that app.

4. Api

Apps need to communicate with the Corva Platform API and the Corva Data API. For this, corva-sdk provides an Api object, which wraps the Python requests library and adds automatic authorization, convenient URL usage and reasonable timeouts to API requests. Api methods return requests.Response objects.

4.1. URL usage

from corva import Api, TaskEvent, task


@task
def task_app(event: TaskEvent, api: Api):
    api.get('/v2/pads')  (1)
    api.get('/api/v1/data/provider/dataset/')  (2)
    api.get('https://api.corva.ai/v2/pads')  (3)
1 Use Corva Platform API URL suffix to make a Platform API call.
2 Use Corva Data API URL suffix to make a Data API call.
3 You can also provide a full URL.
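
The three URL forms above can be pictured with a small resolver. This is only a sketch of the idea, not how Api is implemented: the helper name resolve_url, both placeholder hosts and the rule that Data API suffixes start with /api/v1/data are assumptions made for illustration.

```python
from urllib.parse import urljoin

# Placeholder hosts, assumed for illustration only.
PLATFORM_API_ROOT = 'https://platform.example.com'
DATA_API_ROOT = 'https://data.example.com'


def resolve_url(path: str) -> str:
    """Hypothetical helper: turn a URL suffix into a full URL."""
    if path.startswith(('http://', 'https://')):
        return path  # Full URLs are used as-is.
    # Assumed rule: Data API suffixes start with '/api/v1/data'.
    if path.startswith('/api/v1/data'):
        return urljoin(DATA_API_ROOT, path)
    return urljoin(PLATFORM_API_ROOT, path)


print(resolve_url('/v2/pads'))
print(resolve_url('/api/v1/data/provider/dataset/'))
print(resolve_url('https://api.corva.ai/v2/pads'))
```

With Api itself none of this is needed; passing the suffix or the full URL is enough.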

4.2. HTTP GET

from corva import Api, TaskEvent, task


@task
def task_app(event: TaskEvent, api: Api):
    response = api.get('/v2/pads')  (1)
    api.get('/v2/pads', params={'company': 1})  (2)

    response.json()  (3)
1 Simplest GET example.
2 Use the optional params parameter to provide URL query string parameters.
3 Parse the response body as JSON. This works because all Api methods return requests.Response objects.

4.3. HTTP POST, DELETE, PUT and PATCH

from corva import Api, TaskEvent, task


@task
def task_app(event: TaskEvent, api: Api):
    api.post('/v2/pads', data={'key': 'val'})   (1) (5)
    api.delete('/v2/pads/123')  (2)
    api.put('/api/v1/data/provider/dataset/', data={'key': 'val'})   (3) (5)
    api.patch('/v2/pads/123', data={'key': 'val'})   (4) (5)
1 Simplest POST example.
2 Simplest DELETE example.
3 Simplest PUT example.
4 Simplest PATCH example.
5 Use the data parameter to provide a request body, which is serialized to JSON.

4.4. Custom headers and timeouts

from corva import Api, TaskEvent, task


@task
def task_app(event: TaskEvent, api: Api):
    api.get('/v2/pads', headers={'header': 'header-value'})  (1)
    api.get('/v2/pads', timeout=5)  (2)
1 Use headers parameter to add custom headers to the request.
2 Use timeout parameter to override default timeout value.

4.5. Convenience methods

Api provides some convenience methods for frequently used endpoints.

4.5.1. Get dataset

Fetch the data from the /api/v1/data/provider/dataset/ endpoint using Api.get_dataset method.

from corva import Api, TaskEvent, task


@task
def task_app(event: TaskEvent, api: Api):
    api.get_dataset(
        provider='corva',
        dataset='wits',
        query={
            'asset_id': event.asset_id,
        },
        sort={'timestamp': 1},
        limit=1,
        fields='data,metadata',
    )

5. Cache

Apps might need to share some data between invocations. For this, corva-sdk provides a Cache object. Cache uses a dict-like database, so the data is stored as key:value pairs.

A typical example of Cache usage is:

  1. Store some data during app invocation 1.

  2. Retrieve and use the data during app invocation 2.

To get the most out of Cache:

  1. Store as little data as possible.

  2. Try to stay below 100kb.
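
One way to follow the size guideline is to measure a value before storing it. The sketch below uses only the standard library; the helper names, the JSON serialization and the reading of "100kb" as 100,000 bytes are assumptions, not part of corva-sdk.

```python
import json

MAX_CACHE_BYTES = 100 * 1000  # Assumed reading of the "100kb" guideline.


def encoded_size(value) -> int:
    """Size in bytes of a value once serialized to JSON and UTF-8 encoded."""
    return len(json.dumps(value).encode('utf-8'))


def fits_in_cache(value) -> bool:
    """Hypothetical check to run before storing the serialized value."""
    return encoded_size(value) <= MAX_CACHE_BYTES


small = {'last_timestamp': 1609459200}
large = {'values': list(range(50000))}

print(encoded_size(small), fits_in_cache(small))
print(fits_in_cache(large))
```

Keeping only a summary (for example, the last processed timestamp) instead of raw records is usually enough to stay under the limit.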

Task apps don’t get a Cache parameter, as they aren’t meant to share data between invocations.

5.1. Store and load

from corva import Api, Cache, ScheduledDataTimeEvent, scheduled


@scheduled
def scheduled_app(event: ScheduledDataTimeEvent, api: Api, cache: Cache):
    cache.store(key='key', value='')  (1)
    assert cache.load(key='key') == ''  (2)

    cache.store(key='key', value=0)  (3)
    assert cache.load(key='key') == '0'  (4)

    cache.store(key='key', value=0.0)  (5)
    assert cache.load(key='key') == '0.0'  (4)

    cache.store(key='key', value=b'')  (6)
    assert cache.load(key='key') == ''  (4)

    cache.store(mapping={'key': 'val', 'other-key': 'other-val'})  (7)
    assert cache.load_all() == {'key': 'val', 'other-key': 'other-val'}  (8)
1 Cache can store str values.
2 Load the value using its key.
3 Cache can store int values.
4 Cache stores the values in binary format. The data is decoded while loading, but Cache can’t infer the original type of the value, so the returned type is always str.
5 Cache can store float values.
6 Cache can store bytes values.
7 Store multiple values at once using mapping parameter.
8 Load all data from cache.
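
Because loaded values come back as str, structured state has to be converted explicitly. The following sketch shows one possible approach with JSON; dump_state and load_state are hypothetical helpers, and treating an empty or missing value as an empty dict is an assumption about how you may want to handle the first invocation.

```python
import json


def dump_state(state: dict) -> str:
    """Serialize app state to a str before handing it to cache.store."""
    return json.dumps(state)


def load_state(raw) -> dict:
    """Rebuild app state from the str returned by cache.load."""
    # Assumption: an empty or missing value means there is no state yet.
    return json.loads(raw) if raw else {}


raw = dump_state({'count': 3, 'mean': 1.5})
state = load_state(raw)
print(state)

# Plain numbers can be converted back directly.
print(int('0'), float('0.0'))
```

In an app, the str produced by dump_state would be the value passed to cache.store, and the result of cache.load would be passed to load_state.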

5.2. Delete

from corva import Api, Cache, ScheduledDataTimeEvent, scheduled


@scheduled
def scheduled_app(event: ScheduledDataTimeEvent, api: Api, cache: Cache):
    cache.store(mapping={'key1': 'val1', 'key2': 'val2', 'key3': 'val3'})  (1)

    cache.delete(keys=['key1'])  (2)
    assert cache.load_all() == {'key2': 'val2', 'key3': 'val3'}  (3)

    cache.delete_all()  (4)
    assert cache.load_all() == {}  (5)
1 Store some data.
2 Delete specific key.
3 Deleted key is not present in cache.
4 Delete all keys.
5 Cache is empty.

5.3. Expiry, ttl and exists

By default, Cache sets an expiry of 60 days.

import datetime
import time

from corva import Api, Cache, ScheduledDataTimeEvent, scheduled


@scheduled
def scheduled_app(event: ScheduledDataTimeEvent, api: Api, cache: Cache):
    cache.store(key='key', value='val', expiry=60)  (1)
    cache.store(key='key', value='val', expiry=datetime.timedelta(seconds=60))  (2)

    assert cache.ttl() == 60  (3)
    assert cache.pttl() == 60000  (4)
    assert cache.exists()  (5)

    time.sleep(60)  (6)

    assert not cache.exists()  (7)
1 Store the value and set an expiry of 60 seconds using the expiry parameter.
2 The expiry parameter also supports datetime.timedelta objects.
3 Get the remaining time until the key expires, in seconds.
4 Get the remaining time until the key expires, in milliseconds.
5 Verify that the cache contains some data.
6 Wait 60 seconds for the data to expire.
7 Verify that the cache is empty.
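
Since expiry accepts both a number of seconds and a datetime.timedelta, it can help to see how the two relate. The helper below is a hypothetical illustration written with the standard library only; it is not the conversion Cache performs internally.

```python
import datetime
from typing import Union


def expiry_seconds(expiry: Union[int, datetime.timedelta]) -> int:
    """Hypothetical helper: express an expiry as whole seconds."""
    if isinstance(expiry, datetime.timedelta):
        return int(expiry.total_seconds())
    return expiry


print(expiry_seconds(60))
print(expiry_seconds(datetime.timedelta(seconds=60)))
# The documented default of 60 days, expressed in seconds.
print(expiry_seconds(datetime.timedelta(days=60)))  # 5184000
```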

6. Logging

As apps are executed very frequently (once a second or so), unlimited logging can lead to huge amounts of data. corva-sdk provides a Logger object, which is a safe way to log from apps.

The Logger is a logging.Logger instance and should be used like any other Python logger.

The Logger has the following features:

  • Log messages are injected with contextual information, which makes it easy to filter through logs while debugging issues.

  • Log messages have limited length. Messages that are too long are truncated to the limit. The maximum message size is controlled by the LOG_THRESHOLD_MESSAGE_SIZE env variable. The default value is 1000 symbols or bytes.

  • The number of log messages is limited. After the limit is reached, logging is disabled. The maximum number of log messages is controlled by the LOG_THRESHOLD_MESSAGE_COUNT env variable. The default value is 15 messages.

  • The logging level can be set using the LOG_LEVEL env variable. The default value is INFO. See Python log levels for other available options.

from corva import Api, Logger, TaskEvent, task  (1)


@task
def task_app(event: TaskEvent, api: Api):
    (2)
    Logger.debug('Debug message!')
    Logger.info('Info message!')
    Logger.warning('Warning message!')
    Logger.error('Error message!')
    try:
        0 / 0
    except ZeroDivisionError:
        Logger.exception('Exception message!')
1 Import Logger object.
2 Use Logger like any other Python logger.
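
To make the truncation and message-count limits more concrete, here is a rough standard-library imitation of that behavior. It is a sketch under stated assumptions, not corva-sdk's implementation: ThresholdFilter and CollectingHandler are hypothetical names, and the limits are passed in directly instead of being read from the env variables.

```python
import logging


class ThresholdFilter(logging.Filter):
    """Hypothetical filter: truncates long messages and caps their number."""

    def __init__(self, max_size: int, max_count: int):
        super().__init__()
        self.max_size = max_size
        self.max_count = max_count
        self.seen = 0

    def filter(self, record: logging.LogRecord) -> bool:
        if self.seen >= self.max_count:
            return False  # Limit reached: drop every further record.
        self.seen += 1
        # Format the message once, then keep only the allowed prefix.
        record.msg = record.getMessage()[: self.max_size]
        record.args = None
        return True


class CollectingHandler(logging.Handler):
    """Keeps emitted messages in a list so the effect is easy to inspect."""

    def __init__(self):
        super().__init__()
        self.messages = []

    def emit(self, record: logging.LogRecord) -> None:
        self.messages.append(record.getMessage())


logger = logging.getLogger('threshold-sketch')
logger.setLevel(logging.INFO)
logger.propagate = False
logger.addFilter(ThresholdFilter(max_size=10, max_count=2))
handler = CollectingHandler()
logger.addHandler(handler)

logger.info('0123456789abcdef')  # Truncated to 10 characters.
logger.info('short')  # Kept as-is.
logger.info('dropped')  # Third message exceeds max_count=2.

print(handler.messages)
```

With the real Logger there is nothing to configure in code; the limits come from the env variables listed above.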

6.1. Customizations

You might want to send logs to other places (e.g., to error reporting systems like Sentry or Rollbar). This can be achieved by passing a logging handler instance as an argument to the app decorator. The custom handler will be used alongside corva-sdk's default one.

import logging  (1)

from corva import Api, Logger, TaskEvent, task

stream_handler = logging.StreamHandler()  (2)


@task(handler=stream_handler)  (3)
def task_app(event: TaskEvent, api: Api):
    Logger.info('Info message!')  (4)
1 Import the module which contains the handler that we want to use.
2 Initialize the handler.
3 Pass the handler as a keyword argument to the app decorator.
4 Logs will be sent to both stream_handler and corva-sdk's default one.

6.1.1. Sentry

pip install sentry-sdk (1)
1 Install the library.

import sentry_sdk  (1)

from corva import Api, TaskEvent, task

sentry_sdk.init("YOUR_SENTRY_DSN")  (2)


@task
def app(event: TaskEvent, api: Api) -> None:
    1 / 0  (3)
1 Import Sentry SDK.
2 Initialize the library.
3 All errors will be reported to Sentry now.

6.1.2. Rollbar

pip install rollbar (1)
1 Install the library.

import rollbar.logger  (1)

from corva import Api, TaskEvent, task

rollbar_handler = rollbar.logger.RollbarHandler('YOUR_ROLLBAR_ACCESS_TOKEN')  (2)


@task(handler=rollbar_handler)  (3)
def app(event: TaskEvent, api: Api) -> None:
    1 / 0  (4)
1 Import Rollbar SDK.
2 Initialize Rollbar handler.
3 Pass the handler as a keyword argument to the app decorator.
4 All errors will be reported to Rollbar now.

6.1.3. Raygun

pip install raygun4py (1)
1 Install the library.

import raygun4py.raygunprovider  (1)

from corva import Api, TaskEvent, task

raygun_handler = raygun4py.raygunprovider.RaygunHandler('YOUR_RAYGUN_API_KEY')  (2)


@task(handler=raygun_handler)  (3)
def app(event: TaskEvent, api: Api) -> None:
    1 / 0  (4)
1 Import Raygun SDK.
2 Initialize Raygun handler.
3 Pass the handler as a keyword argument to the app decorator.
4 All errors will be reported to Raygun now.

6.1.4. Other libraries

You can use any other error logging library. Just initialize the corresponding logging handler and pass it as a keyword argument to the app decorator. Use the code samples above as examples.
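
If a library does not ship a logging handler, a small one can be written with the standard logging module. The sketch below is a hypothetical example: CallbackHandler and its send argument are illustrative names, and it is attached to a plain logger here so that it runs without corva-sdk.

```python
import logging
from typing import Callable


class CallbackHandler(logging.Handler):
    """Hypothetical handler: forwards ERROR and above to a callable."""

    def __init__(self, send: Callable[[str], None]):
        super().__init__(level=logging.ERROR)
        self.send = send

    def emit(self, record: logging.LogRecord) -> None:
        try:
            self.send(self.format(record))
        except Exception:
            # Delegate to logging's own error handling instead of raising.
            self.handleError(record)


sent = []
callback_handler = CallbackHandler(send=sent.append)

demo_logger = logging.getLogger('handler-sketch')
demo_logger.setLevel(logging.DEBUG)
demo_logger.propagate = False
demo_logger.addHandler(callback_handler)

demo_logger.info('ignored by the handler')  # Below ERROR, not forwarded.
demo_logger.error('reported')

print(sent)
```

In an app, such an instance would be passed to the decorator through the handler keyword argument, in the same way as the handlers in the samples above.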

7. Testing

Testing apps is easy and enjoyable.

corva-sdk provides convenient testing tools through a pytest plugin.

Write your tests using pytest to get access to the plugin.

To install the library run:

pip install pytest

7.1. Stream

7.1.1. Time

from corva import Api, Cache, StreamTimeEvent, StreamTimeRecord, stream


@stream
def stream_app(event: StreamTimeEvent, api: Api, cache: Cache):  (1)
    return 'Hello, World!'


def test_stream_time_app(app_runner):  (2)
    event = StreamTimeEvent(  (3)
        asset_id=0, company_id=0, records=[StreamTimeRecord(timestamp=0)]
    )

    result = app_runner(stream_app, event=event)  (4)

    assert result == 'Hello, World!'  (5)
1 Sample app that we want to test.
2 Add app_runner argument to your test function.
3 Define the event that will be passed to the app.
4 Use app_runner fixture to run the app.
5 Verify the result.

7.1.2. Depth

from corva import Api, Cache, StreamDepthEvent, StreamDepthRecord, stream


@stream
def stream_app(event: StreamDepthEvent, api: Api, cache: Cache):  (1)
    return 'Hello, World!'


def test_stream_depth_app(app_runner):  (2)
    event = StreamDepthEvent(  (3)
        asset_id=0, company_id=0, records=[StreamDepthRecord(measured_depth=0)]
    )

    result = app_runner(stream_app, event=event)  (4)

    assert result == 'Hello, World!'  (5)
1 Sample app that we want to test.
2 Add app_runner argument to your test function.
3 Define the event that will be passed to the app.
4 Use app_runner fixture to run the app.
5 Verify the result.

7.2. Scheduled

7.2.1. Data Time

from corva import Api, Cache, ScheduledDataTimeEvent, scheduled


@scheduled
def scheduled_app(event: ScheduledDataTimeEvent, api: Api, cache: Cache):  (1)
    return 'Hello, World!'


def test_scheduled_app(app_runner):  (2)
    event = ScheduledDataTimeEvent(
        asset_id=0, start_time=0, end_time=0, company_id=0
    )  (3)

    result = app_runner(scheduled_app, event=event)  (4)

    assert result == 'Hello, World!'  (5)
1 Sample app that we want to test.
2 Add app_runner argument to your test function.
3 Define the event that will be passed to the app.
4 Use app_runner fixture to run the app.
5 Verify the result.

7.2.2. Depth

from corva import Api, Cache, ScheduledDepthEvent, scheduled


@scheduled
def scheduled_app(event: ScheduledDepthEvent, api: Api, cache: Cache):  (1)
    return 'Hello, World!'


def test_scheduled_app(app_runner):  (2)
    event = ScheduledDepthEvent(
        asset_id=0,
        company_id=0,
        top_depth=0.0,
        bottom_depth=1.0,
        log_identifier='',
        interval=1.0,
    )  (3)

    result = app_runner(scheduled_app, event=event)  (4)

    assert result == 'Hello, World!'  (5)
1 Sample app that we want to test.
2 Add app_runner argument to your test function.
3 Define the event that will be passed to the app.
4 Use app_runner fixture to run the app.
5 Verify the result.

7.2.3. Natural Time

from corva import Api, Cache, ScheduledNaturalTimeEvent, scheduled


@scheduled
def scheduled_app(event: ScheduledNaturalTimeEvent, api: Api, cache: Cache):  (1)
    return 'Hello, World!'


def test_scheduled_app(app_runner):  (2)
    event = ScheduledNaturalTimeEvent(
        asset_id=0, company_id=0, schedule_start=0, interval=1
    )  (3)

    result = app_runner(scheduled_app, event=event)  (4)

    assert result == 'Hello, World!'  (5)
1 Sample app that we want to test.
2 Add app_runner argument to your test function.
3 Define the event that will be passed to the app.
4 Use app_runner fixture to run the app.
5 Verify the result.

7.3. Task

from corva import Api, TaskEvent, task


@task
def task_app(event: TaskEvent, api: Api):  (1)
    return 'Hello, World!'


def test_task_app(app_runner):  (2)
    event = TaskEvent(asset_id=0, company_id=0)  (3)

    result = app_runner(task_app, event=event)  (4)

    assert result == 'Hello, World!'  (5)
1 Sample app that we want to test.
2 Add app_runner argument to your test function.
3 Define the event that will be passed to the app.
4 Use app_runner fixture to run the app.
5 Verify the result.

8. Development - Contributing

Here are some guidelines to set up your environment.

8.1. Create and activate virtual environment

What’s needed:

  • Python 3.8.

  • A terminal opened inside the cloned corva-sdk repository.

python -m venv env (1)
source ./env/bin/activate (2)
1 Create a directory ./env/ with isolated Python environment inside. You will be able to install needed packages there.
2 Activate the new environment.

From now on, always keep the virtual environment activated when working with the project.

8.2. Install dependencies and run tests

What’s needed:

  • Installed make.

make install (1)
make all  (2)
1 Install all development requirements.
2 Run tests and linter to verify that the project was set up properly.

8.3. What’s next?

After completing the steps above, you can explore the project and make a contribution.

make help (1)
1 List available make targets - a good starting point for exploration.