Hello Experts!
I am currently developing an application that reads data from a machine sensor and feeds it to an R prediction script in real time. The results from the script should then be written to a Netezza database. The input consists of 38 sensor values and their timestamps, 76 variables in total. The R script processes this data and should output a 20 x 38 matrix containing the predicted value of each of the 38 sensor variables at different times (t1 to t20), but the matrix actually arrives as a single flat list of 760 elements. The table I'm supposed to write these values into has 38 columns, one per sensor variable.
What I tried was a Functor operator that slices the list into 20 lists of 38 elements each. This works when I connect the Functor to a FileSink and write the values to a .txt file, but I can't write the same values to the database, even though each list corresponds to one row, because I can't find a way to map each value in the list to its corresponding column name.
To put it simply, what I have now is an input list w=[A1,A2,...,A20,B1,...,B20,...,Z1,...,Z20] that becomes {w1=[A1,B1,...,Z1],w2=[A2,B2,...,Z2],...,w20=[A20,B20,...,Z20]}, and I need ODBCAppend to "understand" that A1 corresponds to column 1 of the Netezza table, B1 to column 2, and so on.
Does anyone have any suggestions on how I could achieve this?
Thank you,
Alex
Answer by Michael.K (1380) | Oct 14, 2015 at 02:55 AM
You have to write a Custom operator like this one:
stream < list < rstring > w > Data = ...()
{
}
stream< rstring field1, rstring field2, rstring field3, ..., rstring field38 > NetezzaData as O = Custom(Data as I)
{
    logic
        state:
        {
            mutable O otuple = (O){};
        }
        onTuple I:
        {
            // The list holds all values of sensor 1, then all values of sensor 2, and so on,
            // so the number of rows (time steps) is the list size divided by 38.
            int32 elementCount = size(I.w);
            int32 rowCount = elementCount / 38;
            if (rowCount * 38 != elementCount)
            {
                // What shall happen if the list size is not a multiple of 38?
                abort();
            }
            // Emit one output tuple per time step.
            for (int32 i in range(rowCount))
            {
                otuple.field1 = I.w[ 0 * rowCount + i];
                otuple.field2 = I.w[ 1 * rowCount + i];
                otuple.field3 = I.w[ 2 * rowCount + i];
                ...
                otuple.field38 = I.w[37 * rowCount + i];
                submit(otuple, O);
            }
        }
        onPunct I:
        {
            // Forward punctuation unchanged.
            submit(currentPunct(), O);
        }
}
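The index arithmetic in the loop above can be checked outside of Streams. Below is a minimal Python sketch (not SPL), assuming the flat list is laid out sensor by sensor as described in the question: all values of the first sensor, then all values of the second, and so on. The small sizes and the labels are made up for illustration.

```python
# Hypothetical small case: 3 sensors (A, B, C) and 4 time steps
# instead of the 38 sensors and 20 time steps from the question.
SENSORS = ["A", "B", "C"]
STEPS = 4

# Flat list in the layout from the question: A1..A4, B1..B4, C1..C4.
w = [f"{s}{t}" for s in SENSORS for t in range(1, STEPS + 1)]


def row(flat, step_index, steps, sensors):
    """Return the value of every sensor at one time step (0-based step_index)."""
    # Sensor s occupies positions s*steps .. s*steps + steps - 1 of the flat list.
    return [flat[s * steps + step_index] for s in range(sensors)]


rows = [row(w, i, STEPS, len(SENSORS)) for i in range(STEPS)]
print(rows[0])
print(rows[3])
```

Each element of `rows` plays the role of one submitted tuple: position 0 goes to the first column, position 1 to the second, and so on.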
This is exactly what I needed! Thank you so much, Michael!
Answer by Kevin_Foster (520) | Oct 13, 2015 at 11:09 AM
Since you want to burst your list out into multiple rows in Netezza, you are going to need to burst your list into multiple tuples before it gets to the ODBC operator (with the correct schema for your destination table).
A Functor cannot create more output tuples than input tuples. I would use a Custom operator for this instead, iterating through the list in an onTuple block and calling submit() for each output tuple.
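The one-input, many-outputs pattern can be illustrated with a short generator. This is a Python sketch of the idea rather than SPL; the function name `burst` and the column names are hypothetical, and the sensor-by-sensor layout of the list is taken from the question.

```python
from typing import Dict, Iterator, List, Sequence


def burst(flat: Sequence[float], columns: List[str]) -> Iterator[Dict[str, float]]:
    """Yield one record per time step from a flat list grouped by sensor."""
    if not columns or len(flat) % len(columns) != 0:
        raise ValueError("list length is not a multiple of the column count")
    steps = len(flat) // len(columns)
    for t in range(steps):
        # Column c holds the values flat[c*steps] .. flat[c*steps + steps - 1].
        yield {name: flat[c * steps + t] for c, name in enumerate(columns)}


# Hypothetical example: 2 columns and 3 time steps,
# so a single input list produces 3 output records.
records = list(burst([1.0, 2.0, 3.0, 10.0, 20.0, 30.0], ["field1", "field2"]))
for r in records:
    print(r)
```

Each yielded record corresponds to one submit() call in a Custom operator, with the keys standing in for the attribute names of the output stream.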
-Kevin
Hi Kevin,
Thank you for your reply. I tried using a Custom operator but I can't find a function that would allow me to populate a tuple by passing the single elements from the list inside a for loop. Do you happen to know if such a function exists?
Thanks,
Alex