Skip to content

fn assert_iterable #

fn assert_iterable(mut ctx context.Context, mut iterable Iterable, assertions ...RxAssert)

assert_iterable asserts the result of an iterable against a list of assertions.

fn assert_single #

fn assert_single(mut ctx context.Context, mut iterable Single, assertions ...RxAssert)

assert_single asserts the result of a Single against a list of assertions.

fn create #

fn create(f []Producer, opts ...RxOption) Observable

create creates an Observable from scratch by calling the passed function observer methods programmatically.

fn custom_predicate #

fn custom_predicate(predicate AssertPredicate) RxAssert

custom_predicate checks a custom predicate.

fn default_error_func_operator #

fn default_error_func_operator(mut ctx context.Context, item Item, dst chan Item, operator_options OperatorOptions)

fn defer_obs #

fn defer_obs(f []Producer, opts ...RxOption) Observable

defer_obs does not create the Observable until the observer subscribes, and creates a fresh Observable for each observer.

fn empty #

fn empty() Observable

empty creates an Observable that emits no items and terminates immediately.

fn from_channel #

fn from_channel(next chan Item, opts ...RxOption) Observable

from_channel creates a cold observable from a channel.

fn from_error #

fn from_error(err IError) Item

from_error creates an item from an error

fn from_event_source #

fn from_event_source(next chan Item, opts ...RxOption) Observable

from_event_source creates a hot observable from an event source.

fn has_an_error #

fn has_an_error() RxAssert

has_an_error checks that the observable has produced an error.

fn has_error #

fn has_error(err IError) RxAssert

has_error checks that the observable has produced a specific error.

fn has_errors #

fn has_errors(errs ...IError) RxAssert

has_errors checks that the observable has produced a set of errors.

fn has_item #

fn has_item(i ItemValue) RxAssert

has_item checks if a single or optional single has a specific item.

fn has_items #

fn has_items(items ...ItemValue) RxAssert

has_items checks that the observable produces the corresponding items.

fn has_items_no_order #

fn has_items_no_order(items ...ItemValue) RxAssert

has_items_no_order checks that an observable produces the corresponding items regardless of the order.

fn has_no_error #

fn has_no_error() RxAssert

has_no_error checks that the observable has not raised any error.

fn is_empty #

fn is_empty() RxAssert

is_empty checks that the observable has not produced any item.

fn is_not_empty #

fn is_not_empty() RxAssert

is_not_empty checks that the observable produces some items.

fn just #

fn just(items ...ItemValue) fn (opts ...RxOption) Observable

just creates an Observable with the provided items.

fn just_item #

fn just_item(item ItemValue) fn (opts ...RxOption) Observable

just_item creates an Observable with the provided item.

fn new_assertion #

fn new_assertion(f AssertApplyFn) RxAssert

fn new_illegal_input_error #

fn new_illegal_input_error(msg string) IError

fn new_index_out_of_bound_error #

fn new_index_out_of_bound_error(msg string) IError

fn of #

fn of(value ItemValue) Item

of creates an item from a value

fn range #

fn range(start int, count int, opts ...RxOption) Observable

range creates an Observable that emits a range of sequential integers.

fn send_items #

fn send_items(mut ctx context.Context, ch chan Item, strategy CloseChannelStrategy, items []ItemValue)

send_items is a helper function to send items to a channel.

fn serialize #

fn serialize(identifier SerializedFn) RxOption

serialize forces an Observable to make serialized calls and to be well-behaved.

fn thrown #

fn thrown(err IError) Observable

thrown creates an Observable that emits no items and terminates with an error.

fn with_back_pressure_strategy #

fn with_back_pressure_strategy(strategy BackpressureStrategy) RxOption

with_back_pressure_strategy sets the back pressure strategy: drop or block.

fn with_buffered_channel #

fn with_buffered_channel(capacity int) RxOption

with_buffered_channel allows to configure the capacity of a buffered channel.

fn with_context #

fn with_context(mut ctx context.Context) RxOption

with_context allows to pass a context.

fn with_cpu_pool #

fn with_cpu_pool() RxOption

with_cpu_pool allows to specify an execution pool based on the number of logical CPUs.

fn with_error_strategy #

fn with_error_strategy(strategy OnErrorStrategy) RxOption

with_error_strategy defines how an observable should deal with errors.
This strategy is propagated to the parent observable.

fn with_observation_strategy #

fn with_observation_strategy(strategy ObservationStrategy) RxOption

with_observation_strategy uses the eager observation mode, meaning items are consumed even without a subscription.

