fn assert_iterable #
fn assert_iterable(mut ctx context.Context, mut iterable Iterable, assertions ...RxAssert)
assert_iterable asserts the result of an iterable against a list of assertions.
fn assert_single #
fn assert_single(mut ctx context.Context, mut iterable Single, assertions ...RxAssert)
assert_single asserts the result of an iterable against a list of assertions.
fn create #
fn create(f []Producer, opts ...RxOption) Observable
create creates an Observable from scratch by calling the passed function observer methods programatically.
fn custom_predicate #
fn custom_predicate(predicate AssertPredicate) RxAssert
custom_predicate checks a custom predicate.
fn default_error_func_operator #
fn default_error_func_operator(mut ctx context.Context, item Item, dst chan Item, operator_options OperatorOptions)
fn defer_obs #
fn defer_obs(f []Producer, opts ...RxOption) Observable
defer_obs does not create the Observable until the observer subscribes, and creates a fresh Observable for each observer.
fn empty #
fn empty() Observable
empty creates an Observable with no item and terminate immediately.
fn from_channel #
fn from_channel(next chan Item, opts ...RxOption) Observable
from_channel creates a cold observable from a channel.
fn from_error #
fn from_error(err IError) Item
from_error creates an item from an error
fn from_event_source #
fn from_event_source(next chan Item, opts ...RxOption) Observable
from_event_source creates a hot observable from an event source.
fn has_an_error #
fn has_an_error() RxAssert
has_an_error checks that the observable has produce an error.
fn has_error #
fn has_error(err IError) RxAssert
has_error checks that the observable has produce a specific error.
fn has_errors #
fn has_errors(errs ...IError) RxAssert
has_errors checks that the observable has produce a set of errors.
fn has_item #
fn has_item(i ItemValue) RxAssert
has_item checks if a single or optional single has a specific item.
fn has_items #
fn has_items(items ...ItemValue) RxAssert
has_items checks that the observable produces the corresponding items.
fn has_items_no_order #
fn has_items_no_order(items ...ItemValue) RxAssert
has_items_no_order checks that an observable produces the corresponding items regardless of the order.
fn has_no_error #
fn has_no_error() RxAssert
has_no_error checks that the observable has not raised any error.
fn is_empty #
fn is_empty() RxAssert
is_empty checks that the observable has not produce any item.
fn is_not_empty #
fn is_not_empty() RxAssert
is_not_empty checks that the observable produces some items.
fn just #
fn just(items ...ItemValue) fn (opts ...RxOption) Observable
just creates an Observable with the provided items.
fn just_item #
fn just_item(item ItemValue) fn (opts ...RxOption) Observable
just_item creates an Observable with the provided item.
fn new_assertion #
fn new_assertion(f AssertApplyFn) RxAssert
fn new_illegal_input_error #
fn new_illegal_input_error(msg string) IError
fn new_index_out_of_bound_error #
fn new_index_out_of_bound_error(msg string) IError
fn of #
fn of(value ItemValue) Item
of creates an item from a value
fn range #
fn range(start int, count int, opts ...RxOption) Observable
range creates an Observable that emits a range of sequential integers.
fn send_items #
fn send_items(mut ctx context.Context, ch chan Item, strategy CloseChannelStrategy, items []ItemValue)
send_items is a helper function to send items to a channel.
fn serialize #
fn serialize(identifier SerializedFn) RxOption
serialize forces an Observable to make serialized calls and to be well-behaved.
fn thrown #
fn thrown(err IError) Observable
thrown creates an Observable that emits no items and terminates with an error.
fn with_back_pressure_strategy #
fn with_back_pressure_strategy(strategy BackpressureStrategy) RxOption
with_back_pressure_strategy sets the back pressure strategy: drop or block.
fn with_buffered_channel #
fn with_buffered_channel(capacity int) RxOption
with_buffered_channel allows to configure the capacity of a buffered channel.
fn with_context #
fn with_context(mut ctx context.Context) RxOption
with_context allows to pass a context.
fn with_cpu_pool #
fn with_cpu_pool() RxOption
with_cpu_pool allows to specify an execution pool based on the number of logical CPUs.
fn with_error_strategy #
fn with_error_strategy(strategy OnErrorStrategy) RxOption
with_error_strategy defines how an observable should deal with_ error.
This strategy is propagated to the parent observable.
fn with_observation_strategy #
fn with_observation_strategy(strategy ObservationStrategy) RxOption
with_observation_strategy uses the eager observation mode meaning consuming the items even with_out subscription.
fn with_pool #
fn with_pool(pool int) RxOption
with_pool allows to specify an execution pool.
fn with_publish_strategy #
fn with_publish_strategy() RxOption
with_publish_strategy converts an ordinary Observable into a connectable Observable.
interface Duration #
interface Duration {
duration() time.Duration
}
Duration represents a duration
interface ItemValue #
interface ItemValue {}
ItemValue is a type that can be used as a value in a reactive expression.
interface Iterable #
interface Iterable {
mut:
observe(opts ...RxOption) chan Item
}
Iterable is the basic type that can be observed
interface Observable #
interface Observable {
Iterable
mut:
all(predicate Predicate, opts ...RxOption) Single
average_f32(opts ...RxOption) Single
average_f64(opts ...RxOption) Single
average_int(opts ...RxOption) Single
// average_i8(opts ...RxOption) Single
average_i16(opts ...RxOption) Single
average_i64(opts ...RxOption) Single
// // back_off_retry(back_off_cfg backoff.BackOff, opts ...RxOption) Observable
// buffer_with_count(count int, opts ...RxOption) Observable
// buffer_with_time(timespan Duration, opts ...RxOption) Observable
// buffer_with_time_or_count(timespan Duration, count int, opts ...RxOption) Observable
// connect(mut ctx context.Context) (context.Context, context.CancelFn)
// contains(equal Predicate, opts ...RxOption) Single
// count(opts ...RxOption) Single
// debounce(timespan Duration, opts ...RxOption) Observable
// default_if_empty(default_value ItemValue, opts ...RxOption) Observable
// distinct(apply Func, opts ...RxOption) Observable
// distinct_until_changed(apply Func, opts ...RxOption) Observable
// do_on_completed(completed_func CompletedFunc, opts ...RxOption) chan int
// do_on_error(err_func ErrFunc, opts ...RxOption) chan int
// do_on_next(next_func NextFunc, opts ...RxOption) chan int
// element_at(index u32, opts ...RxOption) Single
// error(opts ...RxOption) IError
// errors(opts ...RxOption) []IError
// filter(apply Predicate, opts ...RxOption) Observable
// find(find Predicate, opts ...RxOption) OptionalSingle
// first(opts ...RxOption) OptionalSingle
// first_or_default(default_value ItemValue, opts ...RxOption) Single
// flat_map(apply ItemToObservable, opts ...RxOption) Observable
// for_each(next_func NextFunc, err_func ErrFunc, completed_func CompletedFunc, opts ...RxOption) chan int
// group_by(length int, distribution DistributionFn, opts ...RxOption) Observable
// group_by_dynamic(distribution DistributionStrFn, opts ...RxOption) Observable
// ignore_elements(opts ...RxOption) Observable
// join(joiner Func2, mut right Observable, time_extractor TimeExtractorFn, window Duration, opts ...RxOption) Observable
// last(opts ...RxOption) OptionalSingle
// last_or_default(default_value ItemValue, opts ...RxOption) Single
// map(apply Func, opts ...RxOption) Observable
// marshal(marshaller Marshaller, opts ...RxOption) Observable
// max(comparator Comparator, opts ...RxOption) OptionalSingle
// min(comparator Comparator, opts ...RxOption) OptionalSingle
// on_error_resume_next(resume_sequence ErrorToObservable, opts ...RxOption) Observable
// on_error_return(resume_func ErrorFunc, opts ...RxOption) Observable
// on_error_return_item(resume ItemValue, opts ...RxOption) Observable
// reduce(apply Func2, opts ...RxOption) OptionalSingle
// repeat(count i64, frequency Duration, opts ...RxOption) Observable
// retry(count int, should_retry RetryFn, opts ...RxOption) Observable
// run(opts ...RxOption) chan int
// sample(mut iterable Iterable, opts ...RxOption) Observable
// scan(apply Func2, opts ...RxOption) Observable
// sequence_equal(mut iterable Iterable, opts ...RxOption) Single
// send(output chan Item, opts ...RxOption)
// serialize(from context.Context, identifier IdentifierFn, opts ...RxOption) Observable
// skip(nth u32, opts ...RxOption) Observable
// skip_last(nth u32, opts ...RxOption) Observable
// skip_while(apply Predicate, opts ...RxOption) Observable
// start_with(mut iterable Iterable, opts ...RxOption) Observable
// sum_f32(opts ...RxOption) OptionalSingle
// sum_f64(opts ...RxOption) OptionalSingle
// sum_i64(opts ...RxOption) OptionalSingle
// take(nth u32, opts ...RxOption) Observable
// take_last(nth u32, opts ...RxOption) Observable
// take_until(apply Predicate, opts ...RxOption) Observable
// take_while(apply Predicate, opts ...RxOption) Observable
// time_interval(opts ...RxOption) Observable
// timestamp(opts ...RxOption) Observable
// to_map(key_selector Func, opts ...RxOption) Single
// to_map_with_value_selector(key_selector Func, valueSelector Func, opts ...RxOption) Single
// to_slice(initial_capacity int, opts ...RxOption) ?[]ItemValue
// unmarshal(unmarshaller Unmarshaller, factory FactoryFn, opts ...RxOption) Observable
// window_with_count(count int, opts ...RxOption) Observable
// window_with_time(timespan Duration, opts ...RxOption) Observable
// window_with_time_or_count(timespan Duration, count int, opts ...RxOption) Observable
// zip_from_iterable(mut iterable Iterable, zipper Func2, opts ...RxOption) Observable
}
Observable is the standard interface for Observables.
interface OptionalSingle #
interface OptionalSingle {
Iterable
mut:
get(opts ...RxOption) ?Item
map(apply Func, opts ...RxOption) Single
run(opts ...RxOption) chan int
}
OptionalSingle is an optional single
interface RxOption #
interface RxOption {
apply(mut fdo FuncOption)
to_propagate() bool
is_eager_observation() bool
get_pool() ?int
build_channel() chan Item
build_context(parent context.Context) context.Context
get_back_pressure_strategy() BackpressureStrategy
get_error_strategy() OnErrorStrategy
is_connectable() bool
is_connect_operation() bool
is_serialized() ?SerializedFn
}
Options handles configurable options
interface Single #
interface Single {
Iterable
mut:
observe(opts ...RxOption) chan Item
filter(apply Predicate, opts ...RxOption) OptionalSingle
get(opts ...RxOption) ?Item
map(apply Func, opts ...RxOption) Single
run(opts ...RxOption) chan int
}
Single is an observable with a single element
type AssertApplyFn #
type AssertApplyFn = fn (mut do RxAssert)
AssertApplyFn is a custom function to apply modifications to a RxAssert.
type AssertPredicate #
type AssertPredicate = fn (items []ItemValue) ?
AssertPredicate is a custom predicate based on the items.
fn (ChannelIterable) observe #
fn (mut i ChannelIterable) observe(opts ...RxOption) chan Item
type Comparator #
type Comparator = fn (a ItemValue, b ItemValue) int
Comparator defines a func that returns an int:
- 0 if two elements are equals
- A negative value if the first argument is less than the second
- A positive value if the first argument is greater than the second
type CompletedFunc #
type CompletedFunc = fn ()
CompletedFunc handles the end of a stream.
fn (CreateIterable) observe #
fn (mut i CreateIterable) observe(opts ...RxOption) chan Item
fn (DeferIterable) observe #
fn (i &DeferIterable) observe(opts ...RxOption) chan Item
type Disposable #
type Disposable = fn ()
Disposable is a function to be called in order to dispose a subscription.
pub type Disposable = context.CancelFn
type DistributionFn #
type DistributionFn = fn (item Item) int
type DistributionStrFn #
type DistributionStrFn = fn (item Item) string
type ErrFunc #
type ErrFunc = fn (err IError)
ErrFunc handles an string in a stream.
type ErrorFunc #
type ErrorFunc = fn (err IError) ItemValue
ErrorFunc defines a function that computes a value from an string.
type ErrorToObservable #
type ErrorToObservable = fn (err IError) Observable
ErrorToObservable defines a function that transforms an observable from an error.
type FactoryFn #
type FactoryFn = fn (opts ...RxOption) chan Item
fn (FactoryIterable) observe #
fn (i &FactoryIterable) observe(opts ...RxOption) chan Item
type Func #
type Func = fn (mut ctx context.Context, arg ItemValue) ?ItemValue
Func defines a function that computes a value from an input value.
type Func2 #
type Func2 = fn (mut ctx context.Context, a ItemValue, b ItemValue) ?ItemValue
Func2 defines a function that computes a value from two input values.
type FuncN #
type FuncN = fn (args ...ItemValue) ItemValue
FuncN defines a function that computes a value from N input values.
fn (FuncOption) str #
fn (o FuncOption) str() string
type IdentifierFn #
type IdentifierFn = fn (value ItemValue) int
type ItemToObservable #
type ItemToObservable = fn (item Item) Observable
ItemToObservable defines a function that computes an observable from an item.
type IterableFactoryFn #
type IterableFactoryFn = fn (mut ctx context.Context, next chan Item, option RxOption, opts ...RxOption)
fn (JustIterable) observe #
fn (i &JustIterable) observe(opts ...RxOption) chan Item
type Marshaller #
type Marshaller = fn (value ItemValue) ?[]byte
Marshaller defines a marshaller type (ItemValue to []byte).
type NextFunc #
type NextFunc = fn (arg ItemValue)
NextFunc handles a next item in a stream.
type Predicate #
type Predicate = fn (value ItemValue) bool
Predicate defines a func that returns a bool from an input value.
type Producer #
type Producer = fn (mut ctx context.Context, next chan Item)
Producer defines a producer implementation.
fn (RangeIterable) observe #
fn (i &RangeIterable) observe(opts ...RxOption) chan Item
type RetryFn #
type RetryFn = fn (err IError) bool
fn (SliceIterable) observe #
fn (i &SliceIterable) observe(opts ...RxOption) chan Item
type Supplier #
type Supplier = fn (mut ctx context.Context) Item
Supplier defines a function that supplies a result from nothing.
type TimeExtractorFn #
type TimeExtractorFn = fn (value ItemValue) time.Time
type Unmarshaller #
type Unmarshaller = fn ([]byte, ItemValue) ItemValue
Unmarshaller defines an unmarshaller type ([]byte to interface).
enum BackpressureStrategy #
enum BackpressureStrategy {
// block blocks until the channel is available.
block
// drop drops the message.
drop
}
BackpressureStrategy is the backpressure strategy type.
enum CloseChannelStrategy #
enum CloseChannelStrategy {
// leave_channel_open indicates to leave the channel open after completion.
leave_channel_open
// close_channel indicates to close the channel open after completion.
close_channel
}
CloseChannelStrategy indicates a strategy on whether to close a channel.
enum ObservationStrategy #
enum ObservationStrategy {
// lazy is the default observation strategy, when an Observer subscribes.
lazy
// eager means consuming as soon as the Observable is created.
eager
}
ObservationStrategy defines the strategy to consume from an Observable.
enum OnErrorStrategy #
enum OnErrorStrategy {
// stop_on_error is the default error strategy.
// An operator will stop processing items on error.
stop_on_error
// continue_on_error means an operator will continue processing items after an error.
continue_on_error
}
OnErrorStrategy is the Observable error strategy.
struct IllegalInputError #
struct IllegalInputError {
pub:
msg string
code int
}
IllegalInputError is triggered when the observable receives an illegal input
fn (IllegalInputError) msg #
fn (e IllegalInputError) msg() string
msg returns the error message
fn (IllegalInputError) code #
fn (e IllegalInputError) code() int
code returns the error code
struct IndexOutOfBoundError #
struct IndexOutOfBoundError {
pub:
msg string
code int
}
IndexOutOfBoundError is triggered when the observable cannot access to the specified index
fn (IndexOutOfBoundError) msg #
fn (e IndexOutOfBoundError) msg() string
msg returns the error message
fn (IndexOutOfBoundError) code #
fn (e IndexOutOfBoundError) code() int
code returns the error code
struct Item #
struct Item {
pub:
value ItemValue|none = none
err IError|none = none
}
Item is a wrapper around a value that can be used as a value in a reactive expression. It is a reference type, so it can be used to wrap values that are not reference types.
fn (Item) is_error #
fn (i Item) is_error() bool
is_error checks if an item is an error
fn (Item) send_blocking #
fn (i Item) send_blocking(ch chan Item)
send_blocking sends an item and blocks until it is sent
fn (Item) send_context #
fn (i Item) send_context(mut ctx context.Context, ch chan Item) bool
send_context sends an item and blocks until it is sent or a context canceled.
It returns a boolean to indicate wheter the item was sent.
fn (Item) send_non_blocking #
fn (i Item) send_non_blocking(ch chan Item) bool
send_non_blocking sends an item without blocking.
It returns a boolean to indicate whether the item was sent.
struct ObservableImpl #
struct ObservableImpl {
mut:
iterable Iterable
parent context.Context = context.background()
}
ObservableImpl implements Observable.
fn (ObservableImpl) all #
fn (mut o ObservableImpl) all(predicate Predicate, opts ...RxOption) Single
all determines whether all items emitted by an Observable meet some criteria
fn (ObservableImpl) average_f32 #
fn (mut o ObservableImpl) average_f32(opts ...RxOption) Single
average_f32 calculates the average of numbers emitted by an Observable and emits the average f32
fn (ObservableImpl) average_f64 #
fn (mut o ObservableImpl) average_f64(opts ...RxOption) Single
average_f64 calculates the average of numbers emitted by an Observable and emits the average f64
fn (ObservableImpl) average_i16 #
fn (mut o ObservableImpl) average_i16(opts ...RxOption) Single
average_i16 calculates the average of numbers emitted by an Observable and emits the average i16
fn (ObservableImpl) average_i64 #
fn (mut o ObservableImpl) average_i64(opts ...RxOption) Single
average_i64 calculates the average of numbers emitted by an Observable and emits the average i64
fn (ObservableImpl) average_int #
fn (mut o ObservableImpl) average_int(opts ...RxOption) Single
average_int calculates the average of numbers emitted by an Observable and emits the average int
fn (ObservableImpl) observe #
fn (mut o ObservableImpl) observe(opts ...RxOption) chan Item
Observe observes an Observable by returning its channel.
fn (ObservableImpl) str #
fn (o ObservableImpl) str() string
str returns a string representation of the Observable.
struct OperatorOptions #
struct OperatorOptions {
stop fn ()
reset_iterable fn (mut Iterable)
}
struct OptionalSingleImpl #
struct OptionalSingleImpl {
mut:
iterable Iterable
parent context.Context
}
OptionalSingleImpl is the default implementation for OptionalSingle
fn (OptionalSingleImpl) str #
fn (o OptionalSingleImpl) str() string
fn (OptionalSingleImpl) get #
fn (mut o OptionalSingleImpl) get(opts ...RxOption) ?Item
get returns the item or rxv.optional_empty. The error returned is if the context has been canceled.
this method is blocking.
fn (OptionalSingleImpl) map #
fn (mut o OptionalSingleImpl) map(apply Func, opts ...RxOption) Single
map transforms the items emitted by an optional_single by applying a function to each item
fn (OptionalSingleImpl) run #
fn (mut o OptionalSingleImpl) run(opts ...RxOption) chan int
run creates an observer without consuming the emitted items
struct RxAssert #
struct RxAssert {
f AssertApplyFn = unsafe { nil }
mut:
check_has_items bool
check_has_no_items bool
check_has_some_items bool
items []ItemValue
check_has_items_no_order bool
items_no_order []ItemValue
check_has_raised_error bool
check_has_raised_errors bool
errs []IError
check_has_raised_an_error bool
check_has_not_raised_error bool
check_has_item bool
item ItemValue
check_has_no_item bool
check_has_custom_predicate bool
custom_predicates []AssertPredicate
}
RxAssert is a structure to apply assertions to an observable.
struct SingleImpl #
struct SingleImpl {
mut:
iterable Iterable
parent context.Context
}
SingleImpl implements Single
fn (SingleImpl) str #
fn (o SingleImpl) str() string
fn (SingleImpl) get #
fn (mut o SingleImpl) get(opts ...RxOption) ?Item
get returns the item. The error returned is if the context has been cancelled.
This method is blocking.
fn (SingleImpl) filter #
fn (mut s SingleImpl) filter(apply Predicate, opts ...RxOption) OptionalSingle
filter amits only those items from an Observable that pass a predicate test
fn (SingleImpl) map #
fn (mut o SingleImpl) map(apply Func, opts ...RxOption) Single
map transforms the items emitted by an optional_single by applying a function to each item
fn (SingleImpl) run #
fn (mut o SingleImpl) run(opts ...RxOption) chan int
run creates an observer without consuming the emitted items
struct TimestampedItem #
struct TimestampedItem {
pub:
value ItemValue
timestamp time.Time
}
TimestampedItem is a wrapper around a value that can be used as a value in a reactive expression. It is a reference type, so it can be used to wrap values that are not reference types.
- fn assert_iterable
- fn assert_single
- fn create
- fn custom_predicate
- fn default_error_func_operator
- fn defer_obs
- fn empty
- fn from_channel
- fn from_error
- fn from_event_source
- fn has_an_error
- fn has_error
- fn has_errors
- fn has_item
- fn has_items
- fn has_items_no_order
- fn has_no_error
- fn is_empty
- fn is_not_empty
- fn just
- fn just_item
- fn new_assertion
- fn new_illegal_input_error
- fn new_index_out_of_bound_error
- fn of
- fn range
- fn send_items
- fn serialize
- fn thrown
- fn with_back_pressure_strategy
- fn with_buffered_channel
- fn with_context
- fn with_cpu_pool
- fn with_error_strategy
- fn with_observation_strategy
- fn with_pool
- fn with_publish_strategy
- interface Duration
- interface ItemValue
- interface Iterable
- interface Observable
- interface OptionalSingle
- interface RxOption
- interface Single
- type AssertApplyFn
- type AssertPredicate
- type ChannelIterable
- type Comparator
- type CompletedFunc
- type CreateIterable
- type DeferIterable
- type Disposable
- type DistributionFn
- type DistributionStrFn
- type ErrFunc
- type ErrorFunc
- type ErrorToObservable
- type FactoryFn
- type FactoryIterable
- type Func
- type Func2
- type FuncN
- type FuncOption
- type IdentifierFn
- type ItemToObservable
- type IterableFactoryFn
- type JustIterable
- type Marshaller
- type NextFunc
- type Predicate
- type Producer
- type RangeIterable
- type RetryFn
- type SliceIterable
- type Supplier
- type TimeExtractorFn
- type Unmarshaller
- enum BackpressureStrategy
- enum CloseChannelStrategy
- enum ObservationStrategy
- enum OnErrorStrategy
- struct IllegalInputError
- struct IndexOutOfBoundError
- struct Item
- struct ObservableImpl
- struct OperatorOptions
- struct OptionalSingleImpl
- struct RxAssert
- struct SingleImpl
- struct TimestampedItem