corva-sdk
Documentation for version 1.12.0.
corva-sdk is a framework for building Corva DevCenter Python apps.
2. App Types
There are three app types that you can build:
Each app type can have optional handler
to process partial rerun merge events.
TIP: Use type hints like in examples below
for better support from editors and tools.
2.1. Stream
2.1.1. Time
from corva import Api, Cache, StreamTimeEvent, stream (1)
@stream (3)
def stream_time_app(event: StreamTimeEvent, api: Api, cache: Cache): (2)
return "Hello, World!" (4)
2.2. Scheduled
Scheduled apps can be data time, depth or natural time based.
2.2.1. Data Time
from corva import Api, Cache, ScheduledDataTimeEvent, scheduled (1)
@scheduled (3)
def scheduled_app(event: ScheduledDataTimeEvent, api: Api, cache: Cache): (2)
return 'Hello, World!' (4)
2.2.2. Depth
from corva import Api, Cache, ScheduledDepthEvent, scheduled (1)
@scheduled (3)
def scheduled_app(event: ScheduledDepthEvent, api: Api, cache: Cache): (2)
return 'Hello, World!' (4)
2.3. Task
from corva import Api, TaskEvent, task (1)
@task (3)
def task_app(event: TaskEvent, api: Api): (2)
return 'Hello, World!' (4)
2.4. Partial Rerun Merge Handler (Optional)
from corva import ( (1)
Api,
Cache,
PartialRerunMergeEvent,
StreamTimeEvent,
partial_rerun_merge,
stream,
)
@stream
def stream_app(event: StreamTimeEvent, api: Api, cache: Cache):
return "Handling stream event..."
@partial_rerun_merge (3)
def partial_rerun_app(
event: PartialRerunMergeEvent,
api: Api,
asset_cache: Cache,
rerun_asset_cache: Cache,
): (2)
return "Hello, World!" (4)
3. Event
Event is an object that contains essential data for the app. Each app type has its corresponding event type, which only contains fields relevant to that app.
4. Api
Apps should be able to communicate with
Corva Platform API and Corva Data API.
For this corva-sdk provides an Api
object,
which wraps Python requests
library
and adds automatic authorization, convenient URL usage
and reasonable timeouts to API requests.
Api
methods return requests.Response
objects.
4.1. Url usage
from corva import Api, TaskEvent, task
@task
def task_app(event: TaskEvent, api: Api):
api.get('/v2/pads') (1)
api.get('/api/v1/data/provider/dataset/') (2)
api.get('https://api.corva.ai/v2/pads') (3)
1 | Use Corva Platform API URL suffix to make a Platform API call. |
2 | Use Corva Data API URL suffix to make a Data API call. |
3 | You can also provide full URL. |
4.2. HTTP GET
from corva import Api, TaskEvent, task
@task
def task_app(event: TaskEvent, api: Api):
response = api.get('/v2/pads') (1)
api.get('/v2/pads', params={'company': 1}) (2)
response.json() (3)
1 | Simplest GET example. |
2 | Use optional params parameter
to provide URL query string params. |
3 | You can unpack received data like this,
as all Api methods return requests.Response objects. |
4.3. HTTP POST, DELETE, PUT and PATCH
from corva import Api, TaskEvent, task
@task
def task_app(event: TaskEvent, api: Api):
api.post('/v2/pads', data={'key': 'val'}) (1) (5)
api.delete('/v2/pads/123') (2)
api.put('/api/v1/data/provider/dataset/', data={'key': 'val'}) (3) (5)
api.patch('/v2/pads/123', data={'key': 'val'}) (4) (5)
1 | Simplest POST example. |
2 | Simplest DELETE example. |
3 | Simplest PUT example. |
4 | Simplest PATCH example. |
5 | Use data parameter
to provide a request body,
that will be cast to json. |
4.4. Custom headers and timeouts
from corva import Api, TaskEvent, task
@task
def task_app(event: TaskEvent, api: Api):
api.get('/v2/pads', headers={'header': 'header-value'}) (1)
api.get('/v2/pads', timeout=5) (2)
1 | Use headers parameter
to add custom headers to the request. |
2 | Use timeout parameter
to override default timeout value. |
4.5. Convenience methods
Api
provides some convenience methods
for frequently used endpoints.
4.5.1. Get dataset
Fetch the data from the
/api/v1/data/provider/dataset/ endpoint
using Api.get_dataset
method.
from corva import Api, TaskEvent, task
@task
def task_app(event: TaskEvent, api: Api):
api.get_dataset(
provider='corva',
dataset='wits',
query={
'asset_id': event.asset_id,
},
sort={'timestamp': 1},
limit=1,
fields='data,metadata',
)
4.5.2. Produce messages
Post data to the /api/v1/message_producer/ endpoint
using Api.produce_messages
method.
The method will work for both
stream
and scheduled
types of apps.
from corva import Api, Cache, ScheduledDataTimeEvent, ScheduledDepthEvent, scheduled
@scheduled
def scheduled_time_app(event: ScheduledDataTimeEvent, api: Api, cache: Cache):
api.produce_messages(data=[{'timestamp': 1}, {'timestamp': 2}]) (1)
@scheduled
def scheduled_depth_app(event: ScheduledDepthEvent, api: Api, cache: Cache):
api.produce_messages(data=[{'measured_depth': 1.0}, {'measured_depth': 2.0}]) (2)
1 | Example of producing time messages. |
2 | Example of producing depth messages. |
4.5.3. Insert data
Post data to the /api/v1/data/provider/dataset/ endpoint.
from corva import Api, Cache, ScheduledDataTimeEvent, scheduled
@scheduled
def scheduled_app(event: ScheduledDataTimeEvent, api: Api, cache: Cache):
api.insert_data(
provider='my-company-name',
dataset='my-datset-name',
data=[
{
'asset_id': event.asset_id,
'version': 1,
'timestamp': 0,
'data': {'result': 'very important result'},
},
{
'asset_id': event.asset_id,
'version': 1,
'timestamp': 1,
'data': {'result': 'very important result'},
},
], (1)
produce=True, (2)
)
1 | Save two documents to dataset. |
2 | You can enable this flag
to save and produce the data at once. |
4.6. Enabling re-tries
When request fails due to HTTP error, re-trying functionality can be used. It will try to re-send the same request again with exponential back-off for HTTP status codes below:
-
428
-
500
-
502
-
503
-
504
from corva import Api, Cache, ScheduledDataTimeEvent, scheduled
@scheduled
def scheduled_app(event: ScheduledDataTimeEvent, api: Api, cache: Cache):
api.max_retries = 5 # Enabling up to 5 retries when HTTP error happens.
...
5. Cache
Apps might need to share some data between invokes.
For this corva-sdk provides a Cache
object.
Cache
uses a dict-like database,
so the data is stored as key:value
pairs.
A typical example of Cache
usage is:
-
Store some data during
app invoke 1
. -
Retrieve and use the data during
app invoke 2
.
To get the most out of
|
Task apps don’t get a Cache parameter
as they aren’t meant to share the data between invokes.
|
5.1. Get and set
Cast your data to |
5.1.1. Storing str
from corva import Api, Cache, ScheduledDataTimeEvent, scheduled
@scheduled
def scheduled_app(event: ScheduledDataTimeEvent, api: Api, cache: Cache):
cache.set(key='str', value='text') (1)
assert cache.get(key='str') == 'text' (2)
1 | Save str data to Cache . |
2 | Load the value using its key. |
5.1.2. Storing int
from corva import Api, Cache, ScheduledDataTimeEvent, scheduled
@scheduled
def scheduled_app(event: ScheduledDataTimeEvent, api: Api, cache: Cache):
cache.set(key='int', value=str(0)) (1)
assert cache.get('int') == '0' (2)
assert int(cache.get('int')) == 0 (3)
1 | Cast int to str before saving data to Cache . |
2 | Load the value using its key.
Notice that returned value has str type. |
3 | Cast the value back to int as needed. |
5.1.3. Storing dict
import json
from corva import Api, Cache, ScheduledDataTimeEvent, scheduled
@scheduled
def scheduled_app(event: ScheduledDataTimeEvent, api: Api, cache: Cache):
cache.set(key='json', value=json.dumps({'int': 0, 'str': 'text'})) (1)
assert isinstance(cache.get('json'), str) (2)
assert json.loads(cache.get('json')) == {'int': 0, 'str': 'text'} (3)
1 | Cast dict to JSON str before saving data to Cache . |
2 | Load the value using its key.
Notice that returned value has str type. |
3 | Parse JSON str
and convert it back into a dict . |
5.2. Delete
from corva import Api, Cache, ScheduledDataTimeEvent, scheduled
@scheduled
def scheduled_app(event: ScheduledDataTimeEvent, api: Api, cache: Cache):
cache.set(key='str', value='text') (1)
assert cache.get(key='str') == 'text' (2)
cache.delete(key='str') (3)
assert cache.get(key='str') is None (4)
1 | Save some data to Cache . |
2 | Load the value using its key. |
3 | Delete the data, when it is no longer needed. |
4 | The data is not present anymore. |
5.3. Key expiry
By default, keys expire in 60 days. |
import time
from corva import Api, Cache, ScheduledDataTimeEvent, scheduled
@scheduled
def scheduled_app(event: ScheduledDataTimeEvent, api: Api, cache: Cache):
cache.set(key='key', value='value', ttl=1) (1)
assert cache.get('key') == 'value'
time.sleep(1) (2)
assert cache.get('key') is None (3)
1 | Specify how many seconds you want your key to live
using ttl parameter. |
2 | Wait for key to expire. |
3 | The data is not present anymore. |
5.4. Bulk methods
Cache
provides some bulk methods
which make it easy to work with multiple
keys at once.
5.4.1. Get many, get all and set many
import time
from corva import Api, Cache, ScheduledDataTimeEvent, scheduled
@scheduled
def scheduled_app(event: ScheduledDataTimeEvent, api: Api, cache: Cache):
assert cache.get_all() == {} (1)
cache.set_many( (2)
data=[
('key', 'value_1'),
('key_with_custom_expiry', 'value_2', 1), (3)
]
)
assert cache.get_many( (4)
keys=[
'key',
'non-existent-key', (5)
]
) == {'key': 'value_1', 'non-existent-key': None}
assert cache.get_all() == { (6)
'key': 'value_1',
'key_with_custom_expiry': 'value_2',
}
time.sleep(1) (7)
assert cache.get_all() == {'key': 'value_1'} (8)
1 | Get all the data from the hash. It is empty as we have not stored anything yet. |
2 | Store multiple key-value pairs at once. |
3 | You can set custom key expiry in seconds by providing additional tuple element. |
4 | Get multiple keys at once. |
5 | If you request a non-existent key
it will be assigned a value of None . |
6 | Get all the data from the hash. |
7 | Wait for key with custom expiry to expire. |
8 | The expired key is not present anymore. |
5.4.2. Delete many and delete all
from corva import Api, Cache, ScheduledDataTimeEvent, scheduled
@scheduled
def scheduled_app(event: ScheduledDataTimeEvent, api: Api, cache: Cache):
assert cache.get_all() == {} (1)
cache.set_many(data=[('k1', 'v1'), ('k2', 'v2'), ('k3', 'v3'), ('k4', 'v4')]) (2)
cache.delete_many(keys=['k1', 'k2']) (3)
assert cache.get_all() == {'k3': 'v3', 'k4': 'v4'} (4)
cache.delete_all() (5)
assert cache.get_all() == {} (6)
1 | Cache is empty as we have not stored anything yet. |
2 | Store some data. |
3 | Delete multiple keys at once. |
4 | Deleted keys are non-existent anymore. |
5 | Delete all the data. |
6 | Cache is empty. |
6. Logging
As apps are executed very frequently
(once a second or so),
unlimited logging can lead to huge amounts of data.
corva-sdk provides a Logger
object,
which is a safe way for app logging.
The Logger
is a logging.Logger
instance
and should be used like every other Python logger.
The Logger
has following features:
-
Log messages are injected with contextual information, which makes it easy to filter through logs while debugging issues.
-
Log messages have limited length. Too long messages are truncated to not exceed the limit. Max message size can be controlled by
LOG_THRESHOLD_MESSAGE_SIZE
env variable. Default value is1000
symbols or bytes. -
Number of log messages is limited. After reaching the limit logging gets disabled. Number of log messages can be controlled by
LOG_THRESHOLD_MESSAGE_COUNT
env variable. Default value is15
messages. -
Logging level can be set using
LOG_LEVEL
env variable. Default value isINFO
, see Python log levels for other available options.
from corva import Api, Logger, TaskEvent, task (1)
@task
def task_app(event: TaskEvent, api: Api):
(2)
Logger.debug('Debug message!')
Logger.info('Info message!')
Logger.warning('Warning message!')
Logger.error('Error message!')
try:
0 / 0
except ZeroDivisionError:
Logger.exception('Exception message!')
1 | Import Logger object. |
2 | Use Logger as every other Python logger. |
6.1. Customizations
You might want to send logs to other places
(e.g., to error reporting systems like
Sentry
or Rollbar
).
This can be achieved by providing an instance of
logging handler
as an argument to app decorator.
Custom handler will be used alongside corva-sdk's default one.
import logging (1)
from corva import Api, Logger, TaskEvent, task
stream_handler = logging.StreamHandler() (2)
@task(handler=stream_handler) (3)
def task_app(event: TaskEvent, api: Api):
Logger.info('Info message!') (4)
1 | Import the module which contains the handler that we want to use. |
2 | Initialize the handler. |
3 | Pass the handler as a keyword argument to the app decorator. |
4 | Logs will be sent to both stream_handler
and corva-sdk's default one. |
6.1.1. Sentry
pip install sentry-sdk (1)
1 | Install the library. |
import sentry_sdk (1)
from corva import Api, TaskEvent, task
sentry_sdk.init("YOUR_SENTRY_DSN") (2)
@task
def app(event: TaskEvent, api: Api) -> None:
1 / 0 (3)
1 | Import Sentry SDK. |
2 | Initialize the library. |
3 | All errors will be reported to Sentry now. |
6.1.2. Rollbar
pip install rollbar (1)
1 | Install the library. |
import rollbar.logger (1)
from corva import Api, TaskEvent, task
rollbar_handler = rollbar.logger.RollbarHandler('YOUR_ROLLBAR_ACCESS_TOKEN') (2)
@task(handler=rollbar_handler) (3)
def app(event: TaskEvent, api: Api) -> None:
1 / 0 (4)
1 | Import Rollbar SDK. |
2 | Initialize Rollbar handler. |
3 | Pass the handler as a keyword argument to the app decorator. |
4 | All errors will be reported to Rollbar now. |
6.1.3. Raygun
pip install raygun4py (1)
1 | Install the library. |
import raygun4py.raygunprovider (1)
from corva import Api, TaskEvent, task
raygun_handler = raygun4py.raygunprovider.RaygunHandler('YOUR_RAYGUN_API_KEY') (2)
@task(handler=raygun_handler) (3)
def app(event: TaskEvent, api: Api) -> None:
1 / 0 (4)
1 | Import Raygun SDK. |
2 | Initialize Raygun handler. |
3 | Pass the handler as a keyword argument to the app decorator. |
4 | All errors will be reported to Raygun now. |
7. Merging incoming events
Sometimes Corva can send more than one event to scheduled
and stream
apps.
Optionally we can ask to merge them into one event by providing merge_events=True
parameter.
from corva import Api, Cache, StreamTimeEvent, stream
# imagine we actually have 3 incoming events with 3 records each
@stream(merge_events=True)
def app(event: StreamTimeEvent, api: Api, cache: Cache):
# since we passed merge_events=True all 3 incoming events
# and their records will be merged into a single event with 9 records
assert len(event.records) == 9 # this will not fail
return event
from corva import Api, Cache, ScheduledNaturalTimeEvent, scheduled
@scheduled(merge_events=True)
def app(event: ScheduledNaturalTimeEvent, api: Api, cache: Cache):
return event
Usually this is needed to save some IO operations by processing data in bigger batches.
Use this parameter with care, in pessimistic scenario you can receive too much data, try to
process it in "one go" and fail with timeout. In that case your app will be automatically
restarted and you will start from the beginning and fail again.
Without this parameter after each processed event corva-sdk will "remember" that event was processed.
So, for example, if you will fail at event #5 and your app will be restarted - app will start processing
from event #5(and not #1 like in case of merge_events=True
)
8. Followable apps
An app is called followable once it can be followed by other ones. Followable apps must produce data to trigger chain reaction of following app runs. See Produce messages and Insert data sections for instructions on how to produce messages.
from corva import Api, Cache, ScheduledDataTimeEvent, StreamTimeEvent, scheduled, stream
@scheduled (1)
def followable_scheduled_app(event: ScheduledDataTimeEvent, api: Api, cache: Cache):
data = [
{
'asset_id': event.asset_id,
'version': 1,
'timestamp': 0,
'data': {'answer': 10},
},
{
'asset_id': event.asset_id,
'version': 1,
'timestamp': 60,
'data': {'answer': 11},
},
] (2)
(3)
api.insert_data(
provider='my-provider',
dataset='quiz-answers',
data=data,
) (4)
api.produce_messages(data=data) (5)
(6)
api.insert_data(
provider='my-provider',
dataset='quiz-answers',
data=data,
produce=True, (7)
)
@scheduled (8)
def following_scheduled_app(event: ScheduledDataTimeEvent, api: Api, cache: Cache):
data = api.get_dataset(
provider='my-provider',
dataset='quiz-answers',
query={'asset_id': event.asset_id, 'company_id': event.company_id},
sort={'timestamp': 1},
limit=2,
) (9)
assert [datum['data'] for datum in data] == [
{'answer': 10},
{'answer': 11},
]
@stream (10)
def following_stream_app(event: StreamTimeEvent, api: Api, cache: Cache):
assert [record.data for record in event.records] == [
{'answer': 10},
{'answer': 11},
] (11)
1 | The followable app. |
2 | Data documents that should be saved and produced. |
3 | Slow method which uses two api calls. |
4 | First save the data to the dataset. See Insert data for details. |
5 | Then produce the messages. See Produce messages for details. |
6 | Recommended method which uses single api call. |
7 | Enable this flag to save and produce messages at once. See Insert data for details. |
8 | Example scheduled following app. It will be invoked as soon as there is some time interval worth of data. |
9 | Scheduled apps should query the data as unlike stream ones they don’t receive it in the event. |
10 | Example stream following app. It will be invoked as soon as new messages are produced. |
11 | Stream apps receive data straight from the event. No api calls needed. |
9. Secrets
You can store sensitive data such as passwords,
tokens,
confidential configuration data,
or any other types of data that you want to protect
as application secrets.
To access such data inside the app
corva-sdk provides secrets
object.
See testing secrets
section
to see how to test an app
that uses secrets.
from corva import Api, TaskEvent, secrets, task (1)
@task
def task_app(event: TaskEvent, api: Api):
secrets['api_token'] (2)
int(secrets['integer']) (3)
1 | Import secrets object. |
2 | secrets is a dictionary so
use key to retrieve the value. |
3 | Values are stored as strings, cast the value to required type as needed. Example shows how to get the integer. |
10. Testing
Testing apps is easy and enjoyable.
corva-sdk provides convenient tools for testing through pytest-plugin.
Write your tests using pytest
to get the access to the plugin.
To install the library run:
pip install pytest
10.1. Stream
10.1.1. Time
from corva import Api, Cache, StreamTimeEvent, StreamTimeRecord, stream
@stream
def stream_app(event: StreamTimeEvent, api: Api, cache: Cache): (1)
return 'Hello, World!'
def test_stream_time_app(app_runner): (2)
event = StreamTimeEvent( (3)
asset_id=0, company_id=0, records=[StreamTimeRecord(timestamp=0)]
)
result = app_runner(stream_app, event=event) (4)
assert result == 'Hello, World!' (5)
1 | Sample app that we want to test. |
2 | Add app_runner argument to your test function. |
3 | Define the event that will be passed to the app. |
4 | Use app_runner fixture to run the app. |
5 | Verify the result. |
10.1.2. Depth
from corva import Api, Cache, StreamDepthEvent, StreamDepthRecord, stream
@stream
def stream_app(event: StreamDepthEvent, api: Api, cache: Cache): (1)
return 'Hello, World!'
def test_stream_depth_app(app_runner): (2)
event = StreamDepthEvent( (3)
asset_id=0, company_id=0, records=[StreamDepthRecord(measured_depth=0)]
)
result = app_runner(stream_app, event=event) (4)
assert result == 'Hello, World!' (5)
1 | Sample app that we want to test. |
2 | Add app_runner argument to your test function. |
3 | Define the event that will be passed to the app. |
4 | Use app_runner fixture to run the app. |
5 | Verify the result. |
10.2. Scheduled
10.2.1. Data Time
from corva import Api, Cache, ScheduledDataTimeEvent, scheduled
@scheduled
def scheduled_app(event: ScheduledDataTimeEvent, api: Api, cache: Cache): (1)
return 'Hello, World!'
def test_scheduled_app(app_runner): (2)
event = ScheduledDataTimeEvent(
asset_id=0, start_time=0, end_time=0, company_id=0
) (3)
result = app_runner(scheduled_app, event=event) (4)
assert result == 'Hello, World!' (5)
1 | Sample app that we want to test. |
2 | Add app_runner argument to your test function. |
3 | Define the event that will be passed to the app. |
4 | Use app_runner fixture to run the app. |
5 | Verify the result. |
10.2.2. Depth
from corva import Api, Cache, ScheduledDepthEvent, scheduled
@scheduled
def scheduled_app(event: ScheduledDepthEvent, api: Api, cache: Cache): (1)
return 'Hello, World!'
def test_scheduled_app(app_runner): (2)
event = ScheduledDepthEvent(
asset_id=0,
company_id=0,
top_depth=0.0,
bottom_depth=1.0,
log_identifier='',
interval=1.0,
) (3)
result = app_runner(scheduled_app, event=event) (4)
assert result == 'Hello, World!' (5)
1 | Sample app that we want to test. |
2 | Add app_runner argument to your test function. |
3 | Define the event that will be passed to the app. |
4 | Use app_runner fixture to run the app. |
5 | Verify the result. |
10.2.3. Natural Time
from corva import Api, Cache, ScheduledNaturalTimeEvent, scheduled
@scheduled
def scheduled_app(event: ScheduledNaturalTimeEvent, api: Api, cache: Cache): (1)
return 'Hello, World!'
def test_scheduled_app(app_runner): (2)
event = ScheduledNaturalTimeEvent(
asset_id=0, company_id=0, schedule_start=0, interval=1
) (3)
result = app_runner(scheduled_app, event=event) (4)
assert result == 'Hello, World!' (5)
1 | Sample app that we want to test. |
2 | Add app_runner argument to your test function. |
3 | Define the event that will be passed to the app. |
4 | Use app_runner fixture to run the app. |
5 | Verify the result. |
10.3. Task
from corva import Api, TaskEvent, task
@task
def task_app(event: TaskEvent, api: Api): (1)
return 'Hello, World!'
def test_task_app(app_runner): (2)
event = TaskEvent(asset_id=0, company_id=0) (3)
result = app_runner(task_app, event=event) (4)
assert result == 'Hello, World!' (5)
1 | Sample app that we want to test. |
2 | Add app_runner argument to your test function. |
3 | Define the event that will be passed to the app. |
4 | Use app_runner fixture to run the app. |
5 | Verify the result. |
10.4. Secrets
This section shows how to test an app that uses secrets.
from corva import Api, TaskEvent, secrets, task
@task
def task_app(event: TaskEvent, api: Api): (1)
api_token = secrets['api_token']
return api_token
def test_task_app(app_runner): (2)
event = TaskEvent(asset_id=0, company_id=0) (3)
api_token = app_runner(task_app, event, secrets={'api_token': '12345'}) (4)
assert api_token == '12345' (5)
1 | Sample app that we want to test. |
2 | Add app_runner argument to your test function. |
3 | Define the event that will be passed to the app. |
4 | Use app_runner fixture to run the app.
Pass dictionary with required secrets
as secrets parameter. |
5 | Verify the result. |
10.5. Cache
This section shows how to test an app that uses cache; reuse the same cache or reset it.
from corva import ScheduledDataTimeEvent, scheduled
from corva.service.cache_sdk import UserRedisSdk
@scheduled
def scheduled_fibonacci(event, api, cache): (1)
number1 = int(cache.get('number1') or 1)
number2 = int(cache.get('number2') or 1)
number3 = number1 + number2
cache.set('number1', number2)
cache.set('number2', number3)
return number3
def test_reset_cache(app_runner): (2)
event = ScheduledDataTimeEvent(
asset_id=0, company_id=0, start_time=0, end_time=0
) (3)
for _ in range(5):
result = app_runner(scheduled_fibonacci, event) (4)
assert result == 2 (5)
def test_reuse_cache(app_runner): (6)
event = ScheduledDataTimeEvent(
asset_id=0, company_id=0, start_time=0, end_time=0
) (7)
cache = UserRedisSdk(
hash_name='hash_name', redis_dsn='redis://localhost', use_fakes=True
) (8)
expected_results = [2, 3, 5, 8, 13, 21, 34, 55]
for expected_result in expected_results:
result = app_runner(scheduled_fibonacci, event, cache=cache) (9)
assert result == expected_result (10)
1 | Sample app that we want to test. It is a sample Fibonacci function that is relying on cache data; it receives two numbers from the cache, compute the sum and store the last two numbers in the cache. |
2 | Add app_runner argument to your test function.
In this test case, the cache is reset for each app_runner call. |
3 | Define the event that will be passed to the app. |
4 | Use app_runner fixture to run the app.
When the cache kwargs is not passed to the app_runner ,
it will reset everytime it is called. |
5 | Verify the result.
Since the cache is reset for each app_runner call, the results are the same. |
6 | Add app_runner argument to your test function.
In this test case, the cache is reused for each app_runner call. |
7 | Define the event that will be passed to the app. |
8 | Defining a cache object. |
9 | Use app_runner fixture to run the app.
When the cache is passed in the app_runner ,
it will be used to run the app. |
10 | Verify the result. Since the same cache object is used, the results are changing. |
11. App rerun
You might want to implement custom logic for app rerun,
for this events include rerun
field
that stores rerun metadata.
Example usage scenario of rerun
field:
-
Well start 22/01/31 - end 22/03/31.
-
Rerun start 22/02/02 - end 22/02/04.
-
By default app uses the latest calculated record as the base to catch up to real-time.
-
During rerun app must use the latest record before 22/02/02 (using
rerun.start
) as the base for the initial calculation.
12. Development - Contributing
Here are some guidelines to set up your environment.
12.1. Create and activate virtual environment
What’s needed:
-
Python 3.8.
-
Opened terminal inside cloned corva-sdk repository.
python -m venv env (1)
source ./env/bin/activate (2)
1 | Create a directory ./env/
with isolated Python environment inside.
You will be able to install needed packages there. |
2 | Activate the new environment. |
From now on you must always have activated virtual environment when working with the project. |