fn with_pool #

fn with_pool(pool int) RxOption

with_pool allows to specify an execution pool.

fn with_publish_strategy #

fn with_publish_strategy() RxOption

with_publish_strategy converts an ordinary Observable into a connectable Observable.

interface Duration #

interface Duration {
	duration() time.Duration
}

Duration represents a duration

interface ItemValue #

interface ItemValue {}

ItemValue is a type that can be used as a value in a reactive expression.

interface Iterable #

interface Iterable {
mut:
	observe(opts ...RxOption) chan Item
}

Iterable is the basic type that can be observed

interface Observable #

interface Observable {
	Iterable
mut:
	all(predicate Predicate, opts ...RxOption) Single
	average_f32(opts ...RxOption) Single
	average_f64(opts ...RxOption) Single
	average_int(opts ...RxOption) Single
	// average_i8(opts ...RxOption) Single
	average_i16(opts ...RxOption) Single
	average_i64(opts ...RxOption) Single
	// 	// back_off_retry(back_off_cfg backoff.BackOff, opts ...RxOption) Observable
	// 	buffer_with_count(count int, opts ...RxOption) Observable
	// 	buffer_with_time(timespan Duration, opts ...RxOption) Observable
	// 	buffer_with_time_or_count(timespan Duration, count int, opts ...RxOption) Observable
	// 	connect(mut ctx context.Context) (context.Context, context.CancelFn)
	// 	contains(equal Predicate, opts ...RxOption) Single
	// 	count(opts ...RxOption) Single
	// 	debounce(timespan Duration, opts ...RxOption) Observable
	// 	default_if_empty(default_value ItemValue, opts ...RxOption) Observable
	// 	distinct(apply Func, opts ...RxOption) Observable
	// 	distinct_until_changed(apply Func, opts ...RxOption) Observable
	// 	do_on_completed(completed_func CompletedFunc, opts ...RxOption) chan int
	// 	do_on_error(err_func ErrFunc, opts ...RxOption) chan int
	// 	do_on_next(next_func NextFunc, opts ...RxOption) chan int
	// 	element_at(index u32, opts ...RxOption) Single
	// 	error(opts ...RxOption) IError
	// 	errors(opts ...RxOption) []IError
	// 	filter(apply Predicate, opts ...RxOption) Observable
	// 	find(find Predicate, opts ...RxOption) OptionalSingle
	// 	first(opts ...RxOption) OptionalSingle
	// 	first_or_default(default_value ItemValue, opts ...RxOption) Single
	// 	flat_map(apply ItemToObservable, opts ...RxOption) Observable
	// 	for_each(next_func NextFunc, err_func ErrFunc, completed_func CompletedFunc, opts ...RxOption) chan int
	// 	group_by(length int, distribution DistributionFn, opts ...RxOption) Observable
	// 	group_by_dynamic(distribution DistributionStrFn, opts ...RxOption) Observable
	// 	ignore_elements(opts ...RxOption) Observable
	// 	join(joiner Func2, mut right Observable, time_extractor TimeExtractorFn, window Duration, opts ...RxOption) Observable
	// 	last(opts ...RxOption) OptionalSingle
	// 	last_or_default(default_value ItemValue, opts ...RxOption) Single
	// 	map(apply Func, opts ...RxOption) Observable
	// 	marshal(marshaller Marshaller, opts ...RxOption) Observable
	// 	max(comparator Comparator, opts ...RxOption) OptionalSingle
	// 	min(comparator Comparator, opts ...RxOption) OptionalSingle
	// 	on_error_resume_next(resume_sequence ErrorToObservable, opts ...RxOption) Observable
	// 	on_error_return(resume_func ErrorFunc, opts ...RxOption) Observable
	// 	on_error_return_item(resume ItemValue, opts ...RxOption) Observable
	// 	reduce(apply Func2, opts ...RxOption) OptionalSingle
	// 	repeat(count i64, frequency Duration, opts ...RxOption) Observable
	// 	retry(count int, should_retry RetryFn, opts ...RxOption) Observable
	// 	run(opts ...RxOption) chan int
	// 	sample(mut iterable Iterable, opts ...RxOption) Observable
	// 	scan(apply Func2, opts ...RxOption) Observable
	// 	sequence_equal(mut iterable Iterable, opts ...RxOption) Single
	// 	send(output chan Item, opts ...RxOption)
	// 	serialize(from context.Context, identifier IdentifierFn, opts ...RxOption) Observable
	// 	skip(nth u32, opts ...RxOption) Observable
	// 	skip_last(nth u32, opts ...RxOption) Observable
	// 	skip_while(apply Predicate, opts ...RxOption) Observable
	// 	start_with(mut iterable Iterable, opts ...RxOption) Observable
	// 	sum_f32(opts ...RxOption) OptionalSingle
	// 	sum_f64(opts ...RxOption) OptionalSingle
	// 	sum_i64(opts ...RxOption) OptionalSingle
	// 	take(nth u32, opts ...RxOption) Observable
	// 	take_last(nth u32, opts ...RxOption) Observable
	// 	take_until(apply Predicate, opts ...RxOption) Observable
	// 	take_while(apply Predicate, opts ...RxOption) Observable
	// 	time_interval(opts ...RxOption) Observable
	// 	timestamp(opts ...RxOption) Observable
	// 	to_map(key_selector Func, opts ...RxOption) Single
	// 	to_map_with_value_selector(key_selector Func, valueSelector Func, opts ...RxOption) Single
	// 	to_slice(initial_capacity int, opts ...RxOption) ?[]ItemValue
	// 	unmarshal(unmarshaller Unmarshaller, factory FactoryFn, opts ...RxOption) Observable
	// 	window_with_count(count int, opts ...RxOption) Observable
	// 	window_with_time(timespan Duration, opts ...RxOption) Observable
	// 	window_with_time_or_count(timespan Duration, count int, opts ...RxOption) Observable
	// 	zip_from_iterable(mut iterable Iterable, zipper Func2, opts ...RxOption) Observable
}

