Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,9 @@ class EvictMemoryToTieredStorageTest extends AnyFunSuite

val s3url = container.getS3URL
val augmentedConfiguration = Map(
CelebornConf.ACTIVE_STORAGE_TYPES.key -> "MEMORY,HDD,S3",
CelebornConf.WORKER_STORAGE_CREATE_FILE_POLICY.key -> "MEMORY,HDD,S3",
// CelebornConf.WORKER_STORAGE_EVICT_POLICY.key -> "MEMORY,S3",
CelebornConf.ACTIVE_STORAGE_TYPES.key -> "MEMORY,S3",
CelebornConf.WORKER_STORAGE_CREATE_FILE_POLICY.key -> "MEMORY,S3",
CelebornConf.WORKER_STORAGE_EVICT_POLICY.key -> "MEMORY,S3",
// note that in S3 (and Minio) you cannot upload parts smaller than 5MB, so we trigger eviction only when there
// is enough data
CelebornConf.WORKER_MEMORY_FILE_STORAGE_MAX_FILE_SIZE.key -> "5MB",
Expand Down Expand Up @@ -199,6 +199,52 @@ class EvictMemoryToTieredStorageTest extends AnyFunSuite
celebornSparkSession.stop()
}

test("celeborn spark integration test - memory evict to s3 after partition split") {
assumeS3LibraryIsLoaded()

val sparkConf = new SparkConf().setAppName("celeborn-demo").setMaster("local[2]")
val celebornSparkSession = SparkSession.builder()
.config(updateSparkConfWithStorageTypes(sparkConf, ShuffleMode.HASH, "MEMORY,S3"))
// Set split threshold equal to WORKER_MEMORY_FILE_STORAGE_MAX_FILE_SIZE (5MB) to trigger
// the following sequence that reproduces the production failure:
// 1. MemoryTierWriter accumulates ~5MB → eviction → DfsTierWriter (S3) created.
// getDiskFileInfo() is now non-null, enabling the regular split-threshold check.
// (Without prior eviction to disk, getDiskFileInfo() == null and no split fires.)
// 2. The S3 file grows past 5MB → SOFT_SPLIT response sent to the Spark client.
// 3. ChangePartitionManager calls allocateFromCandidates, which builds the new location
// with type=MEMORY (storageTypes.head for "MEMORY,S3") and availableTypes=MEMORY|S3.
// 4. The new MemoryTierWriter fills again → eviction triggered on the MEMORY-typed
// location → StorageManager.createDiskFile must handle type=MEMORY as a valid S3
// target (using availableStorageTypes) rather than rejecting it.
.config(s"spark.${CelebornConf.SHUFFLE_PARTITION_SPLIT_THRESHOLD.key}", "5MB")
.getOrCreate()

// 20MB covers all three phases:
// ~5MB to fill the first MemoryTierWriter and trigger eviction to S3,
// ~5MB of additional S3 writes to exceed the split threshold and fire SOFT_SPLIT,
// ~10MB remaining for the second MemoryTierWriter to fill and trigger the second eviction.
val sampleSeq: immutable.Seq[(String, Int)] = buildDataSet(20 * 1024 * 1024)

repartition(celebornSparkSession, sequence = sampleSeq, partitions = 1)

// After splits there are more than 2 committed locations (one per epoch), so we assert
// type and path for each rather than an exact count.
assert(seenPartitionLocationsOpenReader.size >= 2)
seenPartitionLocationsOpenReader.asScala.foreach(location => {
assert(
location.getStorageInfo.getType == Type.MEMORY || location.getStorageInfo.getType == Type.S3)
assert(location.getStorageInfo.getFilePath == "")
})
assert(seenPartitionLocationsUpdateFileGroups.size >= 2)
seenPartitionLocationsUpdateFileGroups.asScala.foreach { location =>
if (location.getStorageInfo.getType == Type.MEMORY)
assert(location.getStorageInfo.getFilePath == "")
else if (location.getStorageInfo.getType == Type.S3)
assert(location.getStorageInfo.getFilePath.startsWith("s3://"))
}
celebornSparkSession.stop()
}

test("celeborn spark integration test - push fails no way of evicting") {
assumeS3LibraryIsLoaded()

Expand All @@ -216,16 +262,15 @@ class EvictMemoryToTieredStorageTest extends AnyFunSuite
celebornSparkSession.stop()
}

