Map-Reduce

Fabric Engine では一般的な map-reduce フレームワークを提供し、それにより大規模なデータ集合に対する再帰的並列操作を作成することができます。

この章では、map-reduce の背景概念や Fabric Engine map-reduceフレームワークの使用法については説明しません。ここでは純粋に、KLでこのフレームワークに提供される型,関数を列挙します。map-reduceの概念や、フレームワークの使用法についてのさらなる情報は Map-Reduce Programming Guide を参照してください。

Map-Reduce 型

map-reduce のサポートのため、KLでは新しい2つの派生型を導入します。 ValueProducer<ValueType>ArrayProducer<ElementType> です。

ValueProducer<ValueType>

ValueProducer<ValueType> に既存の型 ValueType を与え、map-reduce値プロデューサを生成します。 ValueType 型の値を生成します。 ValueProducer<ValueType> 型の値は以下の特徴を持ちます:

  • ValueType と正確に一致する同じ型の変数に代入できます。換言すると ValueProducer<ValueType1>ValueProducer<ValueType2> が同じ型となるのは、 ValueType1ValueType2 が同じ型の場合のみです。
  • 以下のメソッドをサポートします
    function ValueType ValueProducer<ValueType>.produce()
    

    これは値プロデューサ(value producer)の値を生成します。

  • 以下のメソッドをサポートします
    function ValueProducer<ValueType>.flush()
    

    これは値プロデューサに接続されている全てのキャッシュを再帰的に破棄(フラッシュ)します。

ArrayProducer<ElementType>

ArrayProducer<ElementType> に既存の ElementType 型を与え、 map-reduce 配列プロデューサを生成します。 ElementType 型の値を生成します。 ArrayProducer<ElementType> 型の値は以下の特徴を持ちます:

  • ElementType と正確に一致する同じ型の変数に代入できます。換言すると ValueProducer<ElementType1>ValueProducer<ElementType2> が同じ型となるのは、 ElementType1ElementType2 が同じ型の場合のみです。
  • 以下のメソッドをサポートします
    function Size ArrayProducer<ElementType>.getCount()
    

    これは配列プロデューサ内の要素数を返します。 produce(i)getCount() の結果よりも超える i で呼び出しを行うと例外を投げます。

  • 以下のメソッドをサポートします
    function ElementType ArrayProducer<ElementType>.produce(Index i)
    

    これは配列プロデューサの i 番目の要素を生成します。

  • 以下のメソッドをサポートします
    function ArrayProducer<ElementType>.flush()
    

    これは、配列プロデューサに接続されている全てのキャッシュを破棄します。

Map-Reduce 関数

KLは幾つかの関数をサポートし、それにより既存のプロデューサ,KL関数,値から、新規の値(もしくは配列)プロデューサの作成ができます。

値プロデューサを作成する関数

createConstValue(expr)

定数を返す値プロデューサを作成します。

引数:
  • expr (ValueType) – 定数を得るため評価する式
戻り値の型:

ValueProducer<ValueType>

用例:

operator entry() {
  ValueProducer<Integer> vp = createConstValue( 42 );
  report(vp);
  report(vp.produce());
}

出力:

ValueProducer<Integer>
42
createValueGenerator(operatorName[, sharedValueProducer])

値を生成するため operatorName と名づけられたオペレータを呼ぶ、値プロデューサを作成します。

引数:
  • operatorName – 値を生成するため呼ぶオペレータの名前
  • sharedValueProducer (ValueProducer<SharedValueType>) – (オプション) operatorName へ渡される共有される値プロデューサ
戻り値の型:

ValueProducer<ValueType>

オペレータ operatorName は以下のプロトタイプに従う必要があります:

operator operatorName(
  io ValueType outputValue,
  // required if and only if sharedValueProducer provided
  in SharedValueType sharedValue
  );

ValueType 型は operatorName 第一パラメータの型から推測されます。

用例:

operator gen1(io String output) {
  output = "Hello!";
}

operator gen2(io String output, String shared) {
  output = "Hello, " + shared;
}

operator entry() {
  ValueProducer<String> vp1 = createValueGenerator(gen1);
  report("vp1 = " + vp1);
  report("vp1.produce() = " + vp1.produce());

  ValueProducer<String> vp2 = createValueGenerator(gen2, vp1);
  report("vp2 = " + vp2);
  report("vp2.produce() = " + vp2.produce());
}

出力:

vp1 = ValueProducer<String>
vp1.produce() = Hello!
vp2 = ValueProducer<String>
vp2.produce() = Hello, Hello!
createValueTransform(inputValueProducer, operatorName[, sharedValueProducer])

与えられたオペレータを呼び出し、結果を同型の異なる値へトランスフォームする値プロデューサを作成します。

