Dataset record's data
Wrapper for aggregate API
See Swagger UI, MongoDB docs
Wrapper for aggregate with pipeline API
See Swagger UI, MongoDB docs
Create new records and get created entries
import { Corva } from '@corva/node-sdk';
export const handler = new Corva().stream(async (event, context) => {
const entries = await context.api
.provider('some-provider')
.dataset('some-collection')
.createEntries([
{
timestamp: 1645387272,
data: {
some: 'data',
},
},
{
timestamp: 1645387273,
data: {
some: 'other-data',
},
},
]);
for (const entry of entries) {
context.logger.info(entry);
}
});
Same as createEntries but additionally will call message producer internally
/* eslint-disable @typescript-eslint/require-await */
import { CollectionRecord, Corva, ScheduledDataTimeEvent, StreamTimeEvent } from '@corva/node-sdk';
export const producer = new Corva().stream(async (event, { api }) => {
return api
.provider('my-provider')
.dataset('quiz-answers')
.createEntriesAndProduceMessages([
{
timestamp: 1,
company_id: 42,
asset_id: event.asset_id,
data: { answer: 10 },
},
{
timestamp: 2,
company_id: 42,
asset_id: event.asset_id,
data: { answer: 11 },
},
]);
});
export const streamConsumer = new Corva().stream(async (event: StreamTimeEvent) => {
// data from the producer - same that was published
expect(event.records.map((record) => record.data)).toEqual([{ answer: 10 }, { answer: 11 }]);
});
export const scheduledConsumer = new Corva().scheduled(async (event: ScheduledDataTimeEvent, context) => {
const searchParams = {
query: { timestamp: { $gte: event.start_time, $lte: event.end_time } },
limit: 10,
sort: { timestamp: 1 },
};
const records: CollectionRecord<{ answer: number }>[] = [];
for await (const record of context.api
.provider('my-provider')
.dataset<{ answer: number }>('quiz-answers')
.search(searchParams)) {
records.concat(record);
}
// data from the producer - same that was published
expect(records.map((record) => record.data)).toEqual([{ answer: 10 }, { answer: 11 }]);
});
Insert new document, returns Entry
Create Entry instance with id or existing object
Optional
idOrRecord: string | CollectionRecord<TCollectionRecordData, unknown>Remove multiple records by query
Convenience methods for dataset