subroutine ESMF_StateReconcile_driver(state, vm, rc)
!
! !ARGUMENTS:
      type (ESMF_State), intent(inout) :: state
      type (ESMF_VM),    intent(in)    :: vm
      integer,           intent(out)   :: rc
!
! !DESCRIPTION:
!
!   Drive the actual state reconcile procedure.
!
!   The driver runs the following steps in order:
!   (0) interchange item counts, (1) construct the Id/VMId send arrays and
!   translate the VMIds to integers, (2) optionally tag Fields with
!   unique-geometry metadata, then the reconcile implementation proper,
!   followed by clean-up of the work arrays, reconciliation of zapped
!   proxies and, optionally, re-assembly of shared Field geometries.
!   Any failing step logs the error, sets {\tt rc} and returns immediately.
!
!   The arguments are:
!   \begin{description}
!   \item[state]
!     {\tt ESMF\_State} to collect information from.
!   \item[vm]
!     The current {\tt ESMF\_VM} (virtual machine).  All PETs in this
!     {\tt ESMF\_VM} will exchange information about objects which might
!     only be known to one or more PETs, and ensure all PETs in this VM
!     have a consistent view of the object list in this {\tt ESMF\_State}.
!   \item[rc]
!     Return code; equals {\tt ESMF\_SUCCESS} if there are no errors.
!     This argument is mandatory in this routine.
!   \end{description}
!EOPI
    integer :: localrc
    integer :: memstat
    integer :: localPet, petCount
    ! Compile-time switches: meminfo guards the ESMF_VMLogMemInfo() calls,
    ! profile guards the ESMF_TraceRegionEnter()/ESMF_TraceRegionExit() calls.
    logical, parameter    :: meminfo = .false.
    logical, parameter    :: profile = .false.
    integer, pointer                     :: nitems_buf(:)
    type (ESMF_StateItemWrap), pointer   :: siwrap(:)
    integer,         pointer             :: ids_send(:)
    type(ESMF_VMId), pointer             :: vmids_send(:)
    integer, allocatable, target         :: vmintids_send(:)
    type(ESMF_VMId), allocatable, target :: vmIdMap(:)
    ! Pointer alias of vmIdMap, handed to the callees that take vmIdMap
    type(ESMF_VMId), pointer             :: vmIdMap_ptr(:)
    type(ESMF_AttReconcileFlag)          :: attreconflag
    type(ESMF_InfoCache)                 :: info_cache
    ! -------------------------------------------------------------------------
    ! Pessimistic default; every call below overwrites localrc
    localrc = ESMF_RC_NOT_IMPL
    ! Attributes must be reconciled to de-duplicate Field geometry proxies
    attreconflag = ESMF_ATTRECONCILE_ON
    if (meminfo) call ESMF_VMLogMemInfo("entering ESMF_StateReconcile_driver")
    ! localPet/petCount are queried here; they are not referenced again in
    ! this routine.
    call ESMF_VMGet(vm, localPet=localPet, petCount=petCount, rc=localrc)
    if (ESMF_LogFoundError(localrc, ESMF_ERR_PASSTHRU, ESMF_CONTEXT, &
      rcToReturn=rc)) return
#if 0
    call ESMF_StateLog(state, rc=localrc)
    if (ESMF_LogFoundError(localrc, ESMF_ERR_PASSTHRU, ESMF_CONTEXT, &
      rcToReturn=rc)) return
