Skip to content

Commit

Permalink
feat(basicResource): add unique codes nAmongM filter (#11)
Browse files Browse the repository at this point in the history
  • Loading branch information
pl-buiquang authored Feb 3, 2025
1 parent 08e3280 commit 541e2bf
Show file tree
Hide file tree
Showing 15 changed files with 193 additions and 32 deletions.
7 changes: 7 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ type BASIC_RESOURCE = CRITERIA & {
patientAge?: PATIENT_AGE, // The age constraint of the patient
dateRangeList?: Array<DATE_RANGE>, // The date range constraint of the resource
encounterDateRange?: DATE_RANGE // The date range constraint of the related encounter
uniqueFields?: Array<UNIQUE_FIELD> // Constraints on the unique values of resource fields (currently codes only) that a patient must satisfy to be counted
}

type PATIENT_AGE = {
Expand All @@ -144,6 +145,12 @@ type DATE_RANGE = {
dateIsNotNull?: boolean
}

type UNIQUE_FIELD = {
name?: string, // The FHIR resource field to check. Only codes are supported for now, so this field is ignored and can be omitted
operator: string, // The operator to use for the verification
n: number // The number of unique values
}

type TEMPORAL_CONSTRAINT = {
idList: string | Array<number>,
constraintType: "sameEncounter" | "differentEncounter" | "directChronologicalOrdering" | "sameEpisodeOfCare",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,42 @@ class QueryBuilderBasicResource(val querySolver: ResourceResolver) {
} else criterionDataFrame
}

/** Keep only the rows of the patients that satisfy every unique-field constraint of the criterion.
  *
  * Rows are grouped by (patient, code) and counted. For each constraint, a patient is retained
  * when at least one of its (patient, code) groups has a row count matching `count <operator> n`.
  * When several constraints are given, a patient must satisfy all of them.
  *
  * NOTE(review): the count is the number of rows per (patient, code) pair, not the number of
  * distinct codes per patient - confirm this is the intended meaning of `n`.
  *
  * @param criterionDataFrame resulting dataframe of a basicResource; it must contain the subject
  *                           and code columns of the criterion when a constraint has to be applied
  * @param basicResource      the criterion holding the optional `uniqueFields` constraints
  * @param criterionId        id of the criterion, used to build the subject and code column names
  * @return the input dataframe restricted to the matching patients, or the input dataframe itself
  *         when there is no constraint to apply
  * @throws IllegalArgumentException if a constraint uses an unsupported operator
  */
private def filterByUniqueCodes(criterionDataFrame: DataFrame,
                                basicResource: BasicResource,
                                criterionId: Short): DataFrame = {
  val subjectColumn = QueryBuilderUtils.getSubjectColumn(criterionId)
  val codeColumn = QueryBuilderUtils.buildColName(criterionId, QueryColumn.CODE)

  // ">= 1" holds for every (patient, code) group present in the dataframe, so it filters nothing.
  val constraints = basicResource.uniqueFields
    .getOrElse(List.empty)
    .filter(constraint => constraint.operator != ">=" || constraint.n != 1)

  if (constraints.isEmpty) {
    criterionDataFrame
  } else {
    // The aggregation does not depend on the constraint, so it is built once and shared.
    val occurrencesPerCode = criterionDataFrame.groupBy(subjectColumn, codeColumn).count()
    val occurrenceCount = occurrencesPerCode("count")

    val eligiblePatients = constraints
      .map { constraint =>
        // The operator comes from the request: it is matched against a closed list instead of
        // being interpolated into a SQL expression.
        val predicate = constraint.operator match {
          case "=" | "=="  => occurrenceCount === constraint.n
          case "!=" | "<>" => occurrenceCount =!= constraint.n
          case ">"         => occurrenceCount > constraint.n
          case ">="        => occurrenceCount >= constraint.n
          case "<"         => occurrenceCount < constraint.n
          case "<="        => occurrenceCount <= constraint.n
          case unsupported =>
            throw new IllegalArgumentException(
              s"Unsupported operator '$unsupported' in uniqueFields of criterion $criterionId")
        }
        occurrencesPerCode.filter(predicate).select(subjectColumn)
      }
      // A patient has to satisfy all the constraints: intersect the per-constraint patient sets
      // (the previous condition-less join produced a cross product with duplicated columns).
      .reduce(_ intersect _)

    criterionDataFrame.join(eligiblePatients,
                            criterionDataFrame(subjectColumn) <=> eligiblePatients(subjectColumn),
                            "left_semi")
  }
}

/** Filter patient of input dataframe which does not have the required amount of occurrence.
*
* @param criterionDataFrame resulting dataframe of patient of a basicResource
Expand Down Expand Up @@ -268,10 +304,11 @@ class QueryBuilderBasicResource(val querySolver: ResourceResolver) {
basicResource,
criterionId,
isInTemporalConstraint)
criterionDataFrame = filterByUniqueCodes(criterionDataFrame, basicResource, criterionId)
criterionDataFrame = qbUtils.cleanDataFrame(criterionDataFrame,
isInTemporalConstraint,
selectedColumns,
subjectColumn)
isInTemporalConstraint,
selectedColumns,
subjectColumn)

if (logger.isDebugEnabled) {
logger.debug(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,12 @@ abstract class BaseQuery(val _type: String, _id: Short, isInclusive: Boolean) {
val IsInclusive: Boolean = isInclusive
}

/** Constraint on the unique values of a resource field, as parsed from the `uniqueFields` entry
  * of a criterion.
  *
  * @param name     optional name of the resource field the constraint applies to
  * @param operator comparison operator applied to the count (for instance ">=" or "=")
  * @param n        value the count is compared to
  */
case class UniqueFieldConstraint(name: Option[String], operator: String, n: Int)

case class Occurrence(n: Int,
operator: String,
sameEncounter: Option[Boolean],
Expand All @@ -35,6 +41,7 @@ case class BasicResource(_id: Short,
occurrence: Option[Occurrence] = None,
patientAge: Option[PatientAge] = None,
encounterDateRange: Option[DateRange] = None,
uniqueFields: Option[List[UniqueFieldConstraint]] = None,
nullAvailableFieldList: Option[List[String]] = None)
extends BaseQuery("basic_resource", _id, isInclusive) {
override def toString: String = getClass.getName + "@" + Integer.toHexString(hashCode)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,15 @@ class CriterionTagsParser(val queryBuilderConfigs: ResourceConfig) {
} else solrFieldList
}

/** Complete the requested fields with the fields mapped to `QueryColumn.CODE` for the
  * collection, when the query declares unique field constraints.
  *
  * @param solrFieldList fields requested so far
  * @return the input list followed by the code fields, without duplicates, or the input list
  *         unchanged when no unique field is requested or the collection has no code mapping
  */
def getCodeFieldList(solrFieldList: List[String]): List[String] = {
  // The collection mapping is only looked up when unique fields are requested.
  val codeFields: Option[List[String]] =
    if (genericQuery.uniqueFields.isEmpty) None
    else queryBuilderConfigs.requestKeyPerCollectionMap(collection).get(QueryColumn.CODE)

  codeFields match {
    case Some(fields) => (solrFieldList ++ fields).distinct
    case None         => solrFieldList
  }
}

def getIsDateTimeAvailable: Boolean = {
val colsMapping = queryBuilderConfigs
.requestKeyPerCollectionMap(collection)
Expand All @@ -306,6 +315,7 @@ class CriterionTagsParser(val queryBuilderConfigs: ResourceConfig) {
requiredSolrFieldList = getResourceGroupByFieldList(requiredSolrFieldList)
requiredSolrFieldList =
convertDatePreferenceToDateTimeSolrField(requiredSolrFieldList, collection)
requiredSolrFieldList = getCodeFieldList(requiredSolrFieldList)
val isDateTimeAvailable: Boolean = getIsDateTimeAvailable
val isEncounterAvailable: Boolean = getIsEncounterAvailable
val isEpisodeOfCareAvailable: Boolean = getIsEpisodeOfCareAvailable
Expand Down Expand Up @@ -353,10 +363,10 @@ class CriterionTagsParser(val queryBuilderConfigs: ResourceConfig) {
val isDateTimeAvailable: Boolean = getIsDateTimeAvailable
val isEncounterAvailable: Boolean = getIsEncounterAvailable
CriterionTags(isDateTimeAvailable,
isEncounterAvailable,
getIsEpisodeOfCareAvailable,
isInTemporalConstraint = false,
withOrganizations = requestOrganization)
isEncounterAvailable,
getIsEpisodeOfCareAvailable,
isInTemporalConstraint = false,
withOrganizations = requestOrganization)
}

private def convertDatePreferenceToDateTimeSolrField(datePreferenceList: List[String],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ object QueryParser {
temporalConstraints: Option[List[GenericTemporalConstraint]],
criteria: Option[List[GenericQuery]],
nAmongMOptions: Option[Occurrence],
uniqueFields: Option[List[UniqueFieldConstraint]],
version: Option[String],
sourcePopulation: Option[SourcePopulationDTO],
request: Option[GenericQuery],
Expand All @@ -153,6 +154,7 @@ object QueryParser {
Json.reads[GenericTemporalConstraint]
implicit lazy val sourcePopulationReads = Json.reads[SourcePopulationDTO]
implicit lazy val dateRange = Json.reads[DateRange]
implicit lazy val uniqueFieldConstraintReads = Json.reads[UniqueFieldConstraint]
implicit lazy val queryRead = Json.reads[GenericQuery]
logger.info(s"Trying to parse query ${cohortDefinitionSyntaxJsonString}")
val cohortRequestOption =
Expand Down Expand Up @@ -256,6 +258,7 @@ object QueryParser {
else genericQuery.filterFhir.get,
occurrence = genericQuery.occurrence,
patientAge = genericQuery.patientAge,
uniqueFields = genericQuery.uniqueFields,
nullAvailableFieldList = genericQuery.nullAvailableFieldList,
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package fr.aphp.id.eds.requester.query.resolver.rest
import fr.aphp.id.eds.requester.query.resolver.ResourceConfig
import fr.aphp.id.eds.requester.{FhirResource, QueryColumn}
import org.hl7.fhir.instance.model.api.IBase
import org.hl7.fhir.r4.model.{DateTimeType, DateType, IdType, Reference}
import org.hl7.fhir.r4.model._

case class QueryColumnMapping(queryColName: String,
fhirPath: String,
Expand Down Expand Up @@ -38,21 +38,21 @@ class RestFhirQueryElementsConfig extends ResourceConfig {
QueryColumnMapping(QueryColumn.ENCOUNTER_END_DATE, "period.end", classOf[DateTimeType])),
)),
FhirResource.OBSERVATION -> addJoinedResourceColumns(
defaultResourceMapping(Some("subject"), Some("encounter"), Some("effectiveDateTime"))),
defaultResourceMapping(Some("subject"), Some("encounter"), Some("effectiveDateTime"), codeColumn = Some("code.coding.code"))),
FhirResource.CONDITION -> addJoinedResourceColumns(
defaultResourceMapping(Some("subject"), Some("encounter"), Some("recordedDate"))),
defaultResourceMapping(Some("subject"), Some("encounter"), Some("recordedDate"), codeColumn = Some("code.coding.code"))),
FhirResource.MEDICATION_REQUEST -> addJoinedResourceColumns(
defaultResourceMapping(Some("subject"),
Some("encounter"),
Some("dispenseRequest.validityPeriod.start"))),
Some("dispenseRequest.validityPeriod.start"), codeColumn = Some("medicationCodeableConcept.coding.code"))),
FhirResource.MEDICATION_ADMINISTRATION -> addJoinedResourceColumns(
defaultResourceMapping(Some("subject"), Some("encounter"), Some("effectivePeriod.start"))),
defaultResourceMapping(Some("subject"), Some("encounter"), Some("effectivePeriod.start"), codeColumn = Some("medicationCodeableConcept.coding.code"))),
FhirResource.DOCUMENT_REFERENCE -> addJoinedResourceColumns(
defaultResourceMapping(Some("subject"), Some("encounter"), Some("date"))),
FhirResource.CLAIM -> addJoinedResourceColumns(
defaultResourceMapping(Some("subject"), Some("encounter"), Some("created"))),
defaultResourceMapping(Some("subject"), Some("encounter"), Some("created"), codeColumn = Some("diagnosis.diagnosisCodeableConcept.coding.code"))),
FhirResource.PROCEDURE -> addJoinedResourceColumns(
defaultResourceMapping(Some("subject"), Some("encounter"), Some("date"))),
defaultResourceMapping(Some("subject"), Some("encounter"), Some("date"), codeColumn = Some("code.coding.code"))),
FhirResource.IMAGING_STUDY -> addJoinedResourceColumns(
defaultResourceMapping(Some("subject"), Some("encounter"), Some("started"))),
FhirResource.QUESTIONNAIRE_RESPONSE -> addJoinedResourceColumns(
Expand Down Expand Up @@ -120,7 +120,9 @@ class RestFhirQueryElementsConfig extends ResourceConfig {

private def defaultResourceMapping(patientColumn: Option[String] = Some("patient"),
encounterColumn: Option[String] = Some("encounter"),
eventColumn: Option[String] = None): List[ResourceMapping] = {
eventColumn: Option[String] = None,
codeColumn: Option[String] = None
): List[ResourceMapping] = {
var resourceMappingList = List(
ResourceMapping(QueryColumnMapping(QueryColumn.ID, "id", classOf[IdType]))
)
Expand All @@ -136,6 +138,10 @@ class RestFhirQueryElementsConfig extends ResourceConfig {
resourceMappingList = resourceMappingList :+ ResourceMapping(
QueryColumnMapping(QueryColumn.EVENT_DATE, eventColumn.get, classOf[DateTimeType]))
}
if (codeColumn.isDefined) {
resourceMappingList = resourceMappingList :+ ResourceMapping(
QueryColumnMapping(QueryColumn.CODE, codeColumn.get, classOf[CodeableConcept]))
}
resourceMappingList
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,20 @@ import fr.aphp.id.eds.requester.{FhirResource, QueryColumn}
class SolrQueryElementsConfig extends ResourceConfig {

def buildMap(patientCol: List[String],
dateColListTarget: List[String]): Map[String, List[String]] = {
dateColListTarget: List[String], codeCol: Option[List[String]] = None): Map[String, List[String]] = {
Map(
QueryColumn.PATIENT -> patientCol,
QueryColumn.EVENT_DATE -> dateColListTarget,
QueryColumn.ENCOUNTER -> List(SolrColumn.ENCOUNTER),
QueryColumn.ENCOUNTER_START_DATE -> List(SolrColumn.ENCOUNTER_START_DATE),
QueryColumn.ENCOUNTER_END_DATE -> List(SolrColumn.ENCOUNTER_END_DATE),
QueryColumn.ID -> List(SolrColumn.ID),
QueryColumn.ORGANIZATIONS -> List(SolrColumn.ORGANIZATIONS)
)
QueryColumn.ORGANIZATIONS -> List(SolrColumn.ORGANIZATIONS),
) ++ codeCol.map(QueryColumn.CODE -> _).toMap
}

def buildMap(dateColListTarget: List[String]): Map[String, List[String]] = {
buildMap(List(SolrColumn.PATIENT), dateColListTarget)
def buildDefaultMap(dateColListTarget: List[String], codeCol: Option[List[String]] = None): Map[String, List[String]] = {
buildMap(List(SolrColumn.PATIENT), dateColListTarget, codeCol)
}

override def requestKeyPerCollectionMap: Map[String, Map[String, List[String]]] = Map(
Expand All @@ -32,26 +32,38 @@ class SolrQueryElementsConfig extends ResourceConfig {
QueryColumn.ID -> List(SolrColumn.ID),
QueryColumn.ORGANIZATIONS -> List(SolrColumn.ORGANIZATIONS)
),
FhirResource.MEDICATION_REQUEST -> buildMap(
List(SolrColumn.MedicationRequest.PERIOD_START, SolrColumn.MedicationRequest.PERIOD_END)),
FhirResource.MEDICATION_ADMINISTRATION -> buildMap(
List(SolrColumn.MedicationAdministration.PERIOD_START)),
FhirResource.OBSERVATION -> buildMap(List(SolrColumn.Observation.EFFECTIVE_DATETIME)),
FhirResource.CONDITION -> buildMap(List(SolrColumn.Condition.RECORDED_DATE)),
FhirResource.MEDICATION_REQUEST -> buildDefaultMap(
List(SolrColumn.MedicationRequest.PERIOD_START, SolrColumn.MedicationRequest.PERIOD_END),
codeCol = Some(List(SolrColumn.MedicationRequest.CODE_ATC, SolrColumn.MedicationRequest.CODE_UCD))
),
FhirResource.MEDICATION_ADMINISTRATION -> buildDefaultMap(
List(SolrColumn.MedicationAdministration.PERIOD_START),
codeCol = Some(List(SolrColumn.MedicationAdministration.CODE_ATC, SolrColumn.MedicationAdministration.CODE_UCD))
),
FhirResource.OBSERVATION -> buildDefaultMap(List(SolrColumn.Observation.EFFECTIVE_DATETIME),
codeCol = Some(List(SolrColumn.Observation.CODE))
),
FhirResource.CONDITION -> buildDefaultMap(List(SolrColumn.Condition.RECORDED_DATE),
codeCol = Some(List(SolrColumn.Condition.CODE))
),
FhirResource.PATIENT -> Map(QueryColumn.PATIENT -> List(SolrColumn.PATIENT),
QueryColumn.ID -> List(SolrColumn.ID),
QueryColumn.ORGANIZATIONS -> List(SolrColumn.ORGANIZATIONS)),
FhirResource.DOCUMENT_REFERENCE -> buildMap(List(SolrColumn.Document.DATE)),
FhirResource.COMPOSITION -> buildMap(List(SolrColumn.Document.DATE)),
FhirResource.DOCUMENT_REFERENCE -> buildDefaultMap(List(SolrColumn.Document.DATE)),
FhirResource.COMPOSITION -> buildDefaultMap(List(SolrColumn.Document.DATE)),
FhirResource.GROUP -> Map(QueryColumn.PATIENT -> List(SolrColumn.Group.RESOURCE_ID),
QueryColumn.ID -> List(SolrColumn.ID)),
FhirResource.CLAIM -> buildMap(List(SolrColumn.Claim.CREATED)),
FhirResource.PROCEDURE -> buildMap(List(SolrColumn.Procedure.DATE)),
FhirResource.CLAIM -> buildDefaultMap(List(SolrColumn.Claim.CREATED),
codeCol = Some(List(SolrColumn.Claim.CODE))
),
FhirResource.PROCEDURE -> buildDefaultMap(List(SolrColumn.Procedure.DATE),
codeCol = Some(List(SolrColumn.Procedure.CODE))
),
FhirResource.IMAGING_STUDY -> (buildMap(List(SolrColumn.PATIENT),
List(SolrColumn.ImagingStudy.STARTED,
SolrColumn.ImagingStudy.SERIES_STARTED)) ++ Map(
QueryColumn.GROUP_BY -> List(SolrColumn.ImagingStudy.STUDY_ID))),
FhirResource.QUESTIONNAIRE_RESPONSE -> (buildMap(
FhirResource.QUESTIONNAIRE_RESPONSE -> (buildDefaultMap(
List(SolrColumn.QuestionnaireResponse.AUTHORED)) ++ Map(
QueryColumn.EPISODE_OF_CARE -> List(SolrColumn.EPISODE_OF_CARE))),
FhirResource.UNKNOWN -> Map()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import fr.aphp.id.eds.requester.query.resolver.{ResourceConfig, ResourceResolver
import fr.aphp.id.eds.requester.tools.SolrTools
import org.apache.log4j.Logger
import org.apache.solr.client.solrj.SolrQuery
import org.apache.spark.sql.functions.{array, array_join, col, explode}
import org.apache.spark.sql.{DataFrame, SparkSession}

/** Class for questioning solr. */
Expand Down Expand Up @@ -42,6 +43,16 @@ class SolrQueryResolver(solrSparkReader: SolrSparkReader,
}
val convFunc = (columnName: String) =>
qbConfigs.reverseColumnMapping(resource.resourceType, columnName)

if (resource.uniqueFields.isDefined) {
val codeColumns = qbConfigs.requestKeyPerCollectionMap(resource.resourceType).getOrElse(QueryColumn.CODE, List())
if (codeColumns.nonEmpty) {
criterionDataFrame = criterionDataFrame.withColumn(
QueryColumn.CODE,
array_join(array(codeColumns.map((c) => col(s"`${c}`")): _*), ",")
)
}
}
criterionDataFrame.toDF(criterionDataFrame.columns.map(c => convFunc(c)).toSeq: _*)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,27 +24,35 @@ package object solr {

// Solr field names used for the MedicationAdministration resource.
object MedicationAdministration {
// Mapped to QueryColumn.EVENT_DATE in SolrQueryElementsConfig.
final val PERIOD_START = "effectivePeriod.start"
// Both code fields are mapped to QueryColumn.CODE in SolrQueryElementsConfig.
final val CODE_ATC = "_sort.atc"
final val CODE_UCD = "_sort.ucd"
}

// Solr field names used for the MedicationRequest resource.
object MedicationRequest {
// Both period fields are mapped to QueryColumn.EVENT_DATE in SolrQueryElementsConfig.
final val PERIOD_START = "dispenseRequest.validityPeriod.start"
final val PERIOD_END = "dispenseRequest.validityPeriod.end"
// Both code fields are mapped to QueryColumn.CODE in SolrQueryElementsConfig.
final val CODE_ATC = "_sort.atc"
final val CODE_UCD = "_sort.ucd"
}

// Solr field names used for the Observation resource.
object Observation {
// Mapped to QueryColumn.EVENT_DATE in SolrQueryElementsConfig.
final val EFFECTIVE_DATETIME = "effectiveDateTime"
// Mapped to QueryColumn.CODE in SolrQueryElementsConfig.
final val CODE = "code.coding.display.anabio"
}

// Solr field names used for the Claim resource.
object Claim {
// Mapped to QueryColumn.EVENT_DATE in SolrQueryElementsConfig.
final val CREATED = "created"
// Mapped to QueryColumn.CODE in SolrQueryElementsConfig.
final val CODE = "diagnosis.diagnosisCodeableConcept.coding.display"
}

// Solr field names used for the Condition resource.
object Condition {
// Mapped to QueryColumn.EVENT_DATE in SolrQueryElementsConfig.
final val RECORDED_DATE = "recordedDate"
// Mapped to QueryColumn.CODE in SolrQueryElementsConfig.
final val CODE = "code.coding.display"
}

// Solr field names used for the Procedure resource.
object Procedure {
// Mapped to QueryColumn.EVENT_DATE in SolrQueryElementsConfig.
final val DATE = "performedDateTime"
// Mapped to QueryColumn.CODE in SolrQueryElementsConfig.
final val CODE = "code.coding.display"
}

object Document {
Expand Down
1 change: 1 addition & 0 deletions src/main/scala/fr/aphp/id/eds/requester/requester.scala
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ package object requester {
final val ENCOUNTER_START_DATE = "encounter_start_date"
final val ENCOUNTER_END_DATE = "encounter_end_date"
final val PATIENT_BIRTHDATE = "patient_birthdate"
final val CODE = "code"
final val EVENT_DATE = "event_date"
final val LOCAL_DATE = "localDate"
final val AGE = "age"
Expand Down
3 changes: 3 additions & 0 deletions src/test/resources/testCases/nAmongMUniqueFields/expected.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
subject_id
7
1
Loading

0 comments on commit 541e2bf

Please sign in to comment.