/**
 * Default ~10MB dataset used by tests that need to push enough data to trigger
 * eviction from MEMORY to tiered storage.
 */
private def buildBigDataSet: immutable.Seq[(String, Int)] = buildDataSet(10 * 1024 * 1024)

/**
 * Builds a dataset of roughly `sizeBytes` bytes of key material. Each key is a
 * distinct ~1KB string (1024 spaces followed by a unique index, upper-cased),
 * paired with a random value in [1, 12]. Writing enough of these pairs is what
 * triggers eviction from MEMORY to S3 in the tests above.
 *
 * @param sizeBytes approximate total size of the generated keys, in bytes
 * @return an immutable sequence of (key, value) pairs
 */
private def buildDataSet(sizeBytes: Int): immutable.Seq[(String, Int)] = {
  val big1KBString: String = StringUtils.repeat(' ', 1024)
  val numValues = sizeBytes / big1KBString.length
  (1 to numValues)
    .map(i => big1KBString + i) // appending the index makes every key unique
    .toList
    .map(v => (v.toUpperCase, Random.nextInt(12) + 1))
}

def interceptLocationsSeenByClient(): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1124,9 +1124,15 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
fileName: String,
userIdentifier: UserIdentifier,
partitionType: PartitionType,
partitionSplitEnabled: Boolean): (Flusher, DiskFileInfo, File) = {
partitionSplitEnabled: Boolean,
overrideStorageType: StorageInfo.Type = null): (Flusher, DiskFileInfo, File) = {
val suggestedMountPoint = location.getStorageInfo.getMountPoint
val storageType = location.getStorageInfo.getType

val storageType =
if (overrideStorageType != null)
overrideStorageType
else location.getStorageInfo.getType

var retryCount = 0
var exception: IOException = null
val shuffleKey = Utils.makeShuffleKey(appId, shuffleId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ class StoragePolicy(conf: CelebornConf, storageManager: StorageManager, source:
true)
}
}
logError(s"Create evict file failed for ${partitionDataWriterContext.getPartitionLocation}")
logError(s"Create evict file failed for ${partitionDataWriterContext.getPartitionLocation} - no policy for ${celebornFile.storageType.name()}")
null
}

Expand Down Expand Up @@ -94,6 +94,7 @@ class StoragePolicy(conf: CelebornConf, storageManager: StorageManager, source:
}

def tryCreateFileByType(storageInfoType: StorageInfo.Type): TierWriterBase = {
val overrideType = if (evict) storageInfoType else location.getStorageInfo.getType
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In a scenario where eviction does not occur, should overrideType also be storageInfoType?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did it this way in order to not change the behavior of the code on the evict=false

this code is really tricky, because it handles two things at once:

  • the wanted "storageInfoType" from the policy
  • the available types for the location (see the 'switch' on location.getStorageInfo....))

in theory the value for location.getStorageInfo.getType should be the same as storageInfoType when evict=false, but not for MEMORY

code is here:

val tryCreateFileTypeIndex =
      if (evict) {
        0
      } else {
        // keep the old behavior, always try to use memory if worker
        // has configured to use memory storage, because slots allocator
        // will not allocate slots on memory storage
        if (order.exists(_.contains(StorageInfo.Type.MEMORY.name()))) {
          order.get.indexOf(StorageInfo.Type.MEMORY.name())
        } else {
          order.get.indexOf(
            partitionDataWriterContext.getPartitionLocation.getStorageInfo.getType.name())
        }
      }

I am still not super familiar with this code (even though I debugged it many, many times 🙄) and I did not want to break it

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suppose location.getStorageInfo.getType is set to HDD and createFileOrder is [HDD, S3], if the local HDD fails to create a file, storageInfoType will switch to S3. See following code segment:

for (i <- tryCreateFileTypeIndex until maxSize) {
  val storageStr = order.get(i)
  val storageInfoType = StorageInfo.fromStrToType(storageStr)
  val file = tryCreateFileByType(storageInfoType)
  if (file != null) {
    return file
  }
}

However, when storageInfoType is set to S3, the method createDiskFile still uses location.getStorageInfo.getType as HDD, which seems incorrect. Therefore, the StorageType seems like it should always be overrideType.