引数:
  • inputValueProducer (ValueProducer<ValueType>) – トランスフォームさせるために入力する値プロデューサ
  • operatorName – 値をトランスフォームさせるために呼び出すオペレータの名前
  • sharedValueProducer (ValueProducer<SharedValueType>) – (オプション) operatorName へ渡される共有される値プロデューサ
戻り値の型:

ValueProducer<ValueType>

オペレータ operatorName は以下のプロトタイプに従う必要があります:

operator operatorName(
  io ValueType value,
  // required if and only if sharedValueProducer provided
  in SharedValueType sharedValue
  );

用例:

operator multByTwo(
  io Integer value
  )
{
  value *= 2;
}

operator multByShared(
  io Integer value,
  Integer shared
  )
{
  value *= shared;
}

operator entry() {
  ValueProducer<Integer> vp;

  vp = createValueTransform(
    createConstValue( 42 ),
    multByTwo
    );
  report(vp);
  report(vp.produce());

  vp = createValueTransform(
    createConstValue( 2 ),
    multByShared,
    createConstValue( 3 )
    );
  report(vp);
  report(vp.produce());
}

出力:

ValueProducer<Integer>
84
ValueProducer<Integer>
6
createValueMap(inputValueProducer, operatorName[, sharedValueType])

オペレータ operatorName を呼び出し、結果を異なる値プロデューサへと ―場合によっては異なる型へとマップする値プロデューサを作成します。

引数:
  • inputValueProducer (ValueProducer<InputValueType>) – 値のマップ元となる入力値プロデューサ
  • operatorName – 値のマップのために呼び出すオペレータの名前
  • sharedValueProducer (ValueProducer<SharedValueType>) – (オプション) operatorName へ渡される共有される値プロデューサ
戻り値の型:

ValueProducer<OutputValueType>

オペレータ operatorName は以下のプロトタイプに従う必要があります:

operator operatorName(
  in InputValueType inputValue,
  io OutputValueType outputValue,
  // required if and only if sharedValueProducer provided
  in SharedValueType sharedValue
  );

OutputValueType の型は operatorName の第二パラメータから推測されます

用例:

operator multByPi(
  Integer input,
  io Scalar output
  )
{
  output = 3.14 * input;
}

operator multByShared(
  Integer input,
  io Scalar output,
  Scalar shared
  )
{
  output = input * shared;
}

operator entry() {
  ValueProducer<Scalar> vp;

  vp = createValueMap(
    createConstValue( 42 ),
    multByPi
    );
  report(vp);
  report(vp.produce());

  vp = createValueMap(
    createConstValue( 2 ),
    multByShared,
    createConstValue( Scalar(2.71) )
    );
  report(vp);
  report(vp.produce());
}

出力:

ValueProducer<Scalar>
131.88
ValueProducer<Scalar>
5.42
createValueCache(inputValueProducer)

他の同型の値プロデューサの結果をキャッシュする、値プロデューサを作成します。詳細は Map-Reduce Programming Guide を参照してください。

引数:
  • inputValueProducer (ValueProducer<ValueType>) – キャッシュする入力値プロデューサ
戻り値の型:

ValueProducer<ValueType>

用例:

operator valueGen(
  io Scalar output
  )
{
  output = 2.71;
  report("Generating output = " + output);
}

operator entry() {
  ValueProducer<Scalar> vp = createValueCache(createValueGenerator(valueGen));
  report("vp.produce() = " + vp.produce());
  report("vp.produce() = " + vp.produce());
  report("calling vp.flush()");
  vp.flush();
  report("vp.produce() = " + vp.produce());
  report("vp.produce() = " + vp.produce());
}

出力:

Generating output = 2.71
vp.produce() = 2.71
vp.produce() = 2.71
calling vp.flush()
Generating output = 2.71
vp.produce() = 2.71
vp.produce() = 2.71

配列プロデューサを作成する関数

createConstArray(arrayExpr)

定数を返す配列プロデューサを作成します。

引数:
  • expr – 定数配列を得るため評価する式。この指揮はKL配列の一種である必要があります。 (可変長配列, 固定長配列, 外部配列のいずれか)
戻り値の型:

ArrayProducer<ElementType>

ElementTypearrayExpr により与えられる配列の要素の型

用例:

operator entry() {
  Integer a[]; a.push(42); a.push(17); a.push(52); a.push(871);
  ArrayProducer<Integer> ap = createConstArray(a);
  report(ap);
  report(ap.getCount());
  report(ap.produce(2));
}

出力:

ArrayProducer<Integer>
4
52
createArrayGenerator(sizeProducer, operatorName[, sharedValueProducer])

配列の要素を生成するため operatorName と名づけられたオペレータを呼ぶ、値プロデューサを作成します。