Observable is the standard interface for Observables.

interface OptionalSingle #

interface OptionalSingle {
	Iterable
mut:
	get(opts ...RxOption) ?Item
	map(apply Func, opts ...RxOption) Single
	run(opts ...RxOption) chan int
}

OptionalSingle is an optional single

interface RxOption #

interface RxOption {
	apply(mut fdo FuncOption)
	to_propagate() bool
	is_eager_observation() bool
	get_pool() ?int
	build_channel() chan Item
	build_context(parent context.Context) context.Context
	get_back_pressure_strategy() BackpressureStrategy
	get_error_strategy() OnErrorStrategy
	is_connectable() bool
	is_connect_operation() bool
	is_serialized() ?SerializedFn
}

RxOption handles configurable options

interface Single #

interface Single {
	Iterable
mut:
	observe(opts ...RxOption) chan Item
	filter(apply Predicate, opts ...RxOption) OptionalSingle
	get(opts ...RxOption) ?Item
	map(apply Func, opts ...RxOption) Single
	run(opts ...RxOption) chan int
}

Single is an observable with a single element

type AssertApplyFn #

type AssertApplyFn = fn (mut do RxAssert)

AssertApplyFn is a custom function to apply modifications to a RxAssert.

type AssertPredicate #

type AssertPredicate = fn (items []ItemValue) ?

AssertPredicate is a custom predicate based on the items.

fn (ChannelIterable) observe #

fn (mut i ChannelIterable) observe(opts ...RxOption) chan Item

type Comparator #

type Comparator = fn (a ItemValue, b ItemValue) int

Comparator defines a func that returns an int:

  • 0 if the two elements are equal
  • A negative value if the first argument is less than the second
  • A positive value if the first argument is greater than the second

type CompletedFunc #

type CompletedFunc = fn ()

CompletedFunc handles the end of a stream.

fn (CreateIterable) observe #

fn (mut i CreateIterable) observe(opts ...RxOption) chan Item

fn (DeferIterable) observe #

fn (i &DeferIterable) observe(opts ...RxOption) chan Item

type Disposable #

type Disposable = fn ()

Disposable is a function to be called in order to dispose a subscription.
pub type Disposable = context.CancelFn

type DistributionFn #

type DistributionFn = fn (item Item) int

type DistributionStrFn #

type DistributionStrFn = fn (item Item) string

type ErrFunc #

type ErrFunc = fn (err IError)

ErrFunc handles an error in a stream.

type ErrorFunc #

type ErrorFunc = fn (err IError) ItemValue

ErrorFunc defines a function that computes a value from an error.

type ErrorToObservable #

type ErrorToObservable = fn (err IError) Observable

ErrorToObservable defines a function that computes an observable from an error.

type FactoryFn #

type FactoryFn = fn (opts ...RxOption) chan Item

fn (FactoryIterable) observe #

fn (i &FactoryIterable) observe(opts ...RxOption) chan Item

type Func #

type Func = fn (mut ctx context.Context, arg ItemValue) ?ItemValue

Func defines a function that computes a value from an input value.