try {
storageInfoType match {
case StorageInfo.Type.MEMORY =>
Expand All @@ -118,19 +119,24 @@ class StoragePolicy(conf: CelebornConf, storageManager: StorageManager, source:
partitionDataWriterContext,
storageManager)
} else {
logWarning(
s"Not creating ${storageInfoType} file from ${location.getStorageInfo.getType} for ${partitionDataWriterContext.getShuffleKey} ${partitionDataWriterContext.getPartitionLocation.getFileName}")
null
}
case StorageInfo.Type.HDD | StorageInfo.Type.SSD | StorageInfo.Type.HDFS | StorageInfo.Type.OSS | StorageInfo.Type.S3 =>
if (storageManager.localOrDfsStorageAvailable) {
logDebug(s"create non-memory file for ${partitionDataWriterContext.getShuffleKey} ${partitionDataWriterContext.getPartitionLocation.getFileName}")
logDebug(
s"create non-memory file type $storageInfoType (evict=$evict, override=$overrideType) for ${partitionDataWriterContext.getShuffleKey} ${partitionDataWriterContext.getPartitionLocation.getFileName}")
val (flusher, diskFileInfo, workingDir) = storageManager.createDiskFile(
location,
partitionDataWriterContext.getAppId,
partitionDataWriterContext.getShuffleId,
location.getFileName,
partitionDataWriterContext.getUserIdentifier,
partitionDataWriterContext.getPartitionType,
partitionDataWriterContext.isPartitionSplitEnabled)
partitionDataWriterContext.isPartitionSplitEnabled,
overrideType // this is different from location type, in case of eviction
)
partitionDataWriterContext.setWorkingDir(workingDir)
val metaHandler = getPartitionMetaHandler(diskFileInfo)
if (flusher.isInstanceOf[LocalFlusher]
Expand Down Expand Up @@ -167,7 +173,9 @@ class StoragePolicy(conf: CelebornConf, storageManager: StorageManager, source:
}
} catch {
case e: Exception =>
logError(s"create celeborn file for storage $storageInfoType failed", e)
logError(
s"create celeborn file for storage $storageInfoType failed for ${partitionDataWriterContext.getShuffleKey} ${partitionDataWriterContext.getPartitionLocation.getFileName}",
e)
null
}
}
Expand All @@ -193,7 +201,8 @@ class StoragePolicy(conf: CelebornConf, storageManager: StorageManager, source:
}

val maxSize = order.get.length
for (i <- tryCreateFileTypeIndex until maxSize) {
val firstIndex = tryCreateFileTypeIndex
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why change this? The change is same as the current implementation.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

because the value of "firstIndex" is logged below

for (i <- firstIndex until maxSize) {
val storageStr = order.get(i)
val storageInfoType = StorageInfo.fromStrToType(storageStr)
val file = tryCreateFileByType(storageInfoType)
Expand All @@ -203,9 +212,9 @@ class StoragePolicy(conf: CelebornConf, storageManager: StorageManager, source:
}

logError(
s"Could not create file for storage type ${location.getStorageInfo.getType}")
s"Could not create file for storage type ${location.getStorageInfo.getType}, tried ${order.get} firstIndex $firstIndex for ${partitionDataWriterContext.getShuffleKey} ${partitionDataWriterContext.getPartitionLocation.getFileName}")

throw new CelebornIOException(
s"Create file failed for context ${partitionDataWriterContext.toString}")
s"Create file failed for context ${partitionDataWriterContext.toString}, tried ${order.get} firstIndex $firstIndex")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ class StoragePolicyCase1 extends CelebornFunSuite {
any(),
any(),
any(),
any(),
any())).thenAnswer((mockedFlusher, mockedDiskFile, mockedFile))

val memoryHintPartitionLocation =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ class StoragePolicyCase2 extends CelebornFunSuite {
any(),
any(),
any(),
any(),
any())).thenAnswer((mockedFlusher, mockedDiskFile, mockedFile))

val memoryHintPartitionLocation =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ class StoragePolicyCase3 extends CelebornFunSuite {
any(),
any(),
any(),
any(),
any())).thenAnswer((mockedFlusher, mockedDiskFile, mockedFile))

val memoryHintPartitionLocation =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ class StoragePolicyCase4 extends CelebornFunSuite {
any(),
any(),
any(),
any(),
any())).thenAnswer((mockedFlusher, mockedDiskFile, mockedFile))

val memoryHintPartitionLocation =
Expand Down
Loading