引数:
  • sizeProducer (ValueProducer<Size>) – Size 型の値を生成する値プロデューサ。この値プロデューサにより配列のサイズ(要素数)を決定します。
  • operatorName – 要素を生成するため呼ぶオペレータの名前
  • sharedValueProducer (ValueProducer<SharedValueType>) – (オプション) operatorName へ渡される共有される値プロデューサ
戻り値の型:

ArrayProducer<ElementType>

オペレータ operatorName は以下のプロトタイプに従う必要があります:

operator operatorName(
  io ElementType outputElementValue,
  in Size index,
  in Size size,
  // required if and only if sharedValueProducer provided
  in SharedValueType sharedValue
  );

ElementType 型は operatorName の第一パラメータの型から推測されます。

用例:

operator generator(
  io Float32 value,
  Size index,
  Size count,
  Float32 shared
  )
{
  report("generator: value=" + value + " index=" + index + " count=" + count + " shared=" + shared);
  value = shared*index;
}

operator entry() {
  ValueProducer<Size> cvg = createConstValue(Size(10));
  ArrayProducer<Float32> ag4 = createArrayGenerator(cvg, generator, createConstValue(Float32(3.141)));
  report(ag4);
  report(ag4.getCount());
  for (Size i=0; i<10; ++i)
    report(ag4.produce(i));
}

出力:

ArrayProducer<Float32>
10
generator: value=0 index=0 count=10 shared=3.141
0
generator: value=0 index=1 count=10 shared=3.141
3.141
generator: value=0 index=2 count=10 shared=3.141
6.282
generator: value=0 index=3 count=10 shared=3.141
9.423
generator: value=0 index=4 count=10 shared=3.141
12.564
generator: value=0 index=5 count=10 shared=3.141
15.705
generator: value=0 index=6 count=10 shared=3.141
18.846
generator: value=0 index=7 count=10 shared=3.141
21.987
generator: value=0 index=8 count=10 shared=3.141
25.128
generator: value=0 index=9 count=10 shared=3.141
28.269
createArrayTransform(inputArrayProducer, operatorName[, sharedValueProducer])

入力された配列プロデューサを operatorName という名前のオペレータを各要素に対して呼び出し、トランスフォームさせる配列プロデューサを作成します。

引数:
  • inputArrayProducer (ArrayProducer<ElementType>) – トランスフォームさせる入力配列プロデューサ
  • operatorName – 要素に対しトランスフォームするため適用するオペレータの名前
  • sharedValueProducer (ValueProducer<SharedValueType>) – (オプション) operatorName へ渡される共有される値プロデューサ
戻り値の型:

ArrayProducer<ElementType>

オペレータ operatorName は以下のプロトタイプに従う必要があります:

operator operatorName(
  io ElementType elementValue,
  in Size index,
  in Size size,
  // required if and only if sharedValueProducer provided
  in SharedValueType sharedValue
  );

用例:

operator transform(
  io Float32 value,
  Size index,
  Size count,
  Float32 shared
  )
{
  report("transform: value=" + value + " index=" + index + " count=" + count + " shared=" + shared);
  value *= shared * (index + 1);
}

operator entry() {
  Float32 inputArray[]; inputArray.push(5.6); inputArray.push(-3.4); inputArray.push(1.4142);
  ArrayProducer<Float32> inputArrayProducer = createConstArray(inputArray);
  ArrayProducer<Float32> transformedArrayProducer = createArrayTransform(inputArrayProducer, transform, createConstValue(Float32(2.56)));
  report(transformedArrayProducer);
  report(transformedArrayProducer.getCount());
  Size transformedArrayProducerCount = transformedArrayProducer.getCount();
  for (Index i=0; i<transformedArrayProducerCount; ++i)
    report(transformedArrayProducer.produce(i));
}

出力:

ArrayProducer<Float32>
3
transform: value=5.6 index=0 count=3 shared=2.56
14.336
transform: value=-3.4 index=1 count=3 shared=2.56
-17.408
transform: value=1.4142 index=2 count=3 shared=2.56
10.86106
createArrayMap(inputArrayProducer, operatorName[, sharedValueProducer])

オペレータ operatorName を呼び出し、結果を異なる配列プロデューサへと ―場合によっては異なる型へとマップする配列プロデューサを作成します。

引数:
  • inputArrayProducer (ArrayProducer<InputElementType>) – 要素のマップ元となる入力配列プロデューサ
  • operatorName – 要素のマップのために呼び出すオペレータの名前
  • sharedValueProducer (ValueProducer<SharedValueType>) – (オプション) operatorName へ渡される共有される値プロデューサ
戻り値の型:

ArrayProducer<OutputElementType>

オペレータ operatorName は以下のプロトタイプに従う必要があります:

operator operatorName(
  in InputElementType inputElementValue,
  io OutputElementType outputElementValue,
  in Size index,
  in Size size,
  // required if and only if sharedValueProducer provided
  in SharedValueType sharedValue
  );

OutputElementType の型は operatorName の第二パラメータから推測されます

用例:

operator map(
  Float32 inputValue,
  io String outputValue,
  Size index,
  Size count,
  Float32 shared
  )
{
  report("map: inputValue=" + inputValue + " index=" + index + " count=" + count + " shared=" + shared);
  Float32 float32Value = inputValue * shared * (index + 1);
  if (abs(float32Value) < 1.0)
    outputValue = "small";
  else if (abs(float32Value) < 10.0)
    outputValue = "medium";
  else if (abs(float32Value) < 100.0)
    outputValue = "large";
  else
    outputValue = "x-large";
}

operator entry() {
  Float32 inputArray[]; inputArray.push(5.6); inputArray.push(-0.034); inputArray.push(1.4142);
  ArrayProducer<Float32> inputArrayProducer = createConstArray(inputArray);
  ArrayProducer<String> mappedArrayProducer = createArrayMap(inputArrayProducer, map, createConstValue(Float32(2.56)));
  report(mappedArrayProducer);
  report(mappedArrayProducer.getCount());
  Size mappedArrayProducerCount = mappedArrayProducer.getCount();
  for (Index i=0; i<mappedArrayProducerCount; ++i)
    report(mappedArrayProducer.produce(i));
}

出力:

ArrayProducer<String>
3
map: inputValue=5.6 index=0 count=3 shared=2.56
large
map: inputValue=-0.034 index=1 count=3 shared=2.56
small
map: inputValue=1.4142 index=2 count=3 shared=2.56
large
createArrayCache(inputArrayProducer)

他の同型の配列プロデューサの結果をキャッシュする、配列プロデューサを作成します。詳細は Map-Reduce Programming Guide を参照してください。

引数:
  • inputArrayProducer (ArrayProducer<ElementType>) – キャッシュする入力配列プロデューサ
戻り値の型:

ArrayProducer<ElementType>

用例:

operator elementGen(
  io Scalar output,
  Size index
  )
{
  output = 2.71 * index;
  report("Generating output = " + output);
}

operator entry() {
  ArrayProducer<Scalar> ap = createArrayCache(
    createArrayGenerator(
      createConstValue(Size(4)),
      elementGen
      )
    );
  report("ap.produce(2) = " + ap.produce(2));
  report("ap.produce(2) = " + ap.produce(2));
  report("calling ap.flush()");
  ap.flush();
  report("ap.produce(2) = " + ap.produce(2));
  report("ap.produce(2) = " + ap.produce(2));
}

出力:

Generating output = 5.42
ap.produce(2) = 5.42
ap.produce(2) = 5.42
calling ap.flush()
Generating output = 5.42
ap.produce(2) = 5.42
ap.produce(2) = 5.42

Reduce 関数

createReduce(arrayProducer, operatorName[, sharedValueProducer])

operatorName オペレータを各々個別の要素をリダクションかけるために呼び出し、配列プロデューサを reduce し値を生成する、値プロデューサを作成します。

引数:
  • arrayProducer (ArrayProducer<ElementType>) – Reduceかかる入力配列プロデューサ
  • operatorName – 要素に対しトランスフォームするため適用するオペレータの名前
  • sharedValueProducer (ValueProducer<SharedValueType>) – (オプション) operatorName へ渡される共有される値プロデューサ
戻り値の型:

ValueProducer<ValueType>

オペレータ operatorName は以下のプロトタイプに従う必要があります:

operator operatorName(
  in ElementType inputElement,
  io ValueType outputValue,
  in Size index,
  in Size size,
  // required if and only if sharedValueProducer provided
  in SharedValueType sharedValue
  );

用例:

operator generator(
  io Integer outputValue,
  Size index,
  Size size
  )
{
  outputValue = index + 1;
}

operator reduce(
  Integer inputValue,
  io Integer outputValue,
  Size index,
  Size size
  )
{
  report("reduce: inputValue=" + inputValue);
  outputValue += inputValue;
}

operator entry() {
  ValueProducer<Integer> sum = createReduce(
    createArrayGenerator(
      createConstValue(Size(10)),
      generator
      ),
    reduce
    );
  report("sum.produce() = " + sum.produce());
}

出力:

reduce: inputValue=1
reduce: inputValue=2
reduce: inputValue=3
reduce: inputValue=4
reduce: inputValue=5
reduce: inputValue=6
reduce: inputValue=7
reduce: inputValue=8
reduce: inputValue=9
reduce: inputValue=10
sum.produce() = 55