#endif
    ! -------------------------------------------------------------------------
    ! (0) Interchange item counts between PETs.  Set up counts/displacements
    ! -------------------------------------------------------------------------
    if (profile) then
      call ESMF_TraceRegionEnter("(0) Interchange item counts", rc=localrc)
      if (ESMF_LogFoundError(localrc, ESMF_ERR_PASSTHRU, ESMF_CONTEXT, &
        rcToReturn=rc)) return
    endif
    ! -------------------------------------------------------------------------
    ! Start from a defined (disassociated) state so that the clean-up section
    ! can test association reliably.
    siwrap     => null ()
    nitems_buf => null ()
    call ESMF_ReconcileInitialize (state, vm, siwrap=siwrap, &
      nitems_all=nitems_buf, rc=localrc)
    if (ESMF_LogFoundError(localrc, ESMF_ERR_PASSTHRU, ESMF_CONTEXT, &
      rcToReturn=rc)) return
    ! -------------------------------------------------------------------------
    if (profile) then
      call ESMF_TraceRegionExit("(0) Interchange item counts", rc=localrc)
      if (ESMF_LogFoundError(localrc, ESMF_ERR_PASSTHRU, ESMF_CONTEXT, &
        rcToReturn=rc)) return
    endif
    ! -------------------------------------------------------------------------
    if (meminfo) call ESMF_VMLogMemInfo ("after (0) Interchange item counts")
    ! -------------------------------------------------------------------------
    ! (1) Each PET constructs its send arrays containing local Id
    ! and VMId info for each object contained in the State.
    ! Note that element zero is reserved for the State itself.
    ! -------------------------------------------------------------------------
    if (profile) then
      call ESMF_TraceRegionEnter("(1) Construct send arrays", rc=localrc)
      if (ESMF_LogFoundError(localrc, ESMF_ERR_PASSTHRU, ESMF_CONTEXT, &
        rcToReturn=rc)) return
    endif
    ! -------------------------------------------------------------------------
    ids_send   => null ()
    vmids_send => null ()
    if (profile) then
      call ESMF_TraceRegionEnter("ESMF_ReconcileGetStateIDInfo", rc=localrc)
      if (ESMF_LogFoundError(localrc, ESMF_ERR_PASSTHRU, ESMF_CONTEXT, &
        rcToReturn=rc)) return
    endif
    call ESMF_ReconcileGetStateIDInfo (state, siwrap, &
          id=  ids_send, &
        vmid=vmids_send, &
        rc=localrc)
    if (ESMF_LogFoundError(localrc, ESMF_ERR_PASSTHRU, ESMF_CONTEXT, &
      rcToReturn=rc)) return
    if (profile) then
      call ESMF_TraceRegionExit("ESMF_ReconcileGetStateIDInfo", rc=localrc)
      if (ESMF_LogFoundError(localrc, ESMF_ERR_PASSTHRU, ESMF_CONTEXT, &
        rcToReturn=rc)) return
    endif
    ! Translate VmId objects to an integer representation to minimize memory
    ! usage. This is also beneficial for performance.
    if (profile) then
      call ESMF_TraceRegionEnter("ESMF_VMTranslateVMId", rc=localrc)
      if (ESMF_LogFoundError(localrc, ESMF_ERR_PASSTHRU, ESMF_CONTEXT, &
        rcToReturn=rc)) return
    endif
    call ESMF_VMTranslateVMId(vm, vmIds=vmids_send, ids=vmintids_send, &
      vmIdMap=vmIdMap, rc=localrc)
    if (ESMF_LogFoundError(localrc, ESMF_ERR_PASSTHRU, ESMF_CONTEXT, &
      rcToReturn=rc)) return
    ! Alias the map through a pointer for the callees below
    vmIdMap_ptr => vmIdMap
    if (profile) then
      call ESMF_TraceRegionExit("ESMF_VMTranslateVMId", rc=localrc)
      if (ESMF_LogFoundError(localrc, ESMF_ERR_PASSTHRU, ESMF_CONTEXT, &
        rcToReturn=rc)) return
    endif
    ! VM integer ids should always start with 1
    if (profile) then
      call ESMF_TraceRegionEnter("Check vmIntIds", rc=localrc)
      if (ESMF_LogFoundError(localrc, ESMF_ERR_PASSTHRU, ESMF_CONTEXT, &
        rcToReturn=rc)) return
    endif
    if (any(vmintids_send(:) <= 0)) then
      call ESMF_LogSetError(ESMF_RC_INTNRL_INCONS, &
        msg="All integer VM ids must be greater than 0!", &
        ESMF_CONTEXT, rcToReturn=rc)
      return
    endif
    if (profile) then
      call ESMF_TraceRegionExit("Check vmIntIds", rc=localrc)
      if (ESMF_LogFoundError(localrc, ESMF_ERR_PASSTHRU, ESMF_CONTEXT, &
        rcToReturn=rc)) return
    endif
#ifdef RECONCILE_LOG_on
    ! Debug logging of the sizes of the arrays built in step (1)
    block
      character(160):: msgStr
      write(msgStr,*) "ESMF_StateReconcile_driver() size(vmids_send): ", &
        size(vmids_send)
      call ESMF_LogWrite(msgStr, ESMF_LOGMSG_DEBUG, rc=localrc)
      if (ESMF_LogFoundError(localrc, ESMF_ERR_PASSTHRU, ESMF_CONTEXT, &
        rcToReturn=rc)) return
      write(msgStr,*) "ESMF_StateReconcile_driver() size(vmIdMap): ", &
        size(vmIdMap)
      call ESMF_LogWrite(msgStr, ESMF_LOGMSG_DEBUG, rc=localrc)
      if (ESMF_LogFoundError(localrc, ESMF_ERR_PASSTHRU, ESMF_CONTEXT, &
        rcToReturn=rc)) return
      write(msgStr,*) "ESMF_StateReconcile_driver() size(vmintids_send): ", &
        size(vmintids_send)
      call ESMF_LogWrite(msgStr, ESMF_LOGMSG_DEBUG, rc=localrc)
      if (ESMF_LogFoundError(localrc, ESMF_ERR_PASSTHRU, ESMF_CONTEXT, &
        rcToReturn=rc)) return
    end block
