RedisGears Operations

An operation is the building block of RedisGears functions. Different operation types can be used to achieve a variety of results to meet various data processing needs.

Operations can have zero or more arguments that control their behavior. Depending on the operation's type, arguments may be language-native data types or function callbacks.

The following sections describe the different operations.

Operation      Description                            Type
Map            Maps 1:1                               Local
FlatMap        Maps 1:N                               Local
ForEach        Does something for each record         Local
Filter         Filters records                        Local
Accumulate     Maps N:1                               Local
LocalGroupBy   Groups records by key                  Local
Limit          Limits the number of records           Local
Collect        Shuffles all records to one engine     Global
Repartition    Shuffles records between all engines   Global
GroupBy        Groups records by key                  Sugar
BatchGroupBy   Groups records by key                  Sugar
Sort           Sorts records                          Sugar
Distinct       Makes distinct records                 Sugar
Aggregate      Aggregates records                     Sugar
AggregateBy    Aggregates records by key              Sugar
Count          Counts records                         Sugar
CountBy        Counts records by key                  Sugar
Avg            Computes the average                   Sugar

Map

The local Map operation performs the one-to-one (1:1) mapping of records.

It requires one mapper callback.

Common uses

  • Transform the data's shape
  • Typecasting and value formatting
  • Splitting, joining and similar string manipulations
  • Removing and/or adding data from/to the record

Python API

class GearsBuilder.map(f)

Arguments

  • f : the mapper function callback

Examples

# Will stringify the internal record representation
gb = GB()
gb.map(lambda x: str(x))
gb.run()

FlatMap

The local FlatMap operation performs one-to-many (1:N) mapping of records.

It requires one expander callback that maps a single input record to one or more output records.

FlatMap is nearly identical to the Map operation in purpose and use. Unlike regular mapping, however, when FlatMap returns a list, each element in the list is turned into a separate output record.

Common uses

  • Deconstruction of nested, multi-part, or otherwise overly-complicated records

Python API

class GearsBuilder.flatmap(f)

Arguments

  • f : the expander function callback

Examples

# Split KeyReader's record into two: one for key the other for value
gb = GB()
gb.flatmap(lambda x: [x['key'], x['value']])
gb.run()

ForEach

The local ForEach operation performs one-to-the-same (1=1) mapping.

It requires one processor callback to perform some work that's related to the input record.

Its output record is a copy of the input, which means anything the callback returns is discarded.

Common uses

  • Non-transforming, record-related logic

Python API

class GearsBuilder.foreach(f)

Arguments

  • f : the processor function callback that will be used on each record

Examples

# Increment a shard-local counter for each record processed
gb = GB()
gb.foreach(lambda x: execute('INCR', hashtag()))
gb.run()

Filter

The local Filter operation performs one-to-zero-or-one (1:(0|1)) filtering of records.

It requires a filterer function callback.

An input record that yields a falsy value is discarded; only records that yield a truthy value are output.

Common uses

  • Filtering records

Python API

class GearsBuilder.filter(f)

Arguments

  • f : the filtering function callback

Examples

# Filter out records where the key's length is less than four characters
gb = GB()
gb.filter(lambda x: len(x['key']) > 3)
gb.run()

Accumulate

The local Accumulate operation performs many-to-one mapping (N:1) of records.

It requires one accumulator callback.

Once the input records are exhausted, its output is a single record consisting of the accumulator's value.

Common uses

  • Aggregating records

Python API

class GearsBuilder.accumulate(f)

Arguments

  • f : the accumulator function callback

Examples

# Sum the lengths of all records' keys
gb = GB()
gb.accumulate(lambda a, r: (a if a else 0) + len(r['key']))
gb.run()
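
The accumulator semantics can be sketched in plain Python as a fold with `functools.reduce`; the records here are hypothetical dict-shaped inputs like those a KeysReader would produce:

```python
from functools import reduce

# Hypothetical records, shaped like KeysReader output
records = [{'key': 'foo'}, {'key': 'quux'}]

# The same None-safe accumulator as in the example above
acc = lambda a, r: (a if a else 0) + len(r['key'])

# The engine folds the accumulator over the records; the initial value is None
total = reduce(acc, records, None)  # 3 + 4 = 7
```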

LocalGroupBy

The local LocalGroupBy operation performs many-to-less mapping (N:M) of records.

The operation requires two callbacks: an extractor and a reducer.

The output records consist of the grouping key and its respective accumulator's value.

Common uses

  • Grouping records by key

Python API

class GearsBuilder.localgroupby(e, r)

Arguments

  • e : a key extractor function callback
  • r : a reducer function callback

Examples

# Locally group and count records by the first byte in their key
gb = GB()
gb.localgroupby(lambda x: x['key'][:1],
                lambda k, a, r: (a if a else 0) + 1)
gb.run()
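
The per-key bookkeeping that localgroupby performs can be sketched in plain Python; the records are hypothetical and only the callback contract is illustrated, not the engine's actual implementation:

```python
# Hypothetical records, shaped like KeysReader output
records = [{'key': 'apple'}, {'key': 'apricot'}, {'key': 'banana'}]

# The same callbacks as in the example above
extractor = lambda x: x['key'][:1]
reducer = lambda k, a, r: (a if a else 0) + 1

# The engine keeps one accumulator per extracted key
groups = {}
for r in records:
    k = extractor(r)
    groups[k] = reducer(k, groups.get(k), r)

# groups == {'a': 2, 'b': 1}
```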

Limit

The local Limit operation limits the number of records.

It accepts two numeric arguments: a starting position in the input records "array" and a maximal number of output records.

Common uses

  • Returning the first results
  • Batch paging on static data

Python API

class GearsBuilder.limit(length, start=0)

Arguments

  • length : the maximal length of the output records list
  • start : a 0-based index of the input record to start from

Examples

# Limit the output to the three records starting with the second record
gb = GB()
gb.limit(3, 1)
gb.run()

Collect

The global Collect operation collects the result records from all of the shards to the originating one.

It has no arguments.

Common uses

  • Final steps of distributed executions

Python API

class GearsBuilder.collect()

Examples

# Shuffles all records to the originating shard
gb = GB()
gb.collect()
gb.run()

Repartition

The global Repartition operation repartitions the records by shuffling them between the shards.

It accepts a single key extractor function callback. The extracted key is used for computing the record's new placement in the cluster (i.e. hash slot). The operation then moves the record from its original shard to the new one.
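The placement computation can be sketched in plain Python. This is an illustration only: Redis Cluster actually computes CRC16 of the (hash-tagged) key modulo 16384 hash slots and maps slot ranges to shards, whereas crc32 and the shard count here are stand-ins:

```python
import zlib

def shard_for(key, num_shards=3):
    """Simplified placement sketch: a deterministic hash of the
    extracted key, reduced to a shard index. Real Redis Cluster uses
    CRC16(key) % 16384 hash slots instead of crc32."""
    return zlib.crc32(key.encode()) % num_shards
```

Extracting only part of the key (as in the second example below) limits the number of distinct placements accordingly.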

Common uses

  • Remapping of records to engines
  • JOIN-like operations

Python API

class GearsBuilder.repartition(f)

Arguments

  • f : a key extractor function callback

Examples

# Will not repartition anything because the record's key is returned as-is
gb = GB()
gb.repartition(lambda x: x['key'])
gb.run()

# Will shuffle records by hashing the first byte in their key
gb = GB()
gb.repartition(lambda x: x['key'][:1])
gb.run()

Aggregate

The sugar Aggregate operation performs many-to-one mapping (N:1) of records.

Aggregate provides an alternative to the local accumulate operation as it takes the partitioning of data into consideration. Furthermore, because records are aggregated locally before collection, its performance is usually superior.

It requires a zero value and two accumulator callbacks for computing the local and global aggregates.

The operation is made of these steps:

  1. The local accumulator is executed locally and initialized with the zero value
  2. A global collect moves all records to the originating engine
  3. The global accumulator is executed locally by the originating engine

Its output is a single record consisting of the accumulator's global value.
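The three steps can be simulated in plain Python; the two lists below stand in for records held by two hypothetical shards:

```python
from functools import reduce

# Hypothetical record values held by two shards
shards = [[1, 2, 3], [4, 5]]

zero = []
local_acc = lambda a, r: a + [r]   # runs on each shard
global_acc = lambda a, r: a + r    # runs on the originating shard

# Step 1: each shard folds its own records, starting from the zero value
partials = [reduce(local_acc, shard, list(zero)) for shard in shards]
# Steps 2-3: the partial results are collected and folded globally
result = reduce(global_acc, partials, list(zero))
# result == [1, 2, 3, 4, 5]
```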

Python API

class GearsBuilder.aggregate(z, l, g)

Arguments

  • z : the aggregate's zero value
  • l : a local accumulator function callback
  • g : a global accumulator function callback

Examples

# Will put all values in a single Python list
gb = GB()
gb.aggregate([],
             lambda a, r: a + [r['value']],
             lambda a, r: a + r)
gb.run()

AggregateBy

The sugar AggregateBy operation performs many-to-less mapping (N:M) of records.

It is similar to the Aggregate operation but aggregates per key. It requires an extractor callback, a zero value and two reducer callbacks for computing the local and global aggregates.

The operation is made of these steps:

  1. The groups are extracted using the extractor
  2. The local reducer is executed locally and initialized with the zero value
  3. A global repartition operation that uses the extractor shuffles the records
  4. The global reducer is executed on each shard once it holds all the records for its keys

The output is a list of records, one for each key. The output records consist of the grouping key and its respective reducer's value.
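The steps can be simulated in plain Python; two hypothetical shards hold the records, and `reduce_by_key` is an illustrative helper, not part of the API:

```python
# Hypothetical records held by two shards
shards = [[{'value': 'x'}, {'value': 'y'}], [{'value': 'x'}]]

extractor = lambda r: r['value']
zero = []
local_r = lambda k, a, r: a + [r]   # local reducer collects records
global_r = lambda k, a, r: a + r    # global reducer merges partial lists

def reduce_by_key(pairs, reducer, z):
    """Illustrative helper: fold a reducer over (key, record) pairs."""
    out = {}
    for k, r in pairs:
        out[k] = reducer(k, out.get(k, list(z)), r)
    return out

# Steps 1-2: local per-key reduction on each shard
partials = [reduce_by_key([(extractor(r), r) for r in s], local_r, zero)
            for s in shards]
# Steps 3-4: repartition by key, then global per-key reduction
merged = reduce_by_key([(k, v) for p in partials for k, v in p.items()],
                       global_r, zero)
# merged == {'x': [{'value': 'x'}, {'value': 'x'}], 'y': [{'value': 'y'}]}
```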

Python API

class GearsBuilder.aggregateby(e, z, l, g)

Arguments

  • e : a key extractor function callback
  • z : the aggregate's zero value
  • l : a local reducer function callback
  • g : a global reducer function callback

Examples

# Will put all records that share a value in the same list
gb = GB()
gb.aggregateby(lambda x: x['value'],
               [],
               lambda k, a, r: a + [r],
               lambda k, a, r: a + r)
gb.run()

GroupBy

The sugar GroupBy operation performs a many-to-less (N:M) grouping of records. It is similar to AggregateBy but uses only a global reducer. It can be used in cases where locally reducing the data isn't possible.

The operation requires two callbacks: an extractor and a reducer.

The operation is made of these steps:

  1. A global repartition operation that uses the extractor
  2. The reducer is locally invoked

Output is a locally-reduced list of records, one for each key. The output records consist of the grouping key and its respective accumulator's value.

Python API

class GearsBuilder.groupby(e, r)

Arguments

  • e : a key extractor function callback
  • r : a reducer function callback

Examples

# Group and count records by the first byte in their key
gb = GB()
gb.groupby(lambda x: x['key'][:1],
           lambda k, a, r: (a if a else 0) + 1)
gb.run()

BatchGroupBy

The sugar BatchGroupBy operation performs a many-to-less (N:M) grouping of records.

Prefer the GroupBy Operation

Instead of using BatchGroupBy, prefer using the GroupBy operation as it is more efficient and performant. Only use BatchGroupBy when the reducer's logic requires the full list of records for each input key.

The operation requires two callbacks: an extractor and a batch reducer.

The operation is made of these steps:

  1. A global repartition operation that uses the extractor
  2. A local localgroupby operation that uses the batch reducer

Once finished, the operation locally outputs a record for each key and its respective accumulator value.

Increased memory consumption

Using this operation may cause a substantial increase in memory usage during runtime.

Python API

class GearsBuilder.batchgroupby(e, r)

Arguments

  • e : a key extractor function callback
  • r : a batch reducer function callback

Examples

# Group and count records by the first byte in their key
gb = GB()
gb.batchgroupby(lambda x: x['key'][:1],
                lambda k, l: len(l))
gb.run()

Sort

The sugar Sort operation sorts the records.

It accepts a single Boolean argument that determines the order.

The operation is made of the following steps:

  1. A global aggregate operation collects and combines all records
  2. A local sort is performed on the list
  3. The list is flatmapped to records

Increased memory consumption

Using this operation may cause an increase in memory usage during runtime due to the list being copied during the sorting operation.

Python API

class GearsBuilder.sort(reverse=True)

Arguments

  • reverse : when True sorts in descending order

Examples

# Sorts the records in ascending order
gb = GB()
gb.sort(reverse=False)
gb.run()

Distinct

The sugar Distinct operation returns distinct records.

It requires no arguments.

The operation is made of the following steps:

  1. An aggregate operation locally reduces the records to sets that are then collected and unioned globally
  2. A local flatmap operation turns the set into records
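The two steps can be simulated in plain Python with hypothetical per-shard record lists:

```python
# Hypothetical records held by two shards, with duplicates
shards = [['a', 'b', 'a'], ['b', 'c']]

# Step 1: each shard reduces its records to a set; the sets are unioned globally
union = set()
for shard in shards:
    union |= set(shard)

# Step 2: the set is flatmapped back into individual records
distinct_records = sorted(union)  # sorted only to make the output deterministic
# distinct_records == ['a', 'b', 'c']
```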

Python API

class GearsBuilder.distinct()

Examples

# Makes every record distinct
gb = GB()
gb.distinct()
gb.run()

Count

The sugar Count operation counts the records.

It requires no arguments.

The operation is made of an aggregate operation that uses local counting and global summing accumulators.
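The composition can be sketched in plain Python: a local counting accumulator runs per hypothetical shard, and a global summing accumulator combines the counts:

```python
from functools import reduce

# Hypothetical records held by two shards
shards = [['r1', 'r2', 'r3'], ['r4', 'r5']]

# Local accumulator counts records; global accumulator sums the counts
local_acc = lambda a, r: (a if a else 0) + 1
global_acc = lambda a, r: (a if a else 0) + r

partials = [reduce(local_acc, shard, None) for shard in shards]
total = reduce(global_acc, partials, None)
# total == 5
```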

Python API

class GearsBuilder.count()

Examples

# Counts the records
gb = GB()
gb.count()
gb.run()

CountBy

The sugar CountBy operation counts the records grouped by key.

It requires a single extractor function callback.

The operation is made of an aggregateby operation that uses local counting and global summing accumulators.

Python API

class GearsBuilder.countby(extractor=lambda x: x)

Arguments

  • extractor : an optional key extractor function callback

Examples

# Counts the number of time each value is stored
gb = GB()
gb.countby(lambda x: x['value'])
gb.run()

Avg

The sugar Avg operation returns the arithmetic average of records.

It accepts an optional value extractor function callback.

The operation is made of the following steps:

  1. An aggregate operation locally reduces the records to tuples of sum and count that are globally combined
  2. A local map operation calculates the average from the global tuple
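The steps can be simulated in plain Python with hypothetical per-shard values:

```python
from functools import reduce

# Hypothetical extracted values held by two shards
shards = [[2.0, 4.0], [6.0]]

# Step 1: reduce to (sum, count) tuples locally, then combine them globally
local_acc = lambda a, r: (a[0] + r, a[1] + 1)
combine = lambda a, t: (a[0] + t[0], a[1] + t[1])

partials = [reduce(local_acc, shard, (0.0, 0)) for shard in shards]
total = reduce(combine, partials, (0.0, 0))

# Step 2: a final map computes the average from the global tuple
avg = total[0] / total[1]
# avg == 4.0
```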

Python API

class GearsBuilder.avg(extractor=lambda x: float(x))

Arguments

  • extractor : an optional value extractor function callback

Examples

# Computes the average from all records' values
gb = GB()
gb.avg()
gb.run()

Terminology

Local

The Local execution of an operation is carried out by the RedisGears engine that's deployed in either stand-alone or cluster mode. In stand-alone mode, there's a single engine executing all operations on all data locally.

When clustered, the operation is distributed to all shards. Each shard's engine executes the operation locally as well. Shards' engines, however, can only process the data they are partitioned with by the cluster.

Global

Global operations are only relevant in the context of a clustered RedisGears environment. These are the Collect and Repartition operations that shuffle records between shards.

Sugar

A Sugar operation is a utility operation. These are implemented internally with basic operations and the relevant callbacks.

Callback

A Callback is used for calling a function in the language used by the API.

Extractor

An Extractor is a callback that receives an input record as an argument. It returns a value extracted from the record. The returned value should be a native string.

Python

# Lambda function form
lambda r: str(...)

# Function form
def extractorFunction(r):
  ...
  return ...

Arguments

  • r : the input record

Examples

# These extractors expect dict() records having a 'key' key (e.g. KeysReader)
def keyExtractor(r):
  ''' Just extracts the key '''
  return str(r['key'])

def reverseExtractor(r):
  ''' Reverses the extracted key '''
  return str(r['key'])[::-1]

# This extractor expects dict() records having a string 'value' key (e.g. KeysReader with Redis Strings)
def floatExtractor(r):
  ''' Makes the value float '''
  return float(r['value'])

Mapper

A Mapper is a callback that receives an input record as an argument. It must return an output record.

Python

# Lambda function form
lambda r: ...

# Function form
def mapperFunction(r):
  ...
  return o

Arguments

  • r : the input record

Return

  • o : an output record

Examples

# This mapper expects dict() records having a 'key' key (e.g. KeysReader)
def keyOnlyMapper(r):
  ''' Maps a record to its key only '''
  return str(r['key'])

Expander

An Expander is a callback that receives an input record. It must return one or more output records.

Python

# Lambda function form
lambda r: list(...)

# Function form
def expanderFunction(r):
  ...
  return list(i)

Arguments

  • r : the input record

Return

  • i : an iterable of output records

Examples

# This expander expects KeysReader records of Redis Hashes
def hashExploder(r):
  ''' Splats a record's dict() 'value' into its keys '''
  # Prefix each exploded key with the original in curly brackets and a colon
  # for clustering safety, i.e.: {hashkeyname}:fieldname
  pre = '{' + r['key'] + '}:'
  l = [{ 'key': f'{pre}{x[0]}', 'value': x[1] } for x in r['value'].items()]
  return l

Processor

A Processor is a callback that receives an input record. It shouldn't return anything.

Python

# Lambda function form
lambda r: ...

# Function form
def processorFunction(r):
  ...

Arguments

  • r : the input record

Examples

def logProcessor(r):
  ''' Log each record '''
  log(str(r))

Filterer

A Filterer is a callback that receives an input record. It must return a Boolean value.

Python

# Lambda function form
lambda r: bool(...)

# Function form
def filtererFunction(r):
  ...
  return bool(b)

Arguments

  • r : the input record

Return

  • b : a Boolean value

Examples

def dictRecordFilter(r):
  ''' Filters out non-dict records (e.g. Redis' Strings won't pass) '''
  return type(r) is dict

Accumulator

An Accumulator is a callback that receives an input record and variable that's also called an accumulator. It aggregates inputs into the accumulator variable, which stores the state between the function's invocations. The function must return the accumulator's updated value after each call.

Python

# Lambda function form
lambda a, r: ...

# Function form
def accumulatorFunction(a, r):
  ...
  return u

Arguments

  • a : the accumulator's value from previous calls
  • r : the input record

Return

  • u : the accumulator's updated value

Examples

# This accumulator expects nothing
def countingAccumulator(a, r):
  ''' Counts records '''
  # a's initial value is None so set it to a zero value if so
  a = a if a else 0
  # increment it by one
  a = a + 1
  return a

Reducer

A Reducer is a callback function that receives a key, an input and a variable that's called an accumulator. It performs similarly to the accumulator callback, with the difference being that it maintains an accumulator per reduced key.

Python

# Lambda function form
lambda k, a, r: ...

# Function form
def reducerFunction(k, a, r):
  ...
  return u

Arguments

  • k : the key
  • a : the accumulator's value from previous calls
  • r : the input record

Return

  • u : the accumulator's updated value

Examples

def keyCountingReducer(k, a, r):
  ''' Counts records for each key'''
  # a's initial value is None so set it to a zero value if so
  a = a if a else 0
  # increment it by one
  a = a + 1
  return a

Batch Reducer

A Batch Reducer is a callback function that receives a key and a list of input records. It performs similarly to the reducer callback, with the difference being that it receives a list of records instead of a single one. It is expected to return an accumulator value for these records.

Python

# Lambda function form
lambda k, l: ...

# Function form
def batchReducerFunction(k, l):
  ...
  return a

Arguments

  • k : the key
  • l : the list of input records

Return

  • a : the accumulator's value

Examples

def batchKeyCountingReducer(k, l):
  ''' Counts records for each key'''
  a = len(l)
  return a