type Func2 #

type Func2 = fn (mut ctx context.Context, a ItemValue, b ItemValue) ?ItemValue

Func2 defines a function that computes a value from two input values.

type FuncN #

type FuncN = fn (args ...ItemValue) ItemValue

FuncN defines a function that computes a value from N input values.

fn (FuncOption) str #

fn (o FuncOption) str() string

type IdentifierFn #

type IdentifierFn = fn (value ItemValue) int

type ItemToObservable #

type ItemToObservable = fn (item Item) Observable

ItemToObservable defines a function that computes an observable from an item.

type IterableFactoryFn #

type IterableFactoryFn = fn (mut ctx context.Context, next chan Item, option RxOption, opts ...RxOption)

fn (JustIterable) observe #

fn (i &JustIterable) observe(opts ...RxOption) chan Item

type Marshaller #

type Marshaller = fn (value ItemValue) ?[]byte

Marshaller defines a marshaller type (ItemValue to []byte).

type NextFunc #

type NextFunc = fn (arg ItemValue)

NextFunc handles a next item in a stream.

type Predicate #

type Predicate = fn (value ItemValue) bool

Predicate defines a func that returns a bool from an input value.

type Producer #

type Producer = fn (mut ctx context.Context, next chan Item)

Producer defines a producer implementation.

fn (RangeIterable) observe #

fn (i &RangeIterable) observe(opts ...RxOption) chan Item

type RetryFn #

type RetryFn = fn (err IError) bool

fn (SliceIterable) observe #

fn (i &SliceIterable) observe(opts ...RxOption) chan Item

type Supplier #

type Supplier = fn (mut ctx context.Context) Item

Supplier defines a function that supplies a result from nothing.

type TimeExtractorFn #

type TimeExtractorFn = fn (value ItemValue) time.Time

type Unmarshaller #

type Unmarshaller = fn ([]byte, ItemValue) ItemValue

Unmarshaller defines an unmarshaller type ([]byte to ItemValue).

enum BackpressureStrategy #

enum BackpressureStrategy {
	// block blocks until the channel is available.
	block
	// drop drops the message.
	drop
}

BackpressureStrategy is the backpressure strategy type.

enum CloseChannelStrategy #

enum CloseChannelStrategy {
	// leave_channel_open indicates to leave the channel open after completion.
	leave_channel_open
	// close_channel indicates to close the channel after completion.
	close_channel
}

CloseChannelStrategy indicates a strategy on whether to close a channel.

enum ObservationStrategy #

enum ObservationStrategy {
	// lazy is the default observation strategy, when an Observer subscribes.
	lazy
	// eager means consuming as soon as the Observable is created.
	eager
}

ObservationStrategy defines the strategy to consume from an Observable.

enum OnErrorStrategy #

enum OnErrorStrategy {
	// stop_on_error is the default error strategy.
	// An operator will stop processing items on error.
	stop_on_error
	// continue_on_error means an operator will continue processing items after an error.
	continue_on_error
}

OnErrorStrategy is the Observable error strategy.

struct IllegalInputError #

struct IllegalInputError {
pub:
	msg  string
	code int
}

IllegalInputError is triggered when the observable receives an illegal input

fn (IllegalInputError) msg #

fn (e IllegalInputError) msg() string

msg returns the error message

fn (IllegalInputError) code #

fn (e IllegalInputError) code() int

code returns the error code

struct IndexOutOfBoundError #

struct IndexOutOfBoundError {
pub:
	msg  string
	code int
}

IndexOutOfBoundError is triggered when the observable cannot access the specified index

fn (IndexOutOfBoundError) msg #

fn (e IndexOutOfBoundError) msg() string

msg returns the error message

fn (IndexOutOfBoundError) code #

fn (e IndexOutOfBoundError) code() int

code returns the error code

struct Item #

struct Item {
pub:
	value ItemValue|none = none
	err   IError|none    = none
}

Item is a wrapper around a value that can be used as a value in a reactive expression. It is a reference type, so it can be used to wrap values that are not reference types.

fn (Item) is_error #

fn (i Item) is_error() bool

is_error checks if an item is an error

fn (Item) send_blocking #

fn (i Item) send_blocking(ch chan Item)

send_blocking sends an item and blocks until it is sent

fn (Item) send_context #

fn (i Item) send_context(mut ctx context.Context, ch chan Item) bool

send_context sends an item and blocks until it is sent or a context canceled.
It returns a boolean to indicate whether the item was sent.

fn (Item) send_non_blocking #

fn (i Item) send_non_blocking(ch chan Item) bool