#endif
    ! -------------------------------------------------------------------------
    if (profile) then
      call ESMF_TraceRegionExit("(1) Construct send arrays", rc=localrc)
      if (ESMF_LogFoundError(localrc, ESMF_ERR_PASSTHRU, ESMF_CONTEXT, &
        rcToReturn=rc)) return
    endif
    ! -------------------------------------------------------------------------
    if (meminfo) call ESMF_VMLogMemInfo ("after (1) Construct send arrays")
#ifdef UNIQUE_GEOM_INFO_TREAT_on
#ifdef RECONCILE_LOG_on
    block
      type(ESMF_InfoDescribe)   :: idesc
      ! Log a JSON State representation -----------------------------------------
      call idesc%Initialize(createInfo=.true., addObjectInfo=.true., &
        vmIdMap=vmIdMap_ptr, rc=localrc)
      if (ESMF_LogFoundError(localrc, ESMF_ERR_PASSTHRU, ESMF_CONTEXT, rcToReturn=rc)) return
      call idesc%Update(state, "", rc=localrc)
      if (ESMF_LogFoundError(localrc, ESMF_ERR_PASSTHRU, ESMF_CONTEXT, rcToReturn=rc)) return
      call ESMF_LogWrite("state_json_after_vmid="// &
        ESMF_InfoDump(idesc%info, indent=2), ESMF_LOGMSG_DEBUG, rc=localrc)
      if (ESMF_LogFoundError(localrc, ESMF_ERR_PASSTHRU, ESMF_CONTEXT, rcToReturn=rc)) return
      call idesc%Destroy(rc=localrc)
      if (ESMF_LogFoundError(localrc, ESMF_ERR_PASSTHRU, ESMF_CONTEXT, rcToReturn=rc)) return
    end block
#endif
    ! -------------------------------------------------------------------------
    if (profile) then
      call ESMF_TraceRegionEnter("(2) Set Field metadata for unique geometries", rc=localrc)
      if (ESMF_LogFoundError(localrc, ESMF_ERR_PASSTHRU, ESMF_CONTEXT, &
        rcToReturn=rc)) return
    endif
    ! -------------------------------------------------------------------------
    ! Update Field metadata for unique geometries. This will traverse the state
    ! hierarchy adding reconcile-specific attributes that will find unique
    ! geometry objects and maintain sufficient information to re-establish
    ! references once the objects have been communicated and deserialized.
    ! -------------------------------------------------------------------------
    call info_cache%Initialize(localrc)
    if (ESMF_LogFoundError(localrc, ESMF_ERR_PASSTHRU, ESMF_CONTEXT, rcToReturn=rc)) return
    call info_cache%UpdateFields(state, vmIdMap=vmIdMap_ptr, rc=localrc)
    if (ESMF_LogFoundError(localrc, ESMF_ERR_PASSTHRU, ESMF_CONTEXT, rcToReturn=rc)) return
    call info_cache%Destroy(localrc)
    if (ESMF_LogFoundError(localrc, ESMF_ERR_PASSTHRU, ESMF_CONTEXT, rcToReturn=rc)) return
    ! -------------------------------------------------------------------------
    if (profile) then
      call ESMF_TraceRegionExit("(2) Set Field metadata for unique geometries", rc=localrc)
      if (ESMF_LogFoundError(localrc, ESMF_ERR_PASSTHRU, ESMF_CONTEXT, &
        rcToReturn=rc)) return
    endif
    ! -------------------------------------------------------------------------
    if (meminfo) call ESMF_VMLogMemInfo ("after (2) Update Field metadata")
#endif
#ifdef RECONCILE_LOG_on
    block
      type(ESMF_InfoDescribe)   :: idesc
      ! Log a JSON State representation -----------------------------------------
      call idesc%Initialize(createInfo=.true., addObjectInfo=.true., &
        vmIdMap=vmIdMap_ptr, rc=localrc)
      if (ESMF_LogFoundError(localrc, ESMF_ERR_PASSTHRU, ESMF_CONTEXT, rcToReturn=rc)) return
      call idesc%Update(state, "", rc=localrc)
      if (ESMF_LogFoundError(localrc, ESMF_ERR_PASSTHRU, ESMF_CONTEXT, rcToReturn=rc)) return
      call ESMF_LogWrite("state_json_after_set_field_meta="// &
        ESMF_InfoDump(idesc%info, indent=2), ESMF_LOGMSG_DEBUG, rc=localrc)
      if (ESMF_LogFoundError(localrc, ESMF_ERR_PASSTHRU, ESMF_CONTEXT, rcToReturn=rc)) return
      call idesc%Destroy(rc=localrc)
      if (ESMF_LogFoundError(localrc, ESMF_ERR_PASSTHRU, ESMF_CONTEXT, rcToReturn=rc)) return
    end block
#endif
#if 1
    ! ------------------------------------------------------------------------
    ! This is the new (2024) Reconcile implementation with log(petCount) scaling
    ! ------------------------------------------------------------------------
    if (profile) then
      call ESMF_TraceRegionEnter("(2<) ESMF_ReconcileMultiCompCase", rc=localrc)
      if (ESMF_LogFoundError(localrc, ESMF_ERR_PASSTHRU, ESMF_CONTEXT, &
        rcToReturn=rc)) return
    endif
    ! ------------------------------------------------------------------------
    call ESMF_ReconcileMultiCompCase(state, vm=vm, vmIdMap=vmIdMap_ptr, &
      attreconflag=attreconflag, siwrap=siwrap, vmintids_send=vmintids_send, &
      rc=localrc)
    if (ESMF_LogFoundError(localrc, ESMF_ERR_PASSTHRU, ESMF_CONTEXT, &
      rcToReturn=rc)) return
    ! ------------------------------------------------------------------------
    if (profile) then
      call ESMF_TraceRegionExit("(2<) ESMF_ReconcileMultiCompCase", rc=localrc)
      if (ESMF_LogFoundError(localrc, ESMF_ERR_PASSTHRU, ESMF_CONTEXT, &
        rcToReturn=rc)) return
    endif
    ! ------------------------------------------------------------------------
#else
    ! ------------------------------------------------------------------------
    ! This is the old Reconcile implementation. It uses a brute force approach
    ! using Alltoall() communication that scales with petCount^2.
    ! Only left here in case we run into situations that are not covered by the
    ! new Reconcile implementation.
    ! ------------------------------------------------------------------------
    if (profile) then
      call ESMF_TraceRegionEnter("(2<) ESMF_ReconcileBruteForce", rc=localrc)
      if (ESMF_LogFoundError(localrc, ESMF_ERR_PASSTHRU, ESMF_CONTEXT, &
        rcToReturn=rc)) return
    endif
    ! ------------------------------------------------------------------------
    call ESMF_ReconcileBruteForce(state, vm=vm, &
      attreconflag=attreconflag, siwrap=siwrap, ids_send=ids_send, &
      vmids_send=vmids_send, vmintids_send=vmintids_send, &
      nitems_buf=nitems_buf, rc=localrc)
    if (ESMF_LogFoundError(localrc, ESMF_ERR_PASSTHRU, ESMF_CONTEXT, &
      rcToReturn=rc)) return
    ! ------------------------------------------------------------------------
    if (profile) then
      call ESMF_TraceRegionExit("(2<) ESMF_ReconcileBruteForce", rc=localrc)
      if (ESMF_LogFoundError(localrc, ESMF_ERR_PASSTHRU, ESMF_CONTEXT, &
        rcToReturn=rc)) return
    endif
    ! ------------------------------------------------------------------------
#endif
    ! Clean up
    ! -------------------------------------------------------------------------
    if (profile) then
      call ESMF_TraceRegionEnter("(X) Clean-up", rc=localrc)
      if (ESMF_LogFoundError(localrc, ESMF_ERR_PASSTHRU, ESMF_CONTEXT, &
        rcToReturn=rc)) return
    endif
    ! -------------------------------------------------------------------------
    ! Each pointer work array is tested and released on its own, so that one
    ! being disassociated neither leaks the other nor raises a spurious
    ! deallocation error.
    if (associated (ids_send)) then
      deallocate (ids_send, stat=memstat)
      if (ESMF_LogFoundDeallocError(memstat, ESMF_ERR_PASSTHRU, ESMF_CONTEXT, &
        rcToReturn=rc)) return
    end if
    if (associated (vmids_send)) then
      deallocate (vmids_send, stat=memstat)
      if (ESMF_LogFoundDeallocError(memstat, ESMF_ERR_PASSTHRU, ESMF_CONTEXT, &
        rcToReturn=rc)) return
    end if
    ! vmIdMap_ptr aliases vmIdMap; drop the alias before its target is
    ! destroyed and deallocated so that no dangling pointer remains.
    vmIdMap_ptr => null ()
    call ESMF_VMIdDestroy(vmIdMap, rc=localrc)
    if (ESMF_LogFoundError(localrc, ESMF_ERR_PASSTHRU, ESMF_CONTEXT, &
      rcToReturn=rc)) return
    deallocate (vmIdMap, stat=memstat)
    if (ESMF_LogFoundDeallocError(memstat, ESMF_ERR_PASSTHRU, ESMF_CONTEXT, &
      rcToReturn=rc)) return
    if (associated (siwrap)) then
      deallocate (siwrap, stat=memstat)
      if (ESMF_LogFoundDeallocError(memstat, ESMF_ERR_PASSTHRU, ESMF_CONTEXT, &
        rcToReturn=rc)) return
    end if
    if (associated (nitems_buf)) then
      deallocate (nitems_buf, stat=memstat)
      if (ESMF_LogFoundDeallocError(memstat, ESMF_ERR_PASSTHRU, ESMF_CONTEXT, &
        rcToReturn=rc)) return
    end if
    ! vmintids_send is a local allocatable and is released automatically on
    ! return from this routine.
    ! -------------------------------------------------------------------------
    if (profile) then
      call ESMF_TraceRegionExit("(X) Clean-up", rc=localrc)
      if (ESMF_LogFoundError(localrc, ESMF_ERR_PASSTHRU, ESMF_CONTEXT, &
        rcToReturn=rc)) return
    endif
    ! -------------------------------------------------------------------------
    if (meminfo) call ESMF_VMLogMemInfo ("after (X) Clean-up")
    ! -------------------------------------------------------------------------
    if (profile) then
      call ESMF_TraceRegionEnter("(X+1) Reconcile Zapped Proxies", rc=localrc)
      if (ESMF_LogFoundError(localrc, ESMF_ERR_PASSTHRU, ESMF_CONTEXT, &
        rcToReturn=rc)) return
    endif
    ! -------------------------------------------------------------------------
    call ESMF_ReconcileZappedProxies(state, rc=localrc)
    if (ESMF_LogFoundError(localrc, ESMF_ERR_PASSTHRU, ESMF_CONTEXT, &
      rcToReturn=rc)) return
    ! -------------------------------------------------------------------------
    if (profile) then
      call ESMF_TraceRegionExit("(X+1) Reconcile Zapped Proxies", rc=localrc)
      if (ESMF_LogFoundError(localrc, ESMF_ERR_PASSTHRU, ESMF_CONTEXT, &
        rcToReturn=rc)) return
    endif
    ! -------------------------------------------------------------------------
    if (meminfo) call ESMF_VMLogMemInfo ("(X+1) Reconcile Zapped Proxies")
#ifdef UNIQUE_GEOM_INFO_TREAT_on
    if (profile) then
      call ESMF_TraceRegionEnter("(X+2) Use Field metadata for unique geometries", rc=localrc)
      if (ESMF_LogFoundError(localrc, ESMF_ERR_PASSTHRU, ESMF_CONTEXT, &
        rcToReturn=rc)) return
    endif
    ! Traverse the State hierarchy and fix Field references to a shared geometry
    call ESMF_InfoCacheReassembleFields(state, state, rc=localrc)
    if (ESMF_LogFoundError(localrc, ESMF_ERR_PASSTHRU, ESMF_CONTEXT, &
      rcToReturn=rc)) return
    ! Traverse the state hierarchy and remove reconcile-specific attributes
    call ESMF_InfoCacheReassembleFieldsFinalize(state, rc=localrc)
    if (ESMF_LogFoundError(localrc, ESMF_ERR_PASSTHRU, ESMF_CONTEXT, &
      rcToReturn=rc)) return
    if (profile) then
      call ESMF_TraceRegionExit("(X+2) Use Field metadata for unique geometries", rc=localrc)
      if (ESMF_LogFoundError(localrc, ESMF_ERR_PASSTHRU, ESMF_CONTEXT, &
        rcToReturn=rc)) return
    endif
    ! -------------------------------------------------------------------------
    if (meminfo) call ESMF_VMLogMemInfo ("(X+2) Use Field metadata for unique geometries")
#endif
    ! All steps completed without error
    rc = ESMF_SUCCESS
    if (meminfo) call ESMF_VMLogMemInfo ("exiting ESMF_StateReconcile_driver")
  end subroutine ESMF_StateReconcile_driver