Skip to content

Commit

Permalink
NIFI-14265 Ensured aligned date columns across multiple sheets when u…
Browse files Browse the repository at this point in the history
…sing the Use Starting Row strategy are inferred only as dates.

Signed-off-by: Chris Sampson <chris.sampson82@gmail.com>

This closes #9750.
  • Loading branch information
dan-s1 authored and ChrisSamo632 committed Feb 28, 2025
1 parent d60f5b2 commit c96b750
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ public class ExcelHeaderSchemaStrategy implements SchemaAccessStrategy {
static final int NUM_ROWS_TO_DETERMINE_TYPES = 10; // NOTE: This number is arbitrary.
static final AllowableValue USE_STARTING_ROW = new AllowableValue("Use Starting Row", "Use Starting Row",
"The configured first row of the Excel file is a header line that contains the names of the columns. The schema will be derived by using the "
+ "column names in the header and the following " + NUM_ROWS_TO_DETERMINE_TYPES + " rows to determine the type(s) of each column");
+ "column names in the header of the first sheet and the following " + NUM_ROWS_TO_DETERMINE_TYPES + " rows to determine the type(s) of each column " +
"while the configured header rows of subsequent sheets are skipped.");

private final PropertyContext context;
private final ComponentLog logger;
Expand Down Expand Up @@ -85,16 +86,18 @@ public RecordSchema getSchema(Map<String, String> variables, InputStream content
int index = 0;

while (rowIterator.hasNext()) {
Row row = rowIterator.next();
if (index == 0) {
fieldNames = getFieldNames(firstRow, row);
} else if (index <= NUM_ROWS_TO_DETERMINE_TYPES) {
inferSchema(row, fieldNames, typeMap);
} else {
break;
}

index++;
Row row = rowIterator.next();
if (index == 0) {
fieldNames = getFieldNames(firstRow, row);
} else if (row.getRowNum() == zeroBasedFirstRow) { // skip first row of all sheets
continue;
} else if (index <= NUM_ROWS_TO_DETERMINE_TYPES) {
inferSchema(row, fieldNames, typeMap);
} else {
break;
}

index++;
}

if (typeMap.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,12 @@
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.schema.inference.TimeValueInference;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.util.MockConfigurationContext;
import org.apache.poi.ss.usermodel.Cell;
import org.apache.poi.ss.usermodel.CellStyle;
import org.apache.poi.ss.usermodel.CreationHelper;
import org.apache.poi.ss.usermodel.Row;
import org.apache.poi.xssf.usermodel.XSSFSheet;
import org.apache.poi.xssf.usermodel.XSSFWorkbook;
Expand All @@ -41,12 +44,13 @@
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.time.LocalDate;
import java.util.Map;

import static java.nio.file.Files.newDirectoryStream;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

Expand Down Expand Up @@ -74,9 +78,9 @@ public static void cleanUpAfterAll() {

@Test
void testWhereConfiguredStartRowIsEmpty() throws IOException {
Object[][] data = {{}, {1, "Manny"}, {2, "Moe"}, {3, "Jack"}};
final ByteArrayOutputStream outputStream = getSingleSheetWorkbook(data);
final Map<PropertyDescriptor, String> properties = new HashMap<>();
Object[][] singleSheet = {{}, {1, "Manny"}, {2, "Moe"}, {3, "Jack"}};
final ByteArrayOutputStream outputStream = createWorkbook(singleSheet);
final Map<PropertyDescriptor, String> properties = Map.of();
final ConfigurationContext context = new MockConfigurationContext(properties, null, null);
final ExcelHeaderSchemaStrategy schemaStrategy = new ExcelHeaderSchemaStrategy(context, logger, TIME_VALUE_INFERENCE);

Expand All @@ -88,9 +92,9 @@ void testWhereConfiguredStartRowIsEmpty() throws IOException {

@Test
void testWhereConfiguredStartRowHasEmptyCell() throws Exception {
Object[][] data = {{"ID", "", "Middle"}, {1, "Manny", "M"}, {2, "Moe", "M"}, {3, "Jack", "J"}};
final ByteArrayOutputStream outputStream = getSingleSheetWorkbook(data);
final Map<PropertyDescriptor, String> properties = new HashMap<>();
Object[][] singleSheet = {{"ID", "", "Middle"}, {1, "Manny", "M"}, {2, "Moe", "M"}, {3, "Jack", "J"}};
final ByteArrayOutputStream outputStream = createWorkbook(singleSheet);
final Map<PropertyDescriptor, String> properties = Map.of();
final ConfigurationContext context = new MockConfigurationContext(properties, null, null);
final ExcelHeaderSchemaStrategy schemaStrategy = new ExcelHeaderSchemaStrategy(context, logger, TIME_VALUE_INFERENCE);

Expand All @@ -103,9 +107,9 @@ void testWhereConfiguredStartRowHasEmptyCell() throws Exception {

@Test
void testWhereInferenceRowHasMoreCellsThanFieldNames() throws Exception {
Object[][] data = {{"ID", "First", "Middle"}, {1, "Manny", "M"}, {2, "Moe", "M", "Extra"}, {3, "Jack", "J"}};
final ByteArrayOutputStream outputStream = getSingleSheetWorkbook(data);
final Map<PropertyDescriptor, String> properties = new HashMap<>();
Object[][] singleSheet = {{"ID", "First", "Middle"}, {1, "Manny", "M"}, {2, "Moe", "M", "Extra"}, {3, "Jack", "J"}};
final ByteArrayOutputStream outputStream = createWorkbook(singleSheet);
final Map<PropertyDescriptor, String> properties = Map.of();
final ConfigurationContext context = new MockConfigurationContext(properties, null, null);
final ExcelHeaderSchemaStrategy schemaStrategy = new ExcelHeaderSchemaStrategy(context, logger, TIME_VALUE_INFERENCE);

Expand All @@ -117,9 +121,9 @@ void testWhereInferenceRowHasMoreCellsThanFieldNames() throws Exception {

@Test
void testWhereTotalRowsLessThanConfiguredInferenceRows() throws Exception {
Object[][] data = {{"ID", "First", "Middle"}, {1, "Manny", "M"}, {2, "Moe", "M"}, {3, "Jack", "J"}};
final ByteArrayOutputStream outputStream = getSingleSheetWorkbook(data);
final Map<PropertyDescriptor, String> properties = new HashMap<>();
Object[][] singleSheet = {{"ID", "First", "Middle"}, {1, "Manny", "M"}, {2, "Moe", "M"}, {3, "Jack", "J"}};
final ByteArrayOutputStream outputStream = createWorkbook(singleSheet);
final Map<PropertyDescriptor, String> properties = Map.of();
final ConfigurationContext context = new MockConfigurationContext(properties, null, null);
final ExcelHeaderSchemaStrategy schemaStrategy = new ExcelHeaderSchemaStrategy(context, logger, TIME_VALUE_INFERENCE);

Expand All @@ -130,12 +134,12 @@ void testWhereTotalRowsLessThanConfiguredInferenceRows() throws Exception {

@Test
void testWhereConfiguredInferenceRowsHasAnEmptyRow() throws IOException {
Object[][] data = {{"ID", "First", "Middle"}, {1, "One", "O"}, {2, "Two", "T"}, {3, "Three", "T"},
Object[][] singleSheet = {{"ID", "First", "Middle"}, {1, "One", "O"}, {2, "Two", "T"}, {3, "Three", "T"},
{4, "Four", "F"}, {5, "Five", "F"}, {}, {7, "Seven", "S"}, {8, "Eight", "E"},
{9, "Nine", "N"}, {10, "Ten", "T"}};

final ByteArrayOutputStream outputStream = getSingleSheetWorkbook(data);
final Map<PropertyDescriptor, String> properties = new HashMap<>();
final ByteArrayOutputStream outputStream = createWorkbook(singleSheet);
final Map<PropertyDescriptor, String> properties = Map.of();
final ConfigurationContext context = new MockConfigurationContext(properties, null, null);
final ExcelHeaderSchemaStrategy schemaStrategy = new ExcelHeaderSchemaStrategy(context, logger, TIME_VALUE_INFERENCE);

Expand All @@ -146,12 +150,12 @@ void testWhereConfiguredInferenceRowsHasAnEmptyRow() throws IOException {

@Test
void testWhereTotalRowsGreaterThanConfiguredInferenceRows() throws Exception {
Object[][] data = {{"ID", "First", "Middle"}, {1, "One", "O"}, {2, "Two", "T"}, {3, "Three", "T"},
Object[][] singleSheet = {{"ID", "First", "Middle"}, {1, "One", "O"}, {2, "Two", "T"}, {3, "Three", "T"},
{4, "Four", "F"}, {5, "Five", "F"}, {6, "Six", "S"}, {7, "Seven", "S"}, {8, "Eight", "E"},
{9, "Nine", "N"}, {10, "Ten", "T"}, {11, "Eleven", "E"}};

final ByteArrayOutputStream outputStream = getSingleSheetWorkbook(data);
final Map<PropertyDescriptor, String> properties = new HashMap<>();
final ByteArrayOutputStream outputStream = createWorkbook(singleSheet);
final Map<PropertyDescriptor, String> properties = Map.of();
final ConfigurationContext context = new MockConfigurationContext(properties, null, null);
final ExcelHeaderSchemaStrategy schemaStrategy = new ExcelHeaderSchemaStrategy(context, logger, TIME_VALUE_INFERENCE);

Expand All @@ -162,9 +166,9 @@ void testWhereTotalRowsGreaterThanConfiguredInferenceRows() throws Exception {

@Test
void testWhereConfiguredInferenceRowsAreAllBlank() throws IOException {
Object[][] data = {{"ID", "First", "Middle"}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {11, "Eleven", "E"}};
final ByteArrayOutputStream outputStream = getSingleSheetWorkbook(data);
final Map<PropertyDescriptor, String> properties = new HashMap<>();
Object[][] singleSheet = {{"ID", "First", "Middle"}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {11, "Eleven", "E"}};
final ByteArrayOutputStream outputStream = createWorkbook(singleSheet);
final Map<PropertyDescriptor, String> properties = Map.of();
final ConfigurationContext context = new MockConfigurationContext(properties, null, null);
final ExcelHeaderSchemaStrategy schemaStrategy = new ExcelHeaderSchemaStrategy(context, logger, TIME_VALUE_INFERENCE);

Expand All @@ -174,22 +178,60 @@ void testWhereConfiguredInferenceRowsAreAllBlank() throws IOException {
}
}

private static ByteArrayOutputStream getSingleSheetWorkbook(Object[][] data) throws IOException {
@Test
void testAlignedDateColumnsAcrossTwoSheets() throws Exception {
final String dateColumnName = "Date";
final Object[] columnNames = {dateColumnName, "Something", "Name"};
final Object[][] firstSheet =
{columnNames, {LocalDate.of(2025, 2, 1), "test1", "Sheet1"}, {LocalDate.of(2024, 2, 12), "test2", "Sheet1"}};
Object[][] secondSheet =
{columnNames, {LocalDate.of(1976, 9, 11), "test1", "Sheet2"}, {LocalDate.of(1987, 2, 12), "test2", "Sheet2"}};
final ByteArrayOutputStream outputStream = createWorkbook(firstSheet, secondSheet);
final Map<PropertyDescriptor, String> properties = Map.of();
final ConfigurationContext context = new MockConfigurationContext(properties, null, null);
final ExcelHeaderSchemaStrategy schemaStrategy = new ExcelHeaderSchemaStrategy(context, logger, TIME_VALUE_INFERENCE);

try (final InputStream inputStream = new ByteArrayInputStream(outputStream.toByteArray())) {
final RecordSchema schema = schemaStrategy.getSchema(null, inputStream, null);
final RecordField dateRecordField = schema.getField(dateColumnName).orElse(null);

assertNotNull(dateRecordField);
assertEquals(RecordFieldType.DATE, dateRecordField.getDataType().getFieldType(), String.format("Expected record field type to be %s but it was type %s",
RecordFieldType.DATE, dateRecordField.getDataType().getFieldType()));
}
}

private static ByteArrayOutputStream createWorkbook(Object[][]... sheetData) throws IOException {
final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();

try (XSSFWorkbook workbook = new XSSFWorkbook()) {
final XSSFSheet sheet = workbook.createSheet("Sheet 1");
int rowCount = 0;
for (Object[] dataRow : data) {
Row row = sheet.createRow(rowCount++);
int columnCount = 0;
for (Object field : dataRow) {
Cell cell = row.createCell(columnCount++);
if (field instanceof String) {
cell.setCellValue((String) field);
} else if (field instanceof Number) {
cell.setCellValue(((Number) field).doubleValue());
CreationHelper creationHelper = workbook.getCreationHelper();
CellStyle dayMonthYearCellStyle = workbook.createCellStyle();
dayMonthYearCellStyle.setDataFormat(creationHelper.createDataFormat().getFormat("dd/mm/yyyy"));
int sheetCount = 1;

for (Object[][] singleSheet : sheetData) {
final XSSFSheet sheet = workbook.createSheet("Sheet " + sheetCount);
int rowCount = 0;

for (Object[] singleRow : singleSheet) {
Row row = sheet.createRow(rowCount++);
int columnCount = 0;

for (Object field : singleRow) {
Cell cell = row.createCell(columnCount++);
switch (field) {
case String string -> cell.setCellValue(string);
case Number number -> cell.setCellValue(number.doubleValue());
case LocalDate localDate -> {
cell.setCellValue(localDate);
cell.setCellStyle(dayMonthYearCellStyle);
}
default -> throw new IllegalStateException("Unexpected value: " + field);
}
}
}
sheetCount++;
}
workbook.write(outputStream);
}
Expand Down

0 comments on commit c96b750

Please sign in to comment.