send_non_blocking sends an item without blocking.
It returns a boolean to indicate whether the item was sent.

struct ObservableImpl #

struct ObservableImpl {
mut:
	iterable Iterable
	parent   context.Context = context.background()
}

ObservableImpl implements Observable.

fn (ObservableImpl) all #

fn (mut o ObservableImpl) all(predicate Predicate, opts ...RxOption) Single

all determines whether all items emitted by an Observable meet some criteria

fn (ObservableImpl) average_f32 #

fn (mut o ObservableImpl) average_f32(opts ...RxOption) Single

average_f32 calculates the average of numbers emitted by an Observable and emits the average f32

fn (ObservableImpl) average_f64 #

fn (mut o ObservableImpl) average_f64(opts ...RxOption) Single

average_f64 calculates the average of numbers emitted by an Observable and emits the average f64

fn (ObservableImpl) average_i16 #

fn (mut o ObservableImpl) average_i16(opts ...RxOption) Single

average_i16 calculates the average of numbers emitted by an Observable and emits the average i16

fn (ObservableImpl) average_i64 #

fn (mut o ObservableImpl) average_i64(opts ...RxOption) Single

average_i64 calculates the average of numbers emitted by an Observable and emits the average i64

fn (ObservableImpl) average_int #

fn (mut o ObservableImpl) average_int(opts ...RxOption) Single

average_int calculates the average of numbers emitted by an Observable and emits the average int

fn (ObservableImpl) observe #

fn (mut o ObservableImpl) observe(opts ...RxOption) chan Item

observe observes an Observable by returning its channel.

fn (ObservableImpl) str #

fn (o ObservableImpl) str() string

str returns a string representation of the Observable.

struct OperatorOptions #

struct OperatorOptions {
	stop           fn ()
	reset_iterable fn (mut Iterable)
}

struct OptionalSingleImpl #

struct OptionalSingleImpl {
mut:
	iterable Iterable
	parent   context.Context
}

OptionalSingleImpl is the default implementation for OptionalSingle

fn (OptionalSingleImpl) str #

fn (o OptionalSingleImpl) str() string

fn (OptionalSingleImpl) get #

fn (mut o OptionalSingleImpl) get(opts ...RxOption) ?Item

get returns the item or rxv.optional_empty. An error is returned if the context has been canceled.
This method is blocking.

fn (OptionalSingleImpl) map #

fn (mut o OptionalSingleImpl) map(apply Func, opts ...RxOption) Single

map transforms the items emitted by an optional_single by applying a function to each item

fn (OptionalSingleImpl) run #

fn (mut o OptionalSingleImpl) run(opts ...RxOption) chan int

run creates an observer without consuming the emitted items

struct RxAssert #

struct RxAssert {
	f AssertApplyFn = unsafe { nil }
mut:
	check_has_items            bool
	check_has_no_items         bool
	check_has_some_items       bool
	items                      []ItemValue
	check_has_items_no_order   bool
	items_no_order             []ItemValue
	check_has_raised_error     bool
	check_has_raised_errors    bool
	errs                       []IError
	check_has_raised_an_error  bool
	check_has_not_raised_error bool
	check_has_item             bool
	item                       ItemValue
	check_has_no_item          bool
	check_has_custom_predicate bool
	custom_predicates          []AssertPredicate
}

RxAssert is a structure to apply assertions to an observable.

struct SingleImpl #

struct SingleImpl {
mut:
	iterable Iterable
	parent   context.Context
}

SingleImpl implements Single

fn (SingleImpl) str #

fn (o SingleImpl) str() string

fn (SingleImpl) get #

fn (mut o SingleImpl) get(opts ...RxOption) ?Item

get returns the item. An error is returned if the context has been cancelled.
This method is blocking.

fn (SingleImpl) filter #

fn (mut s SingleImpl) filter(apply Predicate, opts ...RxOption) OptionalSingle

filter emits only those items from an Observable that pass a predicate test

fn (SingleImpl) map #

fn (mut o SingleImpl) map(apply Func, opts ...RxOption) Single

map transforms the items emitted by a Single by applying a function to each item

fn (SingleImpl) run #

fn (mut o SingleImpl) run(opts ...RxOption) chan int

run creates an observer without consuming the emitted items

struct TimestampedItem #

struct TimestampedItem {
pub:
	value     ItemValue
	timestamp time.Time
}

TimestampedItem is a wrapper around a value and its timestamp that can be used as a value in a reactive expression. It is a reference type, so it can be used to wrap values that